You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2019/03/27 15:45:46 UTC

[spark] branch master updated: [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f1fe805  [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
f1fe805 is described below

commit f1fe805bed5e69c9c8055c83bf854d2d713e6466
Author: Daoyuan Wang <me...@daoyuan.wang>
AuthorDate: Wed Mar 27 08:45:22 2019 -0700

    [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
    
    ## What changes were proposed in this pull request?
    
    For now, `ReuseSubquery` in Spark compares two subqueries at the `SubqueryExec` level, which invalidates the `ReuseSubquery` rule. This pull request fixes that, and adds a configuration key for subquery reuse exclusively.
    
    ## How was this patch tested?
    
    Added a unit test.
    
    Closes #24214 from adrian-wang/reuse.
    
    Authored-by: Daoyuan Wang <me...@daoyuan.wang>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../sql/execution/basicPhysicalOperators.scala     |  2 ++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 29 +++++++++++++++++++++-
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index eacd35b..731e7da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -674,6 +674,8 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
   @transient
   private lazy val relationFuture: Future[Array[InternalRow]] = {
     // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index e8d1ecc..5916cbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
-import org.apache.spark.sql.execution.aggregate
+import org.apache.spark.sql.execution.{aggregate, ScalarSubquery, SubqueryExec}
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
@@ -113,6 +113,33 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("Reuse Subquery") {
+    Seq(true, false).foreach { reuse =>
+      withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
+        val df = sql(
+          """
+            |SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM testData)
+            |FROM testData
+            |LIMIT 1
+          """.stripMargin)
+
+        import scala.collection.mutable.ArrayBuffer
+        val subqueries = ArrayBuffer[SubqueryExec]()
+        df.queryExecution.executedPlan.transformAllExpressions {
+          case s @ ScalarSubquery(plan: SubqueryExec, _) =>
+            subqueries += plan
+            s
+        }
+
+        if (reuse) {
+          assert(subqueries.distinct.size == 1, "Subquery reusing not working correctly")
+        } else {
+          assert(subqueries.distinct.size == 2, "There should be 2 subqueries when not reusing")
+        }
+      }
+    }
+  }
+
   test("SPARK-6743: no columns from cache") {
     Seq(
       (83, 0, 38),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org