You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/04/14 08:24:55 UTC

[spark] branch master updated: [SPARK-34701][SQL] Introduce AnalysisOnlyCommand that allows its children to be removed once the command is marked as analyzed

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b5241c9  [SPARK-34701][SQL] Introduce AnalysisOnlyCommand that allows its children to be removed once the command is marked as analyzed
b5241c9 is described below

commit b5241c97b17a1139a4ff719bfce7f68aef094d95
Author: Terry Kim <yu...@gmail.com>
AuthorDate: Wed Apr 14 08:24:25 2021 +0000

    [SPARK-34701][SQL] Introduce AnalysisOnlyCommand that allows its children to be removed once the command is marked as analyzed
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to introduce the `AnalysisOnlyCommand` trait such that a command that extends this trait can have its children only analyzed, but not optimized. There is a corresponding analysis rule `HandleAnalysisOnlyCommand` that marks the command as analyzed after all other analysis rules are run.
    
    This can be useful if a logical plan has children that need to be analyzed but not optimized - e.g., `CREATE VIEW` or `CACHE TABLE AS`. This also addresses the issue found in #31933.
    
    This PR also updates `CreateViewCommand`, `CacheTableAsSelect`, and `AlterViewAsCommand` to use the new trait / rule such that their children are only analyzed.
    
    ### Why are the changes needed?
    
    To address the issue where the plan is unnecessarily re-analyzed in `CreateViewCommand`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests should cover the changes.
    
    Closes #32032 from imback82/skip_transform.
    
    Authored-by: Terry Kim <yu...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 16 +++++++-
 .../spark/sql/catalyst/plans/logical/Command.scala | 11 +++++
 .../sql/catalyst/plans/logical/v2Commands.scala    | 13 +++++-
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  5 ++-
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 22 +++++-----
 .../apache/spark/sql/execution/command/views.scala | 47 ++++++++++++++++------
 .../execution/datasources/v2/CacheTableExec.scala  | 26 ++++++------
 .../sql-tests/results/explain-aqe.sql.out          |  2 +-
 .../resources/sql-tests/results/explain.sql.out    |  2 +-
 9 files changed, 102 insertions(+), 42 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7a11396..e7c72db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -309,7 +309,9 @@ class Analyzer(override val catalogManager: CatalogManager)
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,
-      CleanupAliases)
+      CleanupAliases),
+    Batch("HandleAnalysisOnlyCommand", Once,
+      HandleAnalysisOnlyCommand)
   )
 
   /**
@@ -3543,6 +3545,18 @@ class Analyzer(override val catalogManager: CatalogManager)
       }
     }
   }
+
+  /**
+   * A rule that marks a command as analyzed so that its children are removed to avoid
+   * being optimized. This rule should run after all other analysis rules are run.
+   */
+  object HandleAnalysisOnlyCommand extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+      case c: AnalysisOnlyCommand if c.resolved =>
+        checkAnalysis(c)
+        c.markAsAnalyzed()
+    }
+  }
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
index 94ead5e..81ad92b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
@@ -37,3 +37,14 @@ trait Command extends LogicalPlan {
 trait LeafCommand extends Command with LeafLike[LogicalPlan]
 trait UnaryCommand extends Command with UnaryLike[LogicalPlan]
 trait BinaryCommand extends Command with BinaryLike[LogicalPlan]
+
+/**
+ * A logical node that can be used for a command that requires its children to be only analyzed,
+ * but not optimized.
+ */
+trait AnalysisOnlyCommand extends Command {
+  val isAnalyzed: Boolean
+  def childrenToAnalyze: Seq[LogicalPlan]
+  override final def children: Seq[LogicalPlan] = if (isAnalyzed) Nil else childrenToAnalyze
+  def markAsAnalyzed(): LogicalPlan
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 8b7f2db..bbc2b62 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -1022,7 +1022,18 @@ case class CacheTableAsSelect(
     plan: LogicalPlan,
     originalText: String,
     isLazy: Boolean,
-    options: Map[String, String]) extends LeafCommand
+    options: Map[String, String],
+    isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[LogicalPlan]): CacheTableAsSelect = {
+    assert(!isAnalyzed)
+    copy(plan = newChildren.head)
+  }
+
+  override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil
+
+  override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+}
 
 /**
  * The logical plan of the UNCACHE TABLE command.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index fd02d0b..540115b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3370,10 +3370,11 @@ class Dataset[T] private[sql](
       comment = None,
       properties = Map.empty,
       originalText = None,
-      child = logicalPlan,
+      plan = logicalPlan,
       allowExisting = false,
       replace = replace,
-      viewType = viewType)
+      viewType = viewType,
+      isAnalyzed = true)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index f9b9e5a..7f3d0c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -474,7 +474,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) =>
       AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, location)
 
-    case AlterViewAs(ResolvedView(ident, _), originalText, query) if query.resolved =>
+    case AlterViewAs(ResolvedView(ident, _), originalText, query) =>
       AlterViewAsCommand(
         ident.asTableIdentifier,
         originalText,
@@ -482,7 +482,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     case CreateViewStatement(
       tbl, userSpecifiedColumns, comment, properties,
-      originalText, child, allowExisting, replace, viewType) if child.resolved =>
+      originalText, child, allowExisting, replace, viewType) =>
 
       val v1TableName = if (viewType != PersistedView) {
         // temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
@@ -491,15 +491,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         parseV1Table(tbl, "CREATE VIEW")
       }
       CreateViewCommand(
-        v1TableName.asTableIdentifier,
-        userSpecifiedColumns,
-        comment,
-        properties,
-        originalText,
-        child,
-        allowExisting,
-        replace,
-        viewType)
+        name = v1TableName.asTableIdentifier,
+        userSpecifiedColumns = userSpecifiedColumns,
+        comment = comment,
+        properties = properties,
+        originalText = originalText,
+        plan = child,
+        allowExisting = allowExisting,
+        replace = replace,
+        viewType = viewType)
 
     case ShowViews(resolved: ResolvedNamespace, pattern, output) =>
       resolved match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 93ea226..10ec4be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, Pe
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -48,13 +48,14 @@ import org.apache.spark.sql.util.SchemaUtils
  * @param properties the properties of this view.
  * @param originalText the original SQL text of this view, can be None if this view is created via
  *                     Dataset API.
- * @param child the logical plan that represents the view; this is used to generate the logical
- *              plan for temporary view and the view schema.
+ * @param plan the logical plan that represents the view; this is used to generate the logical
+ *             plan for temporary view and the view schema.
  * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
  *                already exists, throws analysis exception.
  * @param replace if true, and if the view already exists, updates it; if false, and if the view
  *                already exists, throws analysis exception.
  * @param viewType the expected view type to be created with this command.
+ * @param isAnalyzed whether this command is analyzed or not.
  */
 case class CreateViewCommand(
     name: TableIdentifier,
@@ -62,15 +63,26 @@ case class CreateViewCommand(
     comment: Option[String],
     properties: Map[String, String],
     originalText: Option[String],
-    child: LogicalPlan,
+    plan: LogicalPlan,
     allowExisting: Boolean,
     replace: Boolean,
-    viewType: ViewType)
-  extends LeafRunnableCommand {
+    viewType: ViewType,
+    isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand {
 
   import ViewHelper._
 
-  override def innerChildren: Seq[QueryPlan[_]] = Seq(child)
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[LogicalPlan]): CreateViewCommand = {
+    assert(!isAnalyzed)
+    copy(plan = newChildren.head)
+  }
+
+  override def innerChildren: Seq[QueryPlan[_]] = Seq(plan)
+
+  // `plan` needs to be analyzed, but shouldn't be optimized so that caching works correctly.
+  override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil
+
+  def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
 
   if (viewType == PersistedView) {
     require(originalText.isDefined, "'originalText' must be provided to create permanent view")
@@ -96,10 +108,10 @@ case class CreateViewCommand(
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    // If the plan cannot be analyzed, throw an exception and don't proceed.
-    val qe = sparkSession.sessionState.executePlan(child)
-    qe.assertAnalyzed()
-    val analyzedPlan = qe.analyzed
+    if (!isAnalyzed) {
+      throw new AnalysisException("The logical plan that represents the view is not analyzed.")
+    }
+    val analyzedPlan = plan
 
     if (userSpecifiedColumns.nonEmpty &&
         userSpecifiedColumns.length != analyzedPlan.output.length) {
@@ -233,12 +245,23 @@ case class CreateViewCommand(
 case class AlterViewAsCommand(
     name: TableIdentifier,
     originalText: String,
-    query: LogicalPlan) extends LeafRunnableCommand {
+    query: LogicalPlan,
+    isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand {
 
   import ViewHelper._
 
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[LogicalPlan]): AlterViewAsCommand = {
+    assert(!isAnalyzed)
+    copy(query = newChildren.head)
+  }
+
   override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
+  override def childrenToAnalyze: Seq[LogicalPlan] = query :: Nil
+
+  def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+
   override def run(session: SparkSession): Seq[Row] = {
     if (session.sessionState.catalog.isTempView(name)) {
       alterTemporaryView(session, query)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index 5b4b9e3..ac97e57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -94,19 +94,19 @@ case class CacheTableAsSelectExec(
   override lazy val relationName: String = tempViewName
 
   override lazy val planToCache: LogicalPlan = {
-    Dataset.ofRows(sparkSession,
-      CreateViewCommand(
-        name = TableIdentifier(tempViewName),
-        userSpecifiedColumns = Nil,
-        comment = None,
-        properties = Map.empty,
-        originalText = Some(originalText),
-        child = query,
-        allowExisting = false,
-        replace = false,
-        viewType = LocalTempView
-      )
-    )
+    CreateViewCommand(
+      name = TableIdentifier(tempViewName),
+      userSpecifiedColumns = Nil,
+      comment = None,
+      properties = Map.empty,
+      originalText = Some(originalText),
+      plan = query,
+      allowExisting = false,
+      replace = false,
+      viewType = LocalTempView,
+      isAnalyzed = true
+    ).run(sparkSession)
+
     dataFrameForCachedPlan.logicalPlan
   }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index ddfab99..357445a 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -913,7 +913,7 @@ Execute CreateViewCommand (1)
 Output: []
 
 (2) CreateViewCommand
-Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView
+Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true
 
 (3) LogicalRelation
 Arguments: parquet, [key#x, val#x], CatalogTable(
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index 1f7f8f6..3d00872 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -858,7 +858,7 @@ Execute CreateViewCommand (1)
 Output: []
 
 (2) CreateViewCommand
-Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView
+Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true
 
 (3) LogicalRelation
 Arguments: parquet, [key#x, val#x], CatalogTable(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org