Posted to commits@spark.apache.org by we...@apache.org on 2021/04/19 06:00:58 UTC
[spark] branch master updated: [SPARK-35122][SQL] Migrate CACHE/UNCACHE TABLE to use AnalysisOnlyCommand
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7a06cdd [SPARK-35122][SQL] Migrate CACHE/UNCACHE TABLE to use AnalysisOnlyCommand
7a06cdd is described below
commit 7a06cdd53b377ffcd58dacd63a1b96d6b422941c
Author: Terry Kim <yu...@gmail.com>
AuthorDate: Mon Apr 19 06:00:23 2021 +0000
[SPARK-35122][SQL] Migrate CACHE/UNCACHE TABLE to use AnalysisOnlyCommand
### What changes were proposed in this pull request?
Now that `AnalysisOnlyCommand` is introduced in #32032, `CacheTable` and `UncacheTable` can extend `AnalysisOnlyCommand` to simplify the code base. Currently, the logic that ensures the tables referenced by these commands are only analyzed is scattered across different places.
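For context, the contract these commands now implement looks roughly like the following sketch, reconstructed from the overrides visible in this diff (`isAnalyzed`, `childrenToAnalyze`, `markAsAnalyzed()`) rather than quoted verbatim from #32032:
```scala
trait AnalysisOnlyCommand extends Command {
  val isAnalyzed: Boolean

  // The children the analyzer should resolve before the command executes.
  def childrenToAnalyze: Seq[LogicalPlan]

  // After analysis the command exposes no children, so the optimizer and
  // planner skip over the already-analyzed table plan.
  override final def children: Seq[LogicalPlan] =
    if (isAnalyzed) Nil else childrenToAnalyze

  // Called once resolution is done; implementations flip the isAnalyzed
  // flag, as CacheTable and UncacheTable do below.
  def markAsAnalyzed(): LogicalPlan
}
```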
### Why are the changes needed?
To simplify the code base to handle these two commands.
### Does this PR introduce _any_ user-facing change?
No, just internal refactoring.
### How was this patch tested?
The existing tests (e.g., `CachedTableSuite`) cover the changes in this PR. For example, if I make `CacheTable`/`UncacheTable` extend `LeafCommand` instead, there are a few failures in `CachedTableSuite`.
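For a concrete sense of what those suites exercise, a minimal cache/uncache round-trip looks like this (a sketch against the public `Catalog` API, not a verbatim test from `CachedTableSuite`):
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.range(10).createOrReplaceTempView("t")

// Both statements now resolve their target table via AnalysisOnlyCommand.
spark.sql("CACHE TABLE t")
assert(spark.catalog.isCached("t"))

spark.sql("UNCACHE TABLE t")
assert(!spark.catalog.isCached("t"))
```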
Closes #32220 from imback82/cache_cmd_analysis_only.
Authored-by: Terry Kim <yu...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 32 ----------------------
.../sql/catalyst/analysis/CheckAnalysis.scala | 6 ----
.../sql/catalyst/plans/logical/v2Commands.scala | 25 +++++++++++++++--
.../execution/datasources/DataSourceStrategy.scala | 16 +----------
.../datasources/v2/DataSourceV2Strategy.scala | 6 +++-
.../org/apache/spark/sql/hive/HiveStrategies.scala | 12 +-------
6 files changed, 30 insertions(+), 67 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c98dc96..f59e533 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -836,14 +836,6 @@ class Analyzer(override val catalogManager: CatalogManager)
lookupAndResolveTempView(ident)
.map(view => i.copy(table = view))
.getOrElse(i)
- case c @ CacheTable(UnresolvedRelation(ident, _, false), _, _, _) =>
- lookupAndResolveTempView(ident)
- .map(view => c.copy(table = view))
- .getOrElse(c)
- case c @ UncacheTable(UnresolvedRelation(ident, _, false), _, _) =>
- lookupAndResolveTempView(ident)
- .map(view => c.copy(table = view, isTempView = true))
- .getOrElse(c)
// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand =>
write.table match {
@@ -1022,16 +1014,6 @@ class Analyzer(override val catalogManager: CatalogManager)
.map(v2Relation => i.copy(table = v2Relation))
.getOrElse(i)
- case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
- lookupV2Relation(u.multipartIdentifier, u.options, false)
- .map(v2Relation => c.copy(table = v2Relation))
- .getOrElse(c)
-
- case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
- lookupV2Relation(u.multipartIdentifier, u.options, false)
- .map(v2Relation => c.copy(table = v2Relation))
- .getOrElse(c)
-
// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand =>
write.table match {
@@ -1129,20 +1111,6 @@ class Analyzer(override val catalogManager: CatalogManager)
case other => i.copy(table = other)
}
- case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
- lookupRelation(u.multipartIdentifier, u.options, false)
- .map(resolveViews)
- .map(EliminateSubqueryAliases(_))
- .map(relation => c.copy(table = relation))
- .getOrElse(c)
-
- case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
- lookupRelation(u.multipartIdentifier, u.options, false)
- .map(resolveViews)
- .map(EliminateSubqueryAliases(_))
- .map(relation => c.copy(table = relation))
- .getOrElse(c)
-
// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand =>
write.table match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 76724e7..52e09ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -125,12 +125,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _) =>
u.failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")
- case CacheTable(u: UnresolvedRelation, _, _, _) =>
- u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")
-
- case UncacheTable(u: UnresolvedRelation, _, _) =>
- u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")
-
// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand if write.table.isInstanceOf[UnresolvedRelation] =>
val tblName = write.table.asInstanceOf[UnresolvedRelation].multipartIdentifier
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index bbc2b62..310a437 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -1012,7 +1012,18 @@ case class CacheTable(
table: LogicalPlan,
multipartIdentifier: Seq[String],
isLazy: Boolean,
- options: Map[String, String]) extends LeafCommand
+ options: Map[String, String],
+ isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[LogicalPlan]): CacheTable = {
+ assert(!isAnalyzed)
+ copy(table = newChildren.head)
+ }
+
+ override def childrenToAnalyze: Seq[LogicalPlan] = table :: Nil
+
+ override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+}
/**
* The logical plan of the CACHE TABLE ... AS SELECT command.
@@ -1041,7 +1052,17 @@ case class CacheTableAsSelect(
case class UncacheTable(
table: LogicalPlan,
ifExists: Boolean,
- isTempView: Boolean = false) extends LeafCommand
+ isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[LogicalPlan]): UncacheTable = {
+ assert(!isAnalyzed)
+ copy(table = newChildren.head)
+ }
+
+ override def childrenToAnalyze: Seq[LogicalPlan] = table :: Nil
+
+ override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+}
/**
* The logical plan of the ALTER TABLE ... SET LOCATION command.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 7125ec6..58ac924 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{CacheTable, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project, UncacheTable}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.connector.catalog.SupportsRead
@@ -271,20 +271,6 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _, _, _) =>
i.copy(table = DDLUtils.readHiveTable(tableMeta))
- case c @ CacheTable(UnresolvedCatalogRelation(tableMeta, options, false), _, _, _)
- if DDLUtils.isDatasourceTable(tableMeta) =>
- c.copy(table = readDataSourceTable(tableMeta, options))
-
- case c @ CacheTable(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _) =>
- c.copy(table = DDLUtils.readHiveTable(tableMeta))
-
- case u @ UncacheTable(UnresolvedCatalogRelation(tableMeta, options, false), _, _)
- if DDLUtils.isDatasourceTable(tableMeta) =>
- u.copy(table = readDataSourceTable(tableMeta, options))
-
- case u @ UncacheTable(UnresolvedCatalogRelation(tableMeta, _, false), _, _) =>
- u.copy(table = DDLUtils.readHiveTable(tableMeta))
-
case UnresolvedCatalogRelation(tableMeta, options, false)
if DDLUtils.isDatasourceTable(tableMeta) =>
readDataSourceTable(tableMeta, options)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 6adb90b..811f418 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -415,7 +415,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
CacheTableAsSelectExec(r.tempViewName, r.plan, r.originalText, r.isLazy, r.options) :: Nil
case r: UncacheTable =>
- UncacheTableExec(r.table, cascade = !r.isTempView) :: Nil
+ def isTempView(table: LogicalPlan): Boolean = table match {
+ case SubqueryAlias(_, v: View) => v.isTempView
+ case _ => false
+ }
+ UncacheTableExec(r.table, cascade = !isTempView(r.table)) :: Nil
case SetTableLocation(table: ResolvedTable, partitionSpec, location) =>
if (partitionSpec.nonEmpty) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 08b0a15..84e2f5a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans.logical.{CacheTable, InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics, UncacheTable}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
import org.apache.spark.sql.execution._
@@ -231,16 +231,6 @@ case class RelationConversions(
assertNoNullTypeInSchema(query.schema)
OptimizedCreateHiveTableAsSelectCommand(
tableDesc, query, query.output.map(_.name), mode)
-
- // Cache table
- case c @ CacheTable(relation: HiveTableRelation, _, _, _)
- if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
- c.copy(table = metastoreCatalog.convert(relation))
-
- // Uncache table
- case u @ UncacheTable(relation: HiveTableRelation, _, _)
- if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
- u.copy(table = metastoreCatalog.convert(relation))
}
}
}