You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2019/03/27 15:45:46 UTC
[spark] branch master updated: [SPARK-27279][SQL] Reuse subquery
should compare child plan of `SubqueryExec`
This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f1fe805 [SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
f1fe805 is described below
commit f1fe805bed5e69c9c8055c83bf854d2d713e6466
Author: Daoyuan Wang <me...@daoyuan.wang>
AuthorDate: Wed Mar 27 08:45:22 2019 -0700
[SPARK-27279][SQL] Reuse subquery should compare child plan of `SubqueryExec`
## What changes were proposed in this pull request?
Currently, `ReuseSubquery` in Spark compares two subqueries at the `SubqueryExec` level, which invalidates the `ReuseSubquery` rule. This pull request fixes this, and adds a configuration key exclusively for subquery reuse.
## How was this patch tested?
Added a unit test.
Closes #24214 from adrian-wang/reuse.
Authored-by: Daoyuan Wang <me...@daoyuan.wang>
Signed-off-by: gatorsmile <ga...@gmail.com>
---
.../sql/execution/basicPhysicalOperators.scala | 2 ++
.../scala/org/apache/spark/sql/SQLQuerySuite.scala | 29 +++++++++++++++++++++-
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index eacd35b..731e7da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -674,6 +674,8 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+ override def doCanonicalize(): SparkPlan = child.canonicalized
+
@transient
private lazy val relationFuture: Future[Array[InternalRow]] = {
// relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index e8d1ecc..5916cbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.{AccumulatorSuite, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
-import org.apache.spark.sql.execution.aggregate
+import org.apache.spark.sql.execution.{aggregate, ScalarSubquery, SubqueryExec}
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
@@ -113,6 +113,33 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
+ test("Reuse Subquery") {
+ Seq(true, false).foreach { reuse =>
+ withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
+ val df = sql(
+ """
+ |SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM testData)
+ |FROM testData
+ |LIMIT 1
+ """.stripMargin)
+
+ import scala.collection.mutable.ArrayBuffer
+ val subqueries = ArrayBuffer[SubqueryExec]()
+ df.queryExecution.executedPlan.transformAllExpressions {
+ case s @ ScalarSubquery(plan: SubqueryExec, _) =>
+ subqueries += plan
+ s
+ }
+
+ if (reuse) {
+ assert(subqueries.distinct.size == 1, "Subquery reusing not working correctly")
+ } else {
+ assert(subqueries.distinct.size == 2, "There should be 2 subqueries when not reusing")
+ }
+ }
+ }
+ }
+
test("SPARK-6743: no columns from cache") {
Seq(
(83, 0, 38),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org