You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/12/22 01:33:36 UTC

[spark] branch branch-3.0 updated: [SPARK-28863][SQL][FOLLOWUP][3.0] Make sure optimized plan will not be re-analyzed

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0820beb  [SPARK-28863][SQL][FOLLOWUP][3.0] Make sure optimized plan will not be re-analyzed
0820beb is described below

commit 0820beb60018210ee718b96acb37feb8d8445251
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Dec 22 10:30:34 2020 +0900

    [SPARK-28863][SQL][FOLLOWUP][3.0] Make sure optimized plan will not be re-analyzed
    
    backport https://github.com/apache/spark/pull/30777 to 3.0
    
    ----------
    
    ### What changes were proposed in this pull request?
    
    It's a known issue that re-analyzing an optimized plan can lead to various issues. We made several attempts to avoid it from happening, but the current solution `AlreadyOptimized` is still not 100% safe, as people can inject catalyst rules to call analyzer directly.
    
    This PR proposes a simpler and safer idea: we set the `analyzed` flag to true after optimization, and analyzer will skip processing plans whose `analyzed` flag is true.
    
    ### Why are the changes needed?
    
    make the code simpler and safer
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests.
    
    Closes #30872 from cloud-fan/ds.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  1 +
 .../catalyst/plans/logical/AnalysisHelper.scala    |  7 +-
 .../spark/sql/execution/AlreadyOptimized.scala     | 37 ----------
 .../spark/sql/execution/QueryExecution.scala       |  7 +-
 .../datasources/v2/V1FallbackWriters.scala         |  6 +-
 .../sql/execution/AlreadyOptimizedSuite.scala      | 85 ----------------------
 6 files changed, 16 insertions(+), 127 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1307fc5..fbe6041 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -144,6 +144,7 @@ class Analyzer(
   }
 
   def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
+    if (plan.analyzed) return plan
     AnalysisHelper.markInAnalyzer {
       val analyzed = executeAndTrack(plan, tracker)
       try {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
index 30447db..9b6fee6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
@@ -46,7 +46,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
    * Recursively marks all nodes in this plan tree as analyzed.
    * This should only be called by [[CheckAnalysis]].
    */
-  private[catalyst] def setAnalyzed(): Unit = {
+  private[sql] def setAnalyzed(): Unit = {
     if (!_analyzed) {
       _analyzed = true
       children.foreach(_.setAnalyzed())
@@ -180,6 +180,11 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
     super.transformAllExpressions(rule)
   }
 
+  override def clone(): LogicalPlan = {
+    val cloned = super.clone()
+    if (analyzed) cloned.setAnalyzed()
+    cloned
+  }
 }
 
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala
deleted file mode 100644
index e40b114..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/** Query execution that skips re-analysis and optimize. */
-class AlreadyOptimizedExecution(
-    session: SparkSession,
-    plan: LogicalPlan) extends QueryExecution(session, plan) {
-  override lazy val analyzed: LogicalPlan = plan
-  override lazy val optimizedPlan: LogicalPlan = plan
-}
-
-object AlreadyOptimized {
-  def dataFrame(sparkSession: SparkSession, optimized: LogicalPlan): DataFrame = {
-    val qe = new AlreadyOptimizedExecution(sparkSession, optimized)
-    new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 7f5a5e3..ed36c78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -79,7 +79,12 @@ class QueryExecution(
   lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
     // clone the plan to avoid sharing the plan instance between different stages like analyzing,
     // optimizing and planning.
-    sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
+    val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
+    // We do not want optimized plans to be re-analyzed as literals that have been constant folded
+    // and such can cause issues during analysis. While `clone` should maintain the `analyzed` state
+    // of the LogicalPlan, we set the plan as analyzed here as well out of paranoia.
+    plan.setAnalyzed()
+    plan
   }
 
   private def assertOptimized(): Unit = optimizedPlan
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
index 560da39..95082bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
@@ -20,12 +20,13 @@ package org.apache.spark.sql.execution.datasources.v2
 import java.util.UUID
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.SupportsWrite
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
-import org.apache.spark.sql.execution.{AlreadyOptimized, SparkPlan}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -113,8 +114,7 @@ trait SupportsV1Write extends SparkPlan {
   def plan: LogicalPlan
 
   protected def writeWithV1(relation: InsertableRelation): Seq[InternalRow] = {
-    // The `plan` is already optimized, we should not analyze and optimize it again.
-    relation.insert(AlreadyOptimized.dataFrame(sqlContext.sparkSession, plan), overwrite = false)
+    relation.insert(Dataset.ofRows(sqlContext.sparkSession, plan), overwrite = false)
     Nil
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala
deleted file mode 100644
index c266aa9..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.execution.adaptive.EnableAdaptiveExecutionSuite
-import org.apache.spark.sql.test.SharedSparkSession
-
-class AlreadyOptimizedSuite extends QueryTest with SharedSparkSession {
-
-  import testImplicits._
-
-  test("simple execution") {
-    val df = spark.range(10)
-    val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
-    checkAnswer(planned, df.toDF().collect())
-  }
-
-  test("planning on top works - projection") {
-    val df = spark.range(10)
-    val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
-    checkAnswer(
-      planned.withColumn("data", 'id + 1),
-      df.withColumn("data", 'id + 1).collect())
-  }
-
-  test("planning on top works - filter") {
-    val df = spark.range(10)
-    val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
-    checkAnswer(planned.where('id < 5), df.where('id < 5).toDF().collect())
-  }
-
-  test("planning on top works - aggregate") {
-    val df = spark.range(10)
-    val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
-    checkAnswer(planned.groupBy('id).count(), df.groupBy('id).count().collect())
-  }
-
-  test("planning on top works - joins") {
-    val df = spark.range(10)
-    val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
-    val plannedLeft = planned.alias("l")
-    val dfLeft = df.alias("l")
-    val plannedRight = planned.alias("r")
-    val dfRight = df.alias("r")
-
-    checkAnswer(
-      plannedLeft.where('id < 3).join(plannedRight, Seq("id")),
-      dfLeft.where('id < 3).join(dfRight, Seq("id")).collect())
-
-    checkAnswer(
-      plannedLeft.where('id < 3).join(plannedRight, plannedLeft("id") === plannedRight("id")),
-      dfLeft.where('id < 3).join(dfRight, dfLeft("id") === dfRight("id")).collect())
-
-    checkAnswer(
-      plannedLeft.join(plannedRight, Seq("id")).where('id < 3),
-      dfLeft.join(dfRight, Seq("id")).where('id < 3).collect())
-
-    checkAnswer(
-      plannedLeft.join(plannedRight, plannedLeft("id") === plannedRight("id")).where($"l.id" < 3),
-      dfLeft.join(dfRight, dfLeft("id") === dfRight("id")).where($"l.id" < 3).collect())
-  }
-}
-
-class AlreadyOptimizedAQESuite extends AlreadyOptimizedSuite with EnableAdaptiveExecutionSuite


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org