Posted to commits@spark.apache.org by gu...@apache.org on 2020/12/21 12:06:13 UTC
[spark] branch branch-3.1 updated: [SPARK-28863][SQL][FOLLOWUP] Make sure optimized plan will not be re-analyzed
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 1171904 [SPARK-28863][SQL][FOLLOWUP] Make sure optimized plan will not be re-analyzed
1171904 is described below
commit 117190474957b7a5e44cf8b9e04c0d928bf06e1d
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Dec 21 20:59:33 2020 +0900
[SPARK-28863][SQL][FOLLOWUP] Make sure optimized plan will not be re-analyzed
### What changes were proposed in this pull request?
It's a known issue that re-analyzing an optimized plan can lead to various problems. We have made several attempts to prevent this, but the current solution, `AlreadyOptimized`, is still not 100% safe, as people can inject catalyst rules that call the analyzer directly.
This PR proposes a simpler and safer idea: set the `analyzed` flag to true after optimization, and have the analyzer skip any plan whose `analyzed` flag is already true.
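As a rough illustration, here is a minimal, self-contained sketch of the flag-based skip in plain Scala. The names (`Plan`, the rule-running bodies) are simplified stand-ins, not the actual Spark classes; in the real code the flag is set via `CheckAnalysis`/`AnalysisHelper` rather than inline as shown:

```scala
// Hypothetical simplified types sketching the "analyzed" flag mechanism.
trait Plan {
  private var _analyzed = false
  def analyzed: Boolean = _analyzed
  def setAnalyzed(): Unit = { _analyzed = true }
}

class Analyzer {
  def executeAndCheck(plan: Plan): Plan = {
    if (plan.analyzed) return plan // already analyzed (or optimized): skip
    // ... run analysis rules here ...
    plan.setAnalyzed()
    plan
  }
}

class Optimizer {
  def execute(plan: Plan): Plan = {
    // ... run optimizer rules here ...
    plan.setAnalyzed() // mark the result so the analyzer never touches it again
    plan
  }
}
```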
### Why are the changes needed?
Makes the code simpler and safer.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #30777 from cloud-fan/ds.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit b4bea1aa8972cdfd8901757a0ed990a20fca620f)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 1 +
.../catalyst/plans/logical/AnalysisHelper.scala | 7 +-
.../spark/sql/execution/AlreadyOptimized.scala | 37 ----------
.../spark/sql/execution/QueryExecution.scala | 7 +-
.../datasources/v2/V1FallbackWriters.scala | 7 +-
.../sql/execution/AlreadyOptimizedSuite.scala | 85 ----------------------
6 files changed, 16 insertions(+), 128 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c5c0c68..a5a28a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -168,6 +168,7 @@ class Analyzer(override val catalogManager: CatalogManager)
}
def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
+ if (plan.analyzed) return plan
AnalysisHelper.markInAnalyzer {
val analyzed = executeAndTrack(plan, tracker)
try {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
index 2c6a716a..ffd1f78 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
@@ -46,7 +46,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
* This should only be called by
* [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]].
*/
- private[catalyst] def setAnalyzed(): Unit = {
+ private[sql] def setAnalyzed(): Unit = {
if (!_analyzed) {
_analyzed = true
children.foreach(_.setAnalyzed())
@@ -180,6 +180,11 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
super.transformAllExpressions(rule)
}
+ override def clone(): LogicalPlan = {
+ val cloned = super.clone()
+ if (analyzed) cloned.setAnalyzed()
+ cloned
+ }
}
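For context on the `clone()` override above: Spark's `QueryPlan.clone()` rebuilds the tree, so a fresh copy would otherwise start with `analyzed = false` and could be re-analyzed downstream. A hedged sketch of the invariant, with a hypothetical `Node` class standing in for `LogicalPlan`:

```scala
// Sketch: cloning rebuilds the tree, which resets the flag on the copy;
// the override restores it so an analyzed plan stays analyzed after cloning.
// `Node` is a hypothetical stand-in for LogicalPlan, not the real class.
class Node(val children: Seq[Node] = Nil) {
  private var _analyzed = false
  def analyzed: Boolean = _analyzed
  def setAnalyzed(): Unit = if (!_analyzed) {
    _analyzed = true
    children.foreach(_.setAnalyzed())
  }
  def cloneNode(): Node = {
    val cloned = new Node(children.map(_.cloneNode())) // flag resets to false
    if (analyzed) cloned.setAnalyzed()                 // restore it on the copy
    cloned
  }
}
```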
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala
deleted file mode 100644
index e40b114..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/** Query execution that skips re-analysis and optimize. */
-class AlreadyOptimizedExecution(
- session: SparkSession,
- plan: LogicalPlan) extends QueryExecution(session, plan) {
- override lazy val analyzed: LogicalPlan = plan
- override lazy val optimizedPlan: LogicalPlan = plan
-}
-
-object AlreadyOptimized {
- def dataFrame(sparkSession: SparkSession, optimized: LogicalPlan): DataFrame = {
- val qe = new AlreadyOptimizedExecution(sparkSession, optimized)
- new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 0531dd2..1d5a884 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -84,7 +84,12 @@ class QueryExecution(
lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
// clone the plan to avoid sharing the plan instance between different stages like analyzing,
// optimizing and planning.
- sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
+ val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
+ // We do not want optimized plans to be re-analyzed as literals that have been constant folded
+ // and such can cause issues during analysis. While `clone` should maintain the `analyzed` state
+ // of the LogicalPlan, we set the plan as analyzed here as well out of paranoia.
+ plan.setAnalyzed()
+ plan
}
private def assertOptimized(): Unit = optimizedPlan
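The comment in the hunk above points at the underlying hazard: analysis rules are not necessarily idempotent, so feeding an optimized (already-analyzed) tree back through them can corrupt it. A contrived sketch of that failure mode, using a hypothetical mini-IR and a coercion-like rule, none of which are real Spark expressions:

```scala
// Contrived sketch: a coercion-like "analysis" rule that wraps literals.
// Running analysis twice double-wraps, which is the kind of breakage the
// `analyzed` flag prevents.
object ReanalysisSketch extends App {
  sealed trait Expr
  case class Lit(v: Int) extends Expr
  case class Promote(child: Expr) extends Expr
  case class Add(l: Expr, r: Expr) extends Expr

  // Wraps every literal, even under an existing Promote, so it is not idempotent.
  def analyze(e: Expr): Expr = e match {
    case l: Lit     => Promote(l)
    case Add(l, r)  => Add(analyze(l), analyze(r))
    case Promote(c) => Promote(analyze(c))
  }

  val once  = analyze(Add(Lit(1), Lit(2)))
  val twice = analyze(once) // Promote(Promote(...)): re-analysis corrupted the tree
  println(once)
  println(twice)
}
```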
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
index 9d2cea9..080e977 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
@@ -20,12 +20,13 @@ package org.apache.spark.sql.execution.datasources.v2
import java.util.UUID
import org.apache.spark.SparkException
+import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
-import org.apache.spark.sql.execution.{AlreadyOptimized, SparkPlan}
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -118,9 +119,7 @@ trait SupportsV1Write extends SparkPlan {
protected def writeWithV1(
relation: InsertableRelation,
refreshCache: () => Unit = () => ()): Seq[InternalRow] = {
- val session = sqlContext.sparkSession
- // The `plan` is already optimized, we should not analyze and optimize it again.
- relation.insert(AlreadyOptimized.dataFrame(session, plan), overwrite = false)
+ relation.insert(Dataset.ofRows(sqlContext.sparkSession, plan), overwrite = false)
refreshCache()
Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala
deleted file mode 100644
index c266aa9..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.execution.adaptive.EnableAdaptiveExecutionSuite
-import org.apache.spark.sql.test.SharedSparkSession
-
-class AlreadyOptimizedSuite extends QueryTest with SharedSparkSession {
-
- import testImplicits._
-
- test("simple execution") {
- val df = spark.range(10)
- val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
- checkAnswer(planned, df.toDF().collect())
- }
-
- test("planning on top works - projection") {
- val df = spark.range(10)
- val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
- checkAnswer(
- planned.withColumn("data", 'id + 1),
- df.withColumn("data", 'id + 1).collect())
- }
-
- test("planning on top works - filter") {
- val df = spark.range(10)
- val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
- checkAnswer(planned.where('id < 5), df.where('id < 5).toDF().collect())
- }
-
- test("planning on top works - aggregate") {
- val df = spark.range(10)
- val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
- checkAnswer(planned.groupBy('id).count(), df.groupBy('id).count().collect())
- }
-
- test("planning on top works - joins") {
- val df = spark.range(10)
- val planned = AlreadyOptimized.dataFrame(spark, df.queryExecution.optimizedPlan)
-
- val plannedLeft = planned.alias("l")
- val dfLeft = df.alias("l")
- val plannedRight = planned.alias("r")
- val dfRight = df.alias("r")
-
- checkAnswer(
- plannedLeft.where('id < 3).join(plannedRight, Seq("id")),
- dfLeft.where('id < 3).join(dfRight, Seq("id")).collect())
-
- checkAnswer(
- plannedLeft.where('id < 3).join(plannedRight, plannedLeft("id") === plannedRight("id")),
- dfLeft.where('id < 3).join(dfRight, dfLeft("id") === dfRight("id")).collect())
-
- checkAnswer(
- plannedLeft.join(plannedRight, Seq("id")).where('id < 3),
- dfLeft.join(dfRight, Seq("id")).where('id < 3).collect())
-
- checkAnswer(
- plannedLeft.join(plannedRight, plannedLeft("id") === plannedRight("id")).where($"l.id" < 3),
- dfLeft.join(dfRight, dfLeft("id") === dfRight("id")).where($"l.id" < 3).collect())
- }
-}
-
-class AlreadyOptimizedAQESuite extends AlreadyOptimizedSuite with EnableAdaptiveExecutionSuite