Posted to commits@spark.apache.org by li...@apache.org on 2017/06/14 18:43:00 UTC

spark git commit: Revert "[SPARK-20941][SQL] Fix SubqueryExec Reuse"

Repository: spark
Updated Branches:
  refs/heads/branch-2.2 3dda682c4 -> e02e0637f


Revert "[SPARK-20941][SQL] Fix SubqueryExec Reuse"

This reverts commit 6a4e023b250a86887475958093f1d3bdcbb49a03.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e02e0637
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e02e0637
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e02e0637

Branch: refs/heads/branch-2.2
Commit: e02e0637f43dddbdc0961cb2af85869f7ef9e12d
Parents: 3dda682
Author: Xiao Li <ga...@gmail.com>
Authored: Wed Jun 14 11:35:14 2017 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Wed Jun 14 11:35:14 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  8 -----
 .../sql/execution/basicPhysicalOperators.scala  |  3 --
 .../apache/spark/sql/execution/subquery.scala   |  2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 35 --------------------
 4 files changed, 1 insertion(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e02e0637/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1ea9eb5..94244dd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -552,12 +552,6 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
-  val SUBQUERY_REUSE_ENABLED = buildConf("spark.sql.subquery.reuse")
-    .internal()
-    .doc("When true, the planner will try to find out duplicated subqueries and re-use them.")
-    .booleanConf
-    .createWithDefault(true)
-
   val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT =
     buildConf("spark.sql.streaming.stateStore.minDeltasForSnapshot")
       .internal()
@@ -927,8 +921,6 @@ class SQLConf extends Serializable with Logging {
 
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
 
-  def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED)
-
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
   def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)
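
The hunk above drops the dedicated spark.sql.subquery.reuse flag introduced by
SPARK-20941. As a minimal sketch only (after this revert the key is no longer
registered in SQLConf, so setting it has no effect on the planner; it is shown
purely to document the reverted surface):

  import org.apache.spark.sql.SparkSession

  // Hypothetical local session; the config key below exists only in the
  // reverted commit, not in branch-2.2 after this change.
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("subquery-reuse-flag-demo")
    .getOrCreate()
  spark.conf.set("spark.sql.subquery.reuse", "false")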

http://git-wip-us.apache.org/repos/asf/spark/blob/e02e0637/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 04c1303..bd7a5c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -599,9 +599,6 @@ case class OutputFakerExec(output: Seq[Attribute], child: SparkPlan) extends Spa
  */
 case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
 
-  // Ignore this wrapper for canonicalizing.
-  override lazy val canonicalized: SparkPlan = child.canonicalized
-
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
     "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"))

http://git-wip-us.apache.org/repos/asf/spark/blob/e02e0637/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 2abeadf..d11045f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -156,7 +156,7 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
 case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.subqueryReuseEnabled) {
+    if (!conf.exchangeReuseEnabled) {
       return plan
     }
     // Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls.
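
The comment above describes the rule's strategy: bucket subqueries by schema (a
cheap hash key) so the expensive sameResult comparison only runs within each
bucket. A self-contained sketch of that pattern with toy types, not Spark's
internals:

  import scala.collection.mutable

  // Toy stand-ins: schema is the cheap hash key, sameResult the costly check.
  case class Plan(schema: String, body: String) {
    def sameResult(other: Plan): Boolean = body == other.body
  }

  def dedup(plans: Seq[Plan]): Seq[Plan] = {
    val buckets = mutable.HashMap.empty[String, mutable.ArrayBuffer[Plan]]
    plans.map { p =>
      val sameSchema = buckets.getOrElseUpdate(p.schema, mutable.ArrayBuffer.empty[Plan])
      sameSchema.find(_.sameResult(p)) match {
        case Some(existing) => existing       // reuse the first equivalent plan
        case None => sameSchema += p; p       // first occurrence: keep it
      }
    }
  }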

http://git-wip-us.apache.org/repos/asf/spark/blob/e02e0637/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 2e831dc..cd14d24 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -23,12 +23,9 @@ import java.net.{MalformedURLException, URL}
 import java.sql.Timestamp
 import java.util.concurrent.atomic.AtomicBoolean
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
-import org.apache.spark.sql.execution.{ScalarSubquery, SubqueryExec}
 import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
@@ -711,38 +708,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         row => Seq.fill(16)(Row.merge(row, row))).collect().toSeq)
   }
 
-  test("Verify spark.sql.subquery.reuse") {
-    Seq(true, false).foreach { reuse =>
-      withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
-        val df = sql(
-          """
-            |SELECT key, (SELECT avg(key) FROM testData)
-            |FROM testData
-            |WHERE key > (SELECT avg(key) FROM testData)
-            |ORDER BY key
-            |LIMIT 3
-          """.stripMargin)
-
-        checkAnswer(df, Row(51, 50.5) :: Row(52, 50.5) :: Row(53, 50.5) :: Nil)
-
-        val subqueries = ArrayBuffer.empty[SubqueryExec]
-        df.queryExecution.executedPlan.transformAllExpressions {
-          case s @ ScalarSubquery(plan: SubqueryExec, _) =>
-            subqueries += plan
-            s
-        }
-
-        assert(subqueries.size == 2, "Two ScalarSubquery are expected in the plan")
-
-        if (reuse) {
-          assert(subqueries.distinct.size == 1, "Only one ScalarSubquery exists in the plan")
-        } else {
-          assert(subqueries.distinct.size == 2, "Reuse is not expected")
-        }
-      }
-    }
-  }
-
   test("cartesian product join") {
     withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
       checkAnswer(
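
With the test removed, subquery reuse on branch-2.2 is once again gated by the
exchange-reuse flag rather than a dedicated key. One way to inspect that
setting at runtime (assuming an existing SparkSession named spark; the default
is true):

  // spark.sql.exchange.reuse now gates ReuseSubquery as well as ReuseExchange.
  val reuseEnabled = spark.conf.get("spark.sql.exchange.reuse", "true").toBoolean
  println(s"subquery/exchange reuse enabled: $reuseEnabled")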


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org