You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/07/25 03:46:31 UTC

spark git commit: [SPARK-23957][SQL] Sorts in subqueries are redundant and can be removed

Repository: spark
Updated Branches:
  refs/heads/master d4c341589 -> afb062753


[SPARK-23957][SQL] Sorts in subqueries are redundant and can be removed

## What changes were proposed in this pull request?
Thanks to henryr for the original idea at https://github.com/apache/spark/pull/21049

Description from the original PR :
Subqueries (at least in SQL) have 'bag of tuples' semantics. Ordering
them is therefore redundant (unless combined with a limit).

This patch removes the top sort operators from the subquery plans.

This closes https://github.com/apache/spark/pull/21049.

## How was this patch tested?
Added test cases in SubquerySuite to cover in, exists and scalar subqueries.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Dilip Biswal <db...@us.ibm.com>

Closes #21853 from dilipbiswal/SPARK-23957.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afb06275
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afb06275
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afb06275

Branch: refs/heads/master
Commit: afb0627536494c654ce5dd72db648f1ee7da641c
Parents: d4c3415
Author: Dilip Biswal <db...@us.ibm.com>
Authored: Tue Jul 24 20:46:27 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Tue Jul 24 20:46:27 2018 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  12 +-
 .../org/apache/spark/sql/SubquerySuite.scala    | 300 ++++++++++++++++++-
 2 files changed, 310 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/afb06275/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 5ed7412..adb1350 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -180,10 +180,20 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
    * Optimize all the subqueries inside expression.
    */
   object OptimizeSubqueries extends Rule[LogicalPlan] {
+    private def removeTopLevelSort(plan: LogicalPlan): LogicalPlan = {
+      plan match {
+        case Sort(_, _, child) => child
+        case Project(fields, child) => Project(fields, removeTopLevelSort(child))
+        case other => other
+      }
+    }
     def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       case s: SubqueryExpression =>
         val Subquery(newPlan) = Optimizer.this.execute(Subquery(s.plan))
-        s.withNewPlan(newPlan)
+        // At this point we have an optimized subquery plan that we are going to attach
+        // to this subquery expression. Here we can safely remove any top level sort
+        // in the plan as tuples produced by a subquery are un-ordered.
+        s.withNewPlan(removeTopLevelSort(newPlan))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/afb06275/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index acef62d..cbffed9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.plans.logical.Join
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
 import org.apache.spark.sql.test.SharedSQLContext
 
 class SubquerySuite extends QueryTest with SharedSQLContext {
@@ -970,4 +973,299 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
         Row("3", "b") :: Row("4", "b") :: Nil)
     }
   }
+
+  private def getNumSortsInQuery(query: String): Int = {
+    val plan = sql(query).queryExecution.optimizedPlan
+    getNumSorts(plan) + getSubqueryExpressions(plan).map{s => getNumSorts(s.plan)}.sum
+  }
+
+  private def getSubqueryExpressions(plan: LogicalPlan): Seq[SubqueryExpression] = {
+    val subqueryExpressions = ArrayBuffer.empty[SubqueryExpression]
+    plan transformAllExpressions {
+      case s: SubqueryExpression =>
+        subqueryExpressions ++= (getSubqueryExpressions(s.plan) :+ s)
+        s
+    }
+    subqueryExpressions
+  }
+
+  private def getNumSorts(plan: LogicalPlan): Int = {
+    plan.collect { case s: Sort => s }.size
+  }
+
+  test("SPARK-23957 Remove redundant sort from subquery plan(in subquery)") {
+    withTempView("t1", "t2", "t3") {
+      Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
+      Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
+      Seq((1, 1, 1), (2, 2, 2)).toDF("c1", "c2", "c3").createOrReplaceTempView("t3")
+
+      // Simple order by
+      val query1 =
+        """
+           |SELECT c1 FROM t1
+           |WHERE
+           |c1 IN (SELECT c1 FROM t2 ORDER BY c1)
+        """.stripMargin
+      assert(getNumSortsInQuery(query1) == 0)
+
+      // Nested order bys
+      val query2 =
+        """
+           |SELECT c1
+           |FROM   t1
+           |WHERE  c1 IN (SELECT c1
+           |              FROM   (SELECT *
+           |                      FROM   t2
+           |                      ORDER  BY c2)
+           |              ORDER  BY c1)
+        """.stripMargin
+      assert(getNumSortsInQuery(query2) == 0)
+
+
+      // nested IN
+      val query3 =
+        """
+           |SELECT c1
+           |FROM   t1
+           |WHERE  c1 IN (SELECT c1
+           |              FROM   t2
+           |              WHERE  c1 IN (SELECT c1
+           |                            FROM   t3
+           |                            WHERE  c1 = 1
+           |                            ORDER  BY c3)
+           |              ORDER  BY c2)
+        """.stripMargin
+      assert(getNumSortsInQuery(query3) == 0)
+
+      // Complex subplan and multiple sorts
+      val query4 =
+        """
+           |SELECT c1
+           |FROM   t1
+           |WHERE  c1 IN (SELECT c1
+           |              FROM   (SELECT c1, c2, count(*)
+           |                      FROM   t2
+           |                      GROUP BY c1, c2
+           |                      HAVING count(*) > 0
+           |                      ORDER BY c2)
+           |              ORDER  BY c1)
+        """.stripMargin
+      assert(getNumSortsInQuery(query4) == 0)
+
+      // Join in subplan
+      val query5 =
+        """
+           |SELECT c1 FROM t1
+           |WHERE
+           |c1 IN (SELECT t2.c1 FROM t2, t3
+           |       WHERE t2.c1 = t3.c1
+           |       ORDER BY t2.c1)
+        """.stripMargin
+      assert(getNumSortsInQuery(query5) == 0)
+
+      val query6 =
+        """
+           |SELECT c1
+           |FROM   t1
+           |WHERE  (c1, c2) IN (SELECT c1, max(c2)
+           |                    FROM   (SELECT c1, c2, count(*)
+           |                            FROM   t2
+           |                            GROUP BY c1, c2
+           |                            HAVING count(*) > 0
+           |                            ORDER BY c2)
+           |                    GROUP BY c1
+           |                    HAVING max(c2) > 0
+           |                    ORDER  BY c1)
+        """.stripMargin
+      // The rule to remove redundant sorts is not able to remove the inner sort under
+      // an Aggregate operator. We only remove the top level sort.
+      assert(getNumSortsInQuery(query6) == 1)
+
+      // Cases when sort is not removed from the plan
+      // Limit on top of sort
+      val query7 =
+        """
+           |SELECT c1 FROM t1
+           |WHERE
+           |c1 IN (SELECT c1 FROM t2 ORDER BY c1 limit 1)
+        """.stripMargin
+      assert(getNumSortsInQuery(query7) == 1)
+
+      // Sort below a set operations (intersect, union)
+      val query8 =
+        """
+           |SELECT c1 FROM t1
+           |WHERE
+           |c1 IN ((
+           |        SELECT c1 FROM t2
+           |        ORDER BY c1
+           |       )
+           |       UNION
+           |       (
+           |         SELECT c1 FROM t2
+           |         ORDER BY c1
+           |       ))
+        """.stripMargin
+      assert(getNumSortsInQuery(query8) == 2)
+    }
+  }
+
+  test("SPARK-23957 Remove redundant sort from subquery plan(exists subquery)") {
+    withTempView("t1", "t2", "t3") {
+      Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
+      Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
+      Seq((1, 1, 1), (2, 2, 2)).toDF("c1", "c2", "c3").createOrReplaceTempView("t3")
+
+      // Simple order by exists correlated
+      val query1 =
+        """
+           |SELECT c1 FROM t1
+           |WHERE
+           |EXISTS (SELECT t2.c1 FROM t2 WHERE t1.c1 = t2.c1 ORDER BY t2.c1)
+        """.stripMargin
+      assert(getNumSortsInQuery(query1) == 0)
+
+      // Nested order by and correlated.
+      val query2 =
+        """
+           |SELECT c1
+           |FROM   t1
+           |WHERE  EXISTS (SELECT c1
+           |               FROM (SELECT *
+           |                     FROM   t2
+           |                     WHERE t2.c1 = t1.c1
+           |                     ORDER  BY t2.c2) t2
+           |               ORDER BY t2.c1)
+        """.stripMargin
+      assert(getNumSortsInQuery(query2) == 0)
+
+      // nested EXISTS
+      val query3 =
+        """
+           |SELECT c1
+           |FROM   t1
+           |WHERE  EXISTS (SELECT c1
+           |               FROM t2
+           |               WHERE EXISTS (SELECT c1
+           |                             FROM   t3
+           |                             WHERE  t3.c1 = t2.c1
+           |                             ORDER  BY c3)
+           |               AND t2.c1 = t1.c1
+           |               ORDER BY c2)
+        """.stripMargin
+      assert(getNumSortsInQuery(query3) == 0)
+
+      // Cases when sort is not removed from the plan
+      // Limit on top of sort
+      val query4 =
+        """
+           |SELECT c1 FROM t1
+           |WHERE
+           |EXISTS (SELECT t2.c1 FROM t2 WHERE t2.c1 = 1 ORDER BY t2.c1 limit 1)
+        """.stripMargin
+      assert(getNumSortsInQuery(query4) == 1)
+
+      // Sort below a set operations (intersect, union)
+      val query5 =
+        """
+           |SELECT c1 FROM t1
+           |WHERE
+           |EXISTS ((
+           |        SELECT c1 FROM t2
+           |        WHERE t2.c1 = 1
+           |        ORDER BY t2.c1
+           |        )
+           |        UNION
+           |        (
+           |         SELECT c1 FROM t2
+           |         WHERE t2.c1 = 2
+           |         ORDER BY t2.c1
+           |        ))
+        """.stripMargin
+      assert(getNumSortsInQuery(query5) == 2)
+    }
+  }
+
+  test("SPARK-23957 Remove redundant sort from subquery plan(scalar subquery)") {
+    withTempView("t1", "t2", "t3") {
+      Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
+      Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
+      Seq((1, 1, 1), (2, 2, 2)).toDF("c1", "c2", "c3").createOrReplaceTempView("t3")
+
+      // Two scalar subqueries in OR
+      val query1 =
+        """
+          |SELECT * FROM t1
+          |WHERE  c1 = (SELECT max(t2.c1)
+          |             FROM   t2
+          |             ORDER BY max(t2.c1))
+          |OR     c2 = (SELECT min(t3.c2)
+          |             FROM   t3
+          |             WHERE  t3.c1 = 1
+          |             ORDER BY min(t3.c2))
+        """.stripMargin
+      assert(getNumSortsInQuery(query1) == 0)
+
+      // scalar subquery - groupby and having
+      val query2 =
+        """
+          |SELECT *
+          |FROM   t1
+          |WHERE  c1 = (SELECT   max(t2.c1)
+          |             FROM     t2
+          |             GROUP BY t2.c1
+          |             HAVING   count(*) >= 1
+          |             ORDER BY max(t2.c1))
+        """.stripMargin
+      assert(getNumSortsInQuery(query2) == 0)
+
+      // nested scalar subquery
+      val query3 =
+        """
+          |SELECT *
+          |FROM   t1
+          |WHERE  c1 = (SELECT   max(t2.c1)
+          |             FROM     t2
+          |             WHERE c1 = (SELECT max(t3.c1)
+          |                         FROM t3
+          |                         WHERE t3.c1 = 1
+          |                         GROUP BY t3.c1
+          |                         ORDER BY max(t3.c1)
+          |                        )
+          |              GROUP BY t2.c1
+          |              HAVING   count(*) >= 1
+          |              ORDER BY max(t2.c1))
+        """.stripMargin
+      assert(getNumSortsInQuery(query3) == 0)
+
+      // Scalar subquery in projection
+      val query4 =
+        """
+          |SELECT (SELECT min(c1) from t1 group by c1 order by c1)
+          |FROM t1
+          |WHERE t1.c1 = 1
+        """.stripMargin
+      assert(getNumSortsInQuery(query4) == 0)
+
+      // Limit on top of sort prevents it from being pruned.
+      val query5 =
+        """
+          |SELECT *
+          |FROM   t1
+          |WHERE  c1 = (SELECT   max(t2.c1)
+          |             FROM     t2
+          |             WHERE c1 = (SELECT max(t3.c1)
+          |                         FROM t3
+          |                         WHERE t3.c1 = 1
+          |                         GROUP BY t3.c1
+          |                         ORDER BY max(t3.c1)
+          |                         )
+          |             GROUP BY t2.c1
+          |             HAVING   count(*) >= 1
+          |             ORDER BY max(t2.c1)
+          |             LIMIT 1)
+        """.stripMargin
+      assert(getNumSortsInQuery(query5) == 1)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org