You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/06/01 16:52:23 UTC
spark git commit: [SPARK-20941][SQL] Fix SubqueryExec Reuse
Repository: spark
Updated Branches:
refs/heads/master 0975019cd -> f7cf2096f
[SPARK-20941][SQL] Fix SubqueryExec Reuse
### What changes were proposed in this pull request?
Before this PR, Subquery reuse does not work. Below are three issues:
- Subquery reuse does not work.
- It is sharing the same `SQLConf` (`spark.sql.exchange.reuse`) with the one for Exchange Reuse.
- No test case covers the Subquery Reuse rule.
This PR is to fix the above three issues.
- Ignored the physical operator `SubqueryExec` when comparing two plans.
- Added a dedicated conf `spark.sql.subquery.reuse` for controlling Subquery Reuse
- Added a test case for verifying the behavior
### How was this patch tested?
N/A
Author: Xiao Li <ga...@gmail.com>
Closes #18169 from gatorsmile/subqueryReuse.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7cf2096
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7cf2096
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7cf2096
Branch: refs/heads/master
Commit: f7cf2096fdecb8edab61c8973c07c6fc877ee32d
Parents: 0975019
Author: Xiao Li <ga...@gmail.com>
Authored: Thu Jun 1 09:52:18 2017 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Thu Jun 1 09:52:18 2017 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/internal/SQLConf.scala | 8 +++++
.../sql/execution/basicPhysicalOperators.scala | 3 ++
.../apache/spark/sql/execution/subquery.scala | 2 +-
.../org/apache/spark/sql/SQLQuerySuite.scala | 35 ++++++++++++++++++++
4 files changed, 47 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f7cf2096/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c6f5cf6..1739b0c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -552,6 +552,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val SUBQUERY_REUSE_ENABLED = buildConf("spark.sql.subquery.reuse")
+ .internal()
+ .doc("When true, the planner will try to find out duplicated subqueries and re-use them.")
+ .booleanConf
+ .createWithDefault(true)
+
val STATE_STORE_PROVIDER_CLASS =
buildConf("spark.sql.streaming.stateStore.providerClass")
.internal()
@@ -932,6 +938,8 @@ class SQLConf extends Serializable with Logging {
def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
+ def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED)
+
def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)
http://git-wip-us.apache.org/repos/asf/spark/blob/f7cf2096/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 85096dc..f69a688 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -595,6 +595,9 @@ case class OutputFakerExec(output: Seq[Attribute], child: SparkPlan) extends Spa
*/
case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
+ // Ignore this wrapper for canonicalizing.
+ override lazy val canonicalized: SparkPlan = child.canonicalized
+
override lazy val metrics = Map(
"dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
"collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"))
http://git-wip-us.apache.org/repos/asf/spark/blob/f7cf2096/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index d11045f..2abeadf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -156,7 +156,7 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
def apply(plan: SparkPlan): SparkPlan = {
- if (!conf.exchangeReuseEnabled) {
+ if (!conf.subqueryReuseEnabled) {
return plan
}
// Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls.
http://git-wip-us.apache.org/repos/asf/spark/blob/f7cf2096/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b525c9e..41e9e2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -23,9 +23,12 @@ import java.net.{MalformedURLException, URL}
import java.sql.Timestamp
import java.util.concurrent.atomic.AtomicBoolean
+import scala.collection.mutable.ArrayBuffer
+
import org.apache.spark.{AccumulatorSuite, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.execution.{ScalarSubquery, SubqueryExec}
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
@@ -700,6 +703,38 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
row => Seq.fill(16)(Row.merge(row, row))).collect().toSeq)
}
+ test("Verify spark.sql.subquery.reuse") {
+ Seq(true, false).foreach { reuse =>
+ withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
+ val df = sql(
+ """
+ |SELECT key, (SELECT avg(key) FROM testData)
+ |FROM testData
+ |WHERE key > (SELECT avg(key) FROM testData)
+ |ORDER BY key
+ |LIMIT 3
+ """.stripMargin)
+
+ checkAnswer(df, Row(51, 50.5) :: Row(52, 50.5) :: Row(53, 50.5) :: Nil)
+
+ val subqueries = ArrayBuffer.empty[SubqueryExec]
+ df.queryExecution.executedPlan.transformAllExpressions {
+ case s @ ScalarSubquery(plan: SubqueryExec, _) =>
+ subqueries += plan
+ s
+ }
+
+ assert(subqueries.size == 2, "Two ScalarSubquery are expected in the plan")
+
+ if (reuse) {
+ assert(subqueries.distinct.size == 1, "Only one ScalarSubquery exists in the plan")
+ } else {
+ assert(subqueries.distinct.size == 2, "Reuse is not expected")
+ }
+ }
+ }
+ }
+
test("cartesian product join") {
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
checkAnswer(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org