You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/04/14 08:24:55 UTC
[spark] branch master updated: [SPARK-34701][SQL] Introduce
AnalysisOnlyCommand that allows its children to be removed once the command
is marked as analyzed
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b5241c9 [SPARK-34701][SQL] Introduce AnalysisOnlyCommand that allows its children to be removed once the command is marked as analyzed
b5241c9 is described below
commit b5241c97b17a1139a4ff719bfce7f68aef094d95
Author: Terry Kim <yu...@gmail.com>
AuthorDate: Wed Apr 14 08:24:25 2021 +0000
[SPARK-34701][SQL] Introduce AnalysisOnlyCommand that allows its children to be removed once the command is marked as analyzed
### What changes were proposed in this pull request?
This PR proposes to introduce the `AnalysisOnlyCommand` trait such that a command that extends this trait can have its children only analyzed, but not optimized. There is a corresponding analysis rule `HandleAnalysisOnlyCommand` that marks the command as analyzed after all other analysis rules are run.
This is useful when a logical plan has children that need to be analyzed but not optimized - e.g., `CREATE VIEW` or `CACHE TABLE AS`. This also addresses the issue found in #31933.
This PR also updates `CreateViewCommand`, `CacheTableAsSelect`, and `AlterViewAsCommand` to use the new trait / rule such that their children are only analyzed.
### Why are the changes needed?
This change addresses the issue where the plan is unnecessarily re-analyzed in `CreateViewCommand`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests should cover the changes.
Closes #32032 from imback82/skip_transform.
Authored-by: Terry Kim <yu...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 16 +++++++-
.../spark/sql/catalyst/plans/logical/Command.scala | 11 +++++
.../sql/catalyst/plans/logical/v2Commands.scala | 13 +++++-
.../main/scala/org/apache/spark/sql/Dataset.scala | 5 ++-
.../catalyst/analysis/ResolveSessionCatalog.scala | 22 +++++-----
.../apache/spark/sql/execution/command/views.scala | 47 ++++++++++++++++------
.../execution/datasources/v2/CacheTableExec.scala | 26 ++++++------
.../sql-tests/results/explain-aqe.sql.out | 2 +-
.../resources/sql-tests/results/explain.sql.out | 2 +-
9 files changed, 102 insertions(+), 42 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7a11396..e7c72db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -309,7 +309,9 @@ class Analyzer(override val catalogManager: CatalogManager)
Batch("Subquery", Once,
UpdateOuterReferences),
Batch("Cleanup", fixedPoint,
- CleanupAliases)
+ CleanupAliases),
+ Batch("HandleAnalysisOnlyCommand", Once,
+ HandleAnalysisOnlyCommand)
)
/**
@@ -3543,6 +3545,18 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}
}
+
+ /**
+ * A rule that marks a command as analyzed so that its children are removed to avoid
+ * being optimized. This rule should run after all other analysis rules are run.
+ */
+ object HandleAnalysisOnlyCommand extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case c: AnalysisOnlyCommand if c.resolved =>
+ checkAnalysis(c)
+ c.markAsAnalyzed()
+ }
+ }
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
index 94ead5e..81ad92b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
@@ -37,3 +37,14 @@ trait Command extends LogicalPlan {
trait LeafCommand extends Command with LeafLike[LogicalPlan]
trait UnaryCommand extends Command with UnaryLike[LogicalPlan]
trait BinaryCommand extends Command with BinaryLike[LogicalPlan]
+
+/**
+ * A logical node that can be used for a command that requires its children to be only analyzed,
+ * but not optimized.
+ */
+trait AnalysisOnlyCommand extends Command {
+ val isAnalyzed: Boolean
+ def childrenToAnalyze: Seq[LogicalPlan]
+ override final def children: Seq[LogicalPlan] = if (isAnalyzed) Nil else childrenToAnalyze
+ def markAsAnalyzed(): LogicalPlan
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 8b7f2db..bbc2b62 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -1022,7 +1022,18 @@ case class CacheTableAsSelect(
plan: LogicalPlan,
originalText: String,
isLazy: Boolean,
- options: Map[String, String]) extends LeafCommand
+ options: Map[String, String],
+ isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[LogicalPlan]): CacheTableAsSelect = {
+ assert(!isAnalyzed)
+ copy(plan = newChildren.head)
+ }
+
+ override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil
+
+ override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+}
/**
* The logical plan of the UNCACHE TABLE command.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index fd02d0b..540115b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3370,10 +3370,11 @@ class Dataset[T] private[sql](
comment = None,
properties = Map.empty,
originalText = None,
- child = logicalPlan,
+ plan = logicalPlan,
allowExisting = false,
replace = replace,
- viewType = viewType)
+ viewType = viewType,
+ isAnalyzed = true)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index f9b9e5a..7f3d0c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -474,7 +474,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) =>
AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, location)
- case AlterViewAs(ResolvedView(ident, _), originalText, query) if query.resolved =>
+ case AlterViewAs(ResolvedView(ident, _), originalText, query) =>
AlterViewAsCommand(
ident.asTableIdentifier,
originalText,
@@ -482,7 +482,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case CreateViewStatement(
tbl, userSpecifiedColumns, comment, properties,
- originalText, child, allowExisting, replace, viewType) if child.resolved =>
+ originalText, child, allowExisting, replace, viewType) =>
val v1TableName = if (viewType != PersistedView) {
// temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
@@ -491,15 +491,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
parseV1Table(tbl, "CREATE VIEW")
}
CreateViewCommand(
- v1TableName.asTableIdentifier,
- userSpecifiedColumns,
- comment,
- properties,
- originalText,
- child,
- allowExisting,
- replace,
- viewType)
+ name = v1TableName.asTableIdentifier,
+ userSpecifiedColumns = userSpecifiedColumns,
+ comment = comment,
+ properties = properties,
+ originalText = originalText,
+ plan = child,
+ allowExisting = allowExisting,
+ replace = replace,
+ viewType = viewType)
case ShowViews(resolved: ResolvedNamespace, pattern, output) =>
resolved match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 93ea226..10ec4be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, Pe
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -48,13 +48,14 @@ import org.apache.spark.sql.util.SchemaUtils
* @param properties the properties of this view.
* @param originalText the original SQL text of this view, can be None if this view is created via
* Dataset API.
- * @param child the logical plan that represents the view; this is used to generate the logical
- * plan for temporary view and the view schema.
+ * @param plan the logical plan that represents the view; this is used to generate the logical
+ * plan for temporary view and the view schema.
* @param allowExisting if true, and if the view already exists, noop; if false, and if the view
* already exists, throws analysis exception.
* @param replace if true, and if the view already exists, updates it; if false, and if the view
* already exists, throws analysis exception.
* @param viewType the expected view type to be created with this command.
+ * @param isAnalyzed whether this command is analyzed or not.
*/
case class CreateViewCommand(
name: TableIdentifier,
@@ -62,15 +63,26 @@ case class CreateViewCommand(
comment: Option[String],
properties: Map[String, String],
originalText: Option[String],
- child: LogicalPlan,
+ plan: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
- viewType: ViewType)
- extends LeafRunnableCommand {
+ viewType: ViewType,
+ isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand {
import ViewHelper._
- override def innerChildren: Seq[QueryPlan[_]] = Seq(child)
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[LogicalPlan]): CreateViewCommand = {
+ assert(!isAnalyzed)
+ copy(plan = newChildren.head)
+ }
+
+ override def innerChildren: Seq[QueryPlan[_]] = Seq(plan)
+
+ // `plan` needs to be analyzed, but shouldn't be optimized so that caching works correctly.
+ override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil
+
+ def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
if (viewType == PersistedView) {
require(originalText.isDefined, "'originalText' must be provided to create permanent view")
@@ -96,10 +108,10 @@ case class CreateViewCommand(
}
override def run(sparkSession: SparkSession): Seq[Row] = {
- // If the plan cannot be analyzed, throw an exception and don't proceed.
- val qe = sparkSession.sessionState.executePlan(child)
- qe.assertAnalyzed()
- val analyzedPlan = qe.analyzed
+ if (!isAnalyzed) {
+ throw new AnalysisException("The logical plan that represents the view is not analyzed.")
+ }
+ val analyzedPlan = plan
if (userSpecifiedColumns.nonEmpty &&
userSpecifiedColumns.length != analyzedPlan.output.length) {
@@ -233,12 +245,23 @@ case class CreateViewCommand(
case class AlterViewAsCommand(
name: TableIdentifier,
originalText: String,
- query: LogicalPlan) extends LeafRunnableCommand {
+ query: LogicalPlan,
+ isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand {
import ViewHelper._
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[LogicalPlan]): AlterViewAsCommand = {
+ assert(!isAnalyzed)
+ copy(query = newChildren.head)
+ }
+
override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+ override def childrenToAnalyze: Seq[LogicalPlan] = query :: Nil
+
+ def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+
override def run(session: SparkSession): Seq[Row] = {
if (session.sessionState.catalog.isTempView(name)) {
alterTemporaryView(session, query)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index 5b4b9e3..ac97e57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -94,19 +94,19 @@ case class CacheTableAsSelectExec(
override lazy val relationName: String = tempViewName
override lazy val planToCache: LogicalPlan = {
- Dataset.ofRows(sparkSession,
- CreateViewCommand(
- name = TableIdentifier(tempViewName),
- userSpecifiedColumns = Nil,
- comment = None,
- properties = Map.empty,
- originalText = Some(originalText),
- child = query,
- allowExisting = false,
- replace = false,
- viewType = LocalTempView
- )
- )
+ CreateViewCommand(
+ name = TableIdentifier(tempViewName),
+ userSpecifiedColumns = Nil,
+ comment = None,
+ properties = Map.empty,
+ originalText = Some(originalText),
+ plan = query,
+ allowExisting = false,
+ replace = false,
+ viewType = LocalTempView,
+ isAnalyzed = true
+ ).run(sparkSession)
+
dataFrameForCachedPlan.logicalPlan
}
diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index ddfab99..357445a 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -913,7 +913,7 @@ Execute CreateViewCommand (1)
Output: []
(2) CreateViewCommand
-Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView
+Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true
(3) LogicalRelation
Arguments: parquet, [key#x, val#x], CatalogTable(
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index 1f7f8f6..3d00872 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -858,7 +858,7 @@ Execute CreateViewCommand (1)
Output: []
(2) CreateViewCommand
-Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView
+Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true
(3) LogicalRelation
Arguments: parquet, [key#x, val#x], CatalogTable(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org