Posted to commits@spark.apache.org by we...@apache.org on 2021/04/19 06:00:58 UTC

[spark] branch master updated: [SPARK-35122][SQL] Migrate CACHE/UNCACHE TABLE to use AnalysisOnlyCommand

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a06cdd  [SPARK-35122][SQL] Migrate CACHE/UNCACHE TABLE to use AnalysisOnlyCommand
7a06cdd is described below

commit 7a06cdd53b377ffcd58dacd63a1b96d6b422941c
Author: Terry Kim <yu...@gmail.com>
AuthorDate: Mon Apr 19 06:00:23 2021 +0000

    [SPARK-35122][SQL] Migrate CACHE/UNCACHE TABLE to use AnalysisOnlyCommand
    
    ### What changes were proposed in this pull request?
    
    Now that `AnalysisOnlyCommand` is introduced in #32032, `CacheTable` and `UncacheTable` can extend `AnalysisOnlyCommand` to simplify the code base. Currently, the logic that handles these commands such that their tables are only analyzed (and not processed by later phases) is scattered across different places.
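    
    For reference, the `AnalysisOnlyCommand` contract that both commands now implement looks roughly like the following sketch (it was introduced in #32032; see that PR for the exact definition). The key idea is that the command only exposes its table plan as a child until analysis is done, so the analyzer resolves it while the optimizer and planner never touch it:
    
    ```scala
    trait AnalysisOnlyCommand extends Command {
      val isAnalyzed: Boolean
    
      // Plans the analyzer should resolve (e.g., the table to cache/uncache).
      def childrenToAnalyze: Seq[LogicalPlan]
    
      // Once marked as analyzed, the command hides its children so that
      // later phases (optimizer, planner) skip them entirely.
      override final def children: Seq[LogicalPlan] =
        if (isAnalyzed) Nil else childrenToAnalyze
    
      // Called by the analyzer after childrenToAnalyze are resolved.
      def markAsAnalyzed(): LogicalPlan
    }
    ```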
    
    ### Why are the changes needed?
    
    To simplify the code paths that handle these two commands.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, just internal refactoring.
    
    ### How was this patch tested?
    
    The existing tests (e.g., `CachedTableSuite`) cover the changes in this PR. For example, if I make `CacheTable`/`UncacheTable` extend `LeafCommand`, there are a few failures in `CachedTableSuite`.
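    
    As a rough illustration only (not part of this patch; the table name is a placeholder), this is the kind of end-to-end behavior those suites exercise:
    
    ```scala
    // CachedTableSuite covers many more scenarios (temp views, lazy caching, etc.).
    spark.sql("CREATE TABLE testData (id INT) USING parquet")
    
    spark.sql("CACHE TABLE testData")
    assert(spark.catalog.isCached("testData"))   // table was resolved and registered in the cache
    
    spark.sql("UNCACHE TABLE testData")
    assert(!spark.catalog.isCached("testData"))
    ```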
    
    Closes #32220 from imback82/cache_cmd_analysis_only.
    
    Authored-by: Terry Kim <yu...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 32 ----------------------
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  6 ----
 .../sql/catalyst/plans/logical/v2Commands.scala    | 25 +++++++++++++++--
 .../execution/datasources/DataSourceStrategy.scala | 16 +----------
 .../datasources/v2/DataSourceV2Strategy.scala      |  6 +++-
 .../org/apache/spark/sql/hive/HiveStrategies.scala | 12 +-------
 6 files changed, 30 insertions(+), 67 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c98dc96..f59e533 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -836,14 +836,6 @@ class Analyzer(override val catalogManager: CatalogManager)
         lookupAndResolveTempView(ident)
           .map(view => i.copy(table = view))
           .getOrElse(i)
-      case c @ CacheTable(UnresolvedRelation(ident, _, false), _, _, _) =>
-        lookupAndResolveTempView(ident)
-          .map(view => c.copy(table = view))
-          .getOrElse(c)
-      case c @ UncacheTable(UnresolvedRelation(ident, _, false), _, _) =>
-        lookupAndResolveTempView(ident)
-          .map(view => c.copy(table = view, isTempView = true))
-          .getOrElse(c)
       // TODO (SPARK-27484): handle streaming write commands when we have them.
       case write: V2WriteCommand =>
         write.table match {
@@ -1022,16 +1014,6 @@ class Analyzer(override val catalogManager: CatalogManager)
           .map(v2Relation => i.copy(table = v2Relation))
           .getOrElse(i)
 
-      case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
-        lookupV2Relation(u.multipartIdentifier, u.options, false)
-          .map(v2Relation => c.copy(table = v2Relation))
-          .getOrElse(c)
-
-      case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
-        lookupV2Relation(u.multipartIdentifier, u.options, false)
-          .map(v2Relation => c.copy(table = v2Relation))
-          .getOrElse(c)
-
       // TODO (SPARK-27484): handle streaming write commands when we have them.
       case write: V2WriteCommand =>
         write.table match {
@@ -1129,20 +1111,6 @@ class Analyzer(override val catalogManager: CatalogManager)
           case other => i.copy(table = other)
         }
 
-      case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
-        lookupRelation(u.multipartIdentifier, u.options, false)
-          .map(resolveViews)
-          .map(EliminateSubqueryAliases(_))
-          .map(relation => c.copy(table = relation))
-          .getOrElse(c)
-
-      case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
-        lookupRelation(u.multipartIdentifier, u.options, false)
-          .map(resolveViews)
-          .map(EliminateSubqueryAliases(_))
-          .map(relation => c.copy(table = relation))
-          .getOrElse(c)
-
       // TODO (SPARK-27484): handle streaming write commands when we have them.
       case write: V2WriteCommand =>
         write.table match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 76724e7..52e09ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -125,12 +125,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _) =>
         u.failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")
 
-      case CacheTable(u: UnresolvedRelation, _, _, _) =>
-        u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")
-
-      case UncacheTable(u: UnresolvedRelation, _, _) =>
-        u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")
-
       // TODO (SPARK-27484): handle streaming write commands when we have them.
       case write: V2WriteCommand if write.table.isInstanceOf[UnresolvedRelation] =>
         val tblName = write.table.asInstanceOf[UnresolvedRelation].multipartIdentifier
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index bbc2b62..310a437 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -1012,7 +1012,18 @@ case class CacheTable(
     table: LogicalPlan,
     multipartIdentifier: Seq[String],
     isLazy: Boolean,
-    options: Map[String, String]) extends LeafCommand
+    options: Map[String, String],
+    isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[LogicalPlan]): CacheTable = {
+    assert(!isAnalyzed)
+    copy(table = newChildren.head)
+  }
+
+  override def childrenToAnalyze: Seq[LogicalPlan] = table :: Nil
+
+  override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+}
 
 /**
  * The logical plan of the CACHE TABLE ... AS SELECT command.
@@ -1041,7 +1052,17 @@ case class CacheTableAsSelect(
 case class UncacheTable(
     table: LogicalPlan,
     ifExists: Boolean,
-    isTempView: Boolean = false) extends LeafCommand
+    isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[LogicalPlan]): UncacheTable = {
+    assert(!isAnalyzed)
+    copy(table = newChildren.head)
+  }
+
+  override def childrenToAnalyze: Seq[LogicalPlan] = table :: Nil
+
+  override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+}
 
 /**
  * The logical plan of the ALTER TABLE ... SET LOCATION command.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 7125ec6..58ac924 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{CacheTable, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project, UncacheTable}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.connector.catalog.SupportsRead
@@ -271,20 +271,6 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _, _, _) =>
       i.copy(table = DDLUtils.readHiveTable(tableMeta))
 
-    case c @ CacheTable(UnresolvedCatalogRelation(tableMeta, options, false), _, _, _)
-      if DDLUtils.isDatasourceTable(tableMeta) =>
-      c.copy(table = readDataSourceTable(tableMeta, options))
-
-    case c @ CacheTable(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _) =>
-      c.copy(table = DDLUtils.readHiveTable(tableMeta))
-
-    case u @ UncacheTable(UnresolvedCatalogRelation(tableMeta, options, false), _, _)
-        if DDLUtils.isDatasourceTable(tableMeta) =>
-      u.copy(table = readDataSourceTable(tableMeta, options))
-
-    case u @ UncacheTable(UnresolvedCatalogRelation(tableMeta, _, false), _, _) =>
-      u.copy(table = DDLUtils.readHiveTable(tableMeta))
-
     case UnresolvedCatalogRelation(tableMeta, options, false)
         if DDLUtils.isDatasourceTable(tableMeta) =>
       readDataSourceTable(tableMeta, options)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 6adb90b..811f418 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -415,7 +415,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       CacheTableAsSelectExec(r.tempViewName, r.plan, r.originalText, r.isLazy, r.options) :: Nil
 
     case r: UncacheTable =>
-      UncacheTableExec(r.table, cascade = !r.isTempView) :: Nil
+      def isTempView(table: LogicalPlan): Boolean = table match {
+        case SubqueryAlias(_, v: View) => v.isTempView
+        case _ => false
+      }
+      UncacheTableExec(r.table, cascade = !isTempView(r.table)) :: Nil
 
     case SetTableLocation(table: ResolvedTable, partitionSpec, location) =>
       if (partitionSpec.nonEmpty) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 08b0a15..84e2f5a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans.logical.{CacheTable, InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics, UncacheTable}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
 import org.apache.spark.sql.execution._
@@ -231,16 +231,6 @@ case class RelationConversions(
         assertNoNullTypeInSchema(query.schema)
         OptimizedCreateHiveTableAsSelectCommand(
           tableDesc, query, query.output.map(_.name), mode)
-
-      // Cache table
-      case c @ CacheTable(relation: HiveTableRelation, _, _, _)
-          if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
-        c.copy(table = metastoreCatalog.convert(relation))
-
-      // Uncache table
-      case u @ UncacheTable(relation: HiveTableRelation, _, _)
-          if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
-        u.copy(table = metastoreCatalog.convert(relation))
     }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org