Posted to commits@spark.apache.org by gu...@apache.org on 2020/12/21 12:06:13 UTC

[spark] branch branch-3.1 updated: [SPARK-28863][SQL][FOLLOWUP] Make sure optimized plan will not be re-analyzed

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 1171904  [SPARK-28863][SQL][FOLLOWUP] Make sure optimized plan will not be re-analyzed
1171904 is described below

commit 117190474957b7a5e44cf8b9e04c0d928bf06e1d
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Dec 21 20:59:33 2020 +0900

    [SPARK-28863][SQL][FOLLOWUP] Make sure optimized plan will not be re-analyzed
    
    ### What changes were proposed in this pull request?
    
    It's a known issue that re-analyzing an optimized plan can cause various problems. We have made several attempts to prevent it, but the current solution, `AlreadyOptimized`, is still not 100% safe, as people can inject catalyst rules that call the analyzer directly.
    
    This PR proposes a simpler and safer idea: set the `analyzed` flag to true after optimization, so that the analyzer skips any plan whose `analyzed` flag is already true.
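
    A minimal, self-contained Scala sketch of the mechanism (the `Mini*` names
    are illustrative stand-ins, not Spark's actual `Analyzer`/`AnalysisHelper`
    classes):

        // Hypothetical mini-model of the flag-based guard described above.
        final class MiniPlan(val children: Seq[MiniPlan] = Nil) {
          private var _analyzed = false
          def analyzed: Boolean = _analyzed

          // Mark this plan and all of its children as analyzed.
          def setAnalyzed(): Unit = if (!_analyzed) {
            _analyzed = true
            children.foreach(_.setAnalyzed())
          }
        }

        object MiniAnalyzer {
          def executeAndCheck(plan: MiniPlan): MiniPlan = {
            if (plan.analyzed) return plan // early exit: skip already-analyzed plans
            // ... resolution and checking rules would run here ...
            plan.setAnalyzed()
            plan
          }
        }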
    
    ### Why are the changes needed?
    
    Make the code simpler and safer.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #30777 from cloud-fan/ds.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit b4bea1aa8972cdfd8901757a0ed990a20fca620f)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  1 +
 .../catalyst/plans/logical/AnalysisHelper.scala    |  7 +-
 .../spark/sql/execution/AlreadyOptimized.scala     | 37 ----------
 .../spark/sql/execution/QueryExecution.scala       |  7 +-
 .../datasources/v2/V1FallbackWriters.scala         |  7 +-
 .../sql/execution/AlreadyOptimizedSuite.scala      | 85 ----------------------
 6 files changed, 16 insertions(+), 128 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c5c0c68..a5a28a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -168,6 +168,7 @@ class Analyzer(override val catalogManager: CatalogManager)
   }
 
   def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
+    if (plan.analyzed) return plan
     AnalysisHelper.markInAnalyzer {
       val analyzed = executeAndTrack(plan, tracker)
       try {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
index 2c6a716a..ffd1f78 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
@@ -46,7 +46,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
    * This should only be called by
    * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]].
    */
-  private[catalyst] def setAnalyzed(): Unit = {
+  private[sql] def setAnalyzed(): Unit = {
     if (!_analyzed) {
       _analyzed = true
       children.foreach(_.setAnalyzed())
@@ -180,6 +180,11 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
     super.transformAllExpressions(rule)
   }
 
+  override def clone(): LogicalPlan = {
+    val cloned = super.clone()
+    if (analyzed) cloned.setAnalyzed()
+    cloned
+  }
 }
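
Note on the two changes in this file: `setAnalyzed()` is widened from
`private[catalyst]` to `private[sql]` so that `QueryExecution` (which lives in
`org.apache.spark.sql.execution`) can call it, and `clone()` is overridden
because a plain structural copy yields fresh nodes whose `_analyzed` bit
defaults to false. A hedged sketch of the pattern, reusing the hypothetical
`MiniPlan` from the commit message (not Spark's `LogicalPlan`):

    // Copying a plan must carry the `analyzed` flag forward, as in this patch.
    def cloneMini(plan: MiniPlan): MiniPlan = {
      val cloned = new MiniPlan(plan.children.map(cloneMini)) // flag resets to false
      if (plan.analyzed) cloned.setAnalyzed()                 // so re-apply it here
      cloned
    }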
 
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala
deleted file mode 100644
index e40b114..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/** Query execution that skips re-analysis and optimize. */
-class AlreadyOptimizedExecution(
-    session: SparkSession,
-    plan: LogicalPlan) extends QueryExecution(session, plan) {
-  override lazy val analyzed: LogicalPlan = plan
-  override lazy val optimizedPlan: LogicalPlan = plan
-}
-
-object AlreadyOptimized {
-  def dataFrame(sparkSession: SparkSession, optimized: LogicalPlan): DataFrame = {
-    val qe = new AlreadyOptimizedExecution(sparkSession, optimized)
-    new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 0531dd2..1d5a884 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -84,7 +84,12 @@ class QueryExecution(
   lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
     // clone the plan to avoid sharing the plan instance between different stages like analyzing,
     // optimizing and planning.
-    sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
+    val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
+    // We do not want optimized plans to be re-analyzed as literals that have been constant folded
+    // and such can cause issues during analysis. While `clone` should maintain the `analyzed` state
+    // of the LogicalPlan, we set the plan as analyzed here as well out of paranoia.
+    plan.setAnalyzed()
+    plan
   }
 
   private def assertOptimized(): Unit = optimizedPlan
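
An aside on the hunk above: once the optimizer's output is marked with
`setAnalyzed()`, a round trip through the analyzer becomes a no-op. In terms
of the illustrative `MiniPlan`/`MiniAnalyzer` sketch from the commit message
(hypothetical names, not Spark's API):

    // Hypothetical REPL session demonstrating the effect.
    val optimized = new MiniPlan()  // pretend this came out of the optimizer
    optimized.setAnalyzed()         // what optimizedPlan now does, "out of paranoia"
    // The analyzer's early exit hands the same instance back, untouched:
    assert(MiniAnalyzer.executeAndCheck(optimized) eq optimized)
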
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
index 9d2cea9..080e977 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
@@ -20,12 +20,13 @@ package org.apache.spark.sql.execution.datasources.v2
 import java.util.UUID
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.SupportsWrite
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
-import org.apache.spark.sql.execution.{AlreadyOptimized, SparkPlan}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -118,9 +119,7 @@ trait SupportsV1Write extends SparkPlan {
   protected def writeWithV1(
       relation: InsertableRelation,
       refreshCache: () => Unit = () => ()): Seq[InternalRow] = {
-    val session = sqlContext.sparkSession
-    // The `plan` is already optimized, we should not analyze and optimize it again.
-    relation.insert(AlreadyOptimized.dataFrame(session, plan), overwrite = false)
+    relation.insert(Dataset.ofRows(sqlContext.sparkSession, plan), overwrite = false)
     refreshCache()
 
     Nil
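
With the flag in place, the V1 write path no longer needs the dedicated
`AlreadyOptimized` wrapper: `Dataset.ofRows` builds a regular `QueryExecution`,
and when its analysis phase runs over the already-optimized, flag-carrying
`plan`, the new early exit in `executeAndCheck` should return it as-is. A
rough sketch of the expected behavior (an assumption based on the guard above,
not a documented guarantee):

    // Since `plan.analyzed` is now true after optimization, building a
    // DataFrame from it should skip re-analysis entirely:
    val df = Dataset.ofRows(sqlContext.sparkSession, plan)
    assert(df.queryExecution.analyzed eq plan) // same instance, analysis skipped
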
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala
deleted file mode 100644
index c266aa9..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.execution.adaptive.EnableAdaptiveExecutionSuite
-import org.apache.spark.sql.test.SharedSparkSession
-
-class AlreadyOptimizedSuite extends QueryTest with SharedSparkSession {
-
-  import testImplicits._
-
-  test("simple execution") {
-    val df = spark.range(10)
-    val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
-    checkAnswer(planned, df.toDF().collect())
-  }
-
-  test("planning on top works - projection") {
-    val df = spark.range(10)
-    val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
-    checkAnswer(
-      planned.withColumn("data", 'id + 1),
-      df.withColumn("data", 'id + 1).collect())
-  }
-
-  test("planning on top works - filter") {
-    val df = spark.range(10)
-    val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
-    checkAnswer(planned.where('id < 5), df.where('id < 5).toDF().collect())
-  }
-
-  test("planning on top works - aggregate") {
-    val df = spark.range(10)
-    val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
-    checkAnswer(planned.groupBy('id).count(), df.groupBy('id).count().collect())
-  }
-
-  test("planning on top works - joins") {
-    val df = spark.range(10)
-    val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
-    val plannedLeft = planned.alias("l")
-    val dfLeft = df.alias("l")
-    val plannedRight = planned.alias("r")
-    val dfRight = df.alias("r")
-
-    checkAnswer(
-      plannedLeft.where('id < 3).join(plannedRight, Seq("id")),
-      dfLeft.where('id < 3).join(dfRight, Seq("id")).collect())
-
-    checkAnswer(
-      plannedLeft.where('id < 3).join(plannedRight, plannedLeft("id") === plannedRight("id")),
-      dfLeft.where('id < 3).join(dfRight, dfLeft("id") === dfRight("id")).collect())
-
-    checkAnswer(
-      plannedLeft.join(plannedRight, Seq("id")).where('id < 3),
-      dfLeft.join(dfRight, Seq("id")).where('id < 3).collect())
-
-    checkAnswer(
-      plannedLeft.join(plannedRight, plannedLeft("id") === plannedRight("id")).where($"l.id" < 3),
-      dfLeft.join(dfRight, dfLeft("id") === dfRight("id")).where($"l.id" < 3).collect())
-  }
-}
-
-class AlreadyOptimizedAQESuite extends AlreadyOptimizedSuite with EnableAdaptiveExecutionSuite


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org