You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/08/31 15:13:48 UTC
[spark] branch master updated: [SPARK-40219][SQL] Resolved view logical plan should hold the schema to avoid redundant lookup
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e4a73db9383 [SPARK-40219][SQL] Resolved view logical plan should hold the schema to avoid redundant lookup
e4a73db9383 is described below
commit e4a73db93837018f3d6fd417832e06388338b439
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Aug 31 23:13:12 2022 +0800
[SPARK-40219][SQL] Resolved view logical plan should hold the schema to avoid redundant lookup
### What changes were proposed in this pull request?
In `CatalogImpl`, we need to look up the view again to get its schema, because `ResolvedView` only contains the view identifier. This PR fixes this issue by refactoring `ResolvedView`:
1. Split it into `ResolvedPersistentView` and `ResolvedTempView`.
2. The new logical plans hold the view schema.
3. `ResolvedPersistentView` holds the catalog, to match `ResolvedTable`.
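This PR also cleans up the temp-view-related methods in `SessionCatalog`:
1. Group them together in the section `Methods that interact with temp views only`.
2. Rename two methods to make the naming more consistent: `lookupTempView(TableIdentifier)` becomes `getLocalOrGlobalTempView`, and `lookupLocalOrGlobalRawTempView` becomes `getRawLocalOrGlobalTempView`.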
### Why are the changes needed?
Code cleanup, and to avoid a redundant view lookup.
### Does this PR introduce _any_ user-facing change?
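No, apart from one error message. When a command that expects a table is given a persistent view, the view name in the error message is now prefixed with its catalog name (see the updated `DDLSuite` assertion).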
### How was this patch tested?
Existing tests. The expected error message in `DDLSuite` was updated to include the session catalog name.
Closes #37658 from cloud-fan/ident.
Lead-authored-by: Wenchen Fan <we...@databricks.com>
Co-authored-by: Wenchen Fan <cl...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 65 ++++++++--------
.../sql/catalyst/analysis/v2ResolutionPlans.scala | 26 +++++--
.../sql/catalyst/catalog/SessionCatalog.scala | 88 ++++++++++------------
.../spark/sql/errors/QueryCompilationErrors.scala | 33 +++++---
.../catalyst/analysis/ResolveSessionCatalog.scala | 26 +++----
.../apache/spark/sql/internal/CatalogImpl.scala | 51 ++++++-------
.../spark/sql/execution/command/DDLSuite.scala | 4 +-
7 files changed, 153 insertions(+), 140 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ae177efa05e..2582702d0b4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1037,7 +1037,7 @@ class Analyzer(override val catalogManager: CatalogManager)
case i @ InsertIntoStatement(table, _, _, _, _, _) if i.query.resolved =>
val relation = table match {
case u: UnresolvedRelation if !u.isStreaming =>
- lookupRelation(u).getOrElse(u)
+ resolveRelation(u).getOrElse(u)
case other => other
}
@@ -1054,7 +1054,7 @@ class Analyzer(override val catalogManager: CatalogManager)
case write: V2WriteCommand =>
write.table match {
case u: UnresolvedRelation if !u.isStreaming =>
- lookupRelation(u).map(unwrapRelationPlan).map {
+ resolveRelation(u).map(unwrapRelationPlan).map {
case v: View => throw QueryCompilationErrors.writeIntoViewNotAllowedError(
v.desc.identifier, write)
case r: DataSourceV2Relation => write.withNewTable(r)
@@ -1069,25 +1069,28 @@ class Analyzer(override val catalogManager: CatalogManager)
}
case u: UnresolvedRelation =>
- lookupRelation(u).map(resolveViews).getOrElse(u)
+ resolveRelation(u).map(resolveViews).getOrElse(u)
case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version)
if timestamp.forall(ts => ts.resolved && !SubqueryExpression.hasSubquery(ts)) =>
- lookupRelation(u, TimeTravelSpec.create(timestamp, version, conf)).getOrElse(r)
+ resolveRelation(u, TimeTravelSpec.create(timestamp, version, conf)).getOrElse(r)
case u @ UnresolvedTable(identifier, cmd, relationTypeMismatchHint) =>
lookupTableOrView(identifier).map {
- case v: ResolvedView =>
+ case v: ResolvedPersistentView =>
+ val nameParts = v.catalog.name() +: v.identifier.asMultipartIdentifier
throw QueryCompilationErrors.expectTableNotViewError(
- v, cmd, relationTypeMismatchHint, u)
+ nameParts, isTemp = false, cmd, relationTypeMismatchHint, u)
+ case _: ResolvedTempView =>
+ throw QueryCompilationErrors.expectTableNotViewError(
+ identifier, isTemp = true, cmd, relationTypeMismatchHint, u)
case table => table
}.getOrElse(u)
case u @ UnresolvedView(identifier, cmd, allowTemp, relationTypeMismatchHint) =>
lookupTableOrView(identifier).map {
- case v: ResolvedView if v.isTemp && !allowTemp =>
- val name = identifier.quoted
- u.failAnalysis(s"$name is a temp view. '$cmd' expects a permanent view.")
+ case _: ResolvedTempView if !allowTemp =>
+ throw QueryCompilationErrors.expectViewNotTempViewError(identifier, cmd, u)
case t: ResolvedTable =>
throw QueryCompilationErrors.expectViewNotTableError(
t, cmd, relationTypeMismatchHint, u)
@@ -1096,46 +1099,44 @@ class Analyzer(override val catalogManager: CatalogManager)
case u @ UnresolvedTableOrView(identifier, cmd, allowTempView) =>
lookupTableOrView(identifier).map {
- case v: ResolvedView if v.isTemp && !allowTempView =>
+ case _: ResolvedTempView if !allowTempView =>
throw QueryCompilationErrors.expectTableOrPermanentViewNotTempViewError(
- identifier.quoted, cmd, u)
+ identifier, cmd, u)
case other => other
}.getOrElse(u)
}
- private def lookupTempView(
- identifier: Seq[String],
- isStreaming: Boolean = false,
- isTimeTravel: Boolean = false): Option[LogicalPlan] = {
+ private def lookupTempView(identifier: Seq[String]): Option[TemporaryViewRelation] = {
// We are resolving a view and this name is not a temp view when that view was created. We
// return None earlier here.
if (isResolvingView && !isReferredTempViewName(identifier)) return None
+ v1SessionCatalog.getRawLocalOrGlobalTempView(identifier)
+ }
- val tmpView = identifier match {
- case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
- case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1, part2)
- case _ => None
- }
-
- tmpView.foreach { v =>
- if (isStreaming && !v.isStreaming) {
+ private def resolveTempView(
+ identifier: Seq[String],
+ isStreaming: Boolean = false,
+ isTimeTravel: Boolean = false): Option[LogicalPlan] = {
+ lookupTempView(identifier).map { v =>
+ val tempViewPlan = v1SessionCatalog.getTempViewRelation(v)
+ if (isStreaming && !tempViewPlan.isStreaming) {
throw QueryCompilationErrors.readNonStreamingTempViewError(identifier.quoted)
}
if (isTimeTravel) {
- val target = if (v.isStreaming) "streams" else "views"
+ val target = if (tempViewPlan.isStreaming) "streams" else "views"
throw QueryCompilationErrors.timeTravelUnsupportedError(target)
}
+ tempViewPlan
}
- tmpView
}
/**
- * Resolves relations to `ResolvedTable` or `ResolvedView`. This is for resolving DDL and
- * misc commands.
+ * Resolves relations to `ResolvedTable` or `Resolved[Temp/Persistent]View`. This is
+ * for resolving DDL and misc commands.
*/
private def lookupTableOrView(identifier: Seq[String]): Option[LogicalPlan] = {
- lookupTempView(identifier).map { _ =>
- ResolvedView(identifier.asIdentifier, isTemp = true)
+ lookupTempView(identifier).map { tempView =>
+ ResolvedTempView(identifier.asIdentifier, tempView.tableMeta.schema)
}.orElse {
expandIdentifier(identifier) match {
case CatalogAndIdentifier(catalog, ident) =>
@@ -1144,7 +1145,7 @@ class Analyzer(override val catalogManager: CatalogManager)
v1Table.v1Table.tableType == CatalogTableType.VIEW =>
val v1Ident = v1Table.catalogTable.identifier
val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier)
- ResolvedView(v2Ident, isTemp = false)
+ ResolvedPersistentView(catalog, v2Ident, v1Table.catalogTable.schema)
case table =>
ResolvedTable.create(catalog.asTableCatalog, ident, table)
}
@@ -1196,10 +1197,10 @@ class Analyzer(override val catalogManager: CatalogManager)
* Resolves relations to v1 relation if it's a v1 table from the session catalog, or to v2
* relation. This is for resolving DML commands and SELECT queries.
*/
- private def lookupRelation(
+ private def resolveRelation(
u: UnresolvedRelation,
timeTravelSpec: Option[TimeTravelSpec] = None): Option[LogicalPlan] = {
- lookupTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse {
+ resolveTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse {
expandIdentifier(u.multipartIdentifier) match {
case CatalogAndIdentifier(catalog, ident) =>
val key = catalog.name +: ident.namespace :+ ident.name
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index 6095d812d66..c3fc5533a8e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, I
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
-import org.apache.spark.sql.types.{DataType, StructField}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
/**
* Holds the name of a namespace that has yet to be looked up in a catalog. It will be resolved to
@@ -54,7 +54,7 @@ case class UnresolvedTable(
/**
* Holds the name of a view that has yet to be looked up. It will be resolved to
- * [[ResolvedView]] during analysis.
+ * [[ResolvedPersistentView]] or [[ResolvedTempView]] during analysis.
*/
case class UnresolvedView(
multipartIdentifier: Seq[String],
@@ -68,7 +68,8 @@ case class UnresolvedView(
/**
* Holds the name of a table or view that has yet to be looked up in a catalog. It will
- * be resolved to [[ResolvedTable]] or [[ResolvedView]] during analysis.
+ * be resolved to [[ResolvedTable]], [[ResolvedPersistentView]] or [[ResolvedTempView]] during
+ * analysis.
*/
case class UnresolvedTableOrView(
multipartIdentifier: Seq[String],
@@ -195,11 +196,22 @@ case class ResolvedFieldPosition(position: ColumnPosition) extends FieldPosition
/**
- * A plan containing resolved (temp) views.
+ * A plan containing resolved persistent views.
*/
-// TODO: create a generic representation for temp view, v1 view and v2 view, after we add view
-// support to v2 catalog. For now we only need the identifier to fallback to v1 command.
-case class ResolvedView(identifier: Identifier, isTemp: Boolean) extends LeafNodeWithoutStats {
+// TODO: create a generic representation for views, after we add view support to v2 catalog. For now
+// we only hold the view schema.
+case class ResolvedPersistentView(
+ catalog: CatalogPlugin,
+ identifier: Identifier,
+ viewSchema: StructType) extends LeafNodeWithoutStats {
+ override def output: Seq[Attribute] = Nil
+}
+
+/**
+ * A plan containing resolved (global) temp views.
+ */
+case class ResolvedTempView(identifier: Identifier, viewSchema: StructType)
+ extends LeafNodeWithoutStats {
override def output: Seq[Attribute] = Nil
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e882dc18c2e..5d5d8b202c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -696,6 +696,24 @@ class SessionCatalog(
getRawGlobalTempView(name).map(getTempViewPlan)
}
+ /**
+ * Generate a [[View]] operator from the local or global temporary view stored.
+ */
+ def getLocalOrGlobalTempView(name: TableIdentifier): Option[View] = {
+ getRawLocalOrGlobalTempView(toNameParts(name)).map(getTempViewPlan)
+ }
+
+ /**
+ * Return the raw logical plan of a temporary local or global view for the given name.
+ */
+ def getRawLocalOrGlobalTempView(name: Seq[String]): Option[TemporaryViewRelation] = {
+ name match {
+ case Seq(v) => getRawTempView(v)
+ case Seq(db, v) if isGlobalTempViewDB(db) => getRawGlobalTempView(v)
+ case _ => None
+ }
+ }
+
/**
* Drop a local temporary view.
*
@@ -714,6 +732,22 @@ class SessionCatalog(
globalTempViewManager.remove(format(name))
}
+ private def toNameParts(ident: TableIdentifier): Seq[String] = {
+ ident.database.toSeq :+ ident.table
+ }
+
+ private def getTempViewPlan(viewInfo: TemporaryViewRelation): View = viewInfo.plan match {
+ case Some(p) => View(desc = viewInfo.tableMeta, isTempView = true, child = p)
+ case None => fromCatalogTable(viewInfo.tableMeta, isTempView = true)
+ }
+
+ /**
+ * Generates a [[SubqueryAlias]] operator from the stored temporary view.
+ */
+ def getTempViewRelation(viewInfo: TemporaryViewRelation): SubqueryAlias = {
+ SubqueryAlias(toNameParts(viewInfo.tableMeta.identifier).map(format), getTempViewPlan(viewInfo))
+ }
+
// -------------------------------------------------------------
// | Methods that interact with temporary and metastore tables |
// -------------------------------------------------------------
@@ -868,11 +902,6 @@ class SessionCatalog(
}
}
- private def getTempViewPlan(viewInfo: TemporaryViewRelation): View = viewInfo.plan match {
- case Some(p) => View(desc = viewInfo.tableMeta, isTempView = true, child = p)
- case None => fromCatalogTable(viewInfo.tableMeta, isTempView = true)
- }
-
private def buildViewDDL(metadata: CatalogTable, isTempView: Boolean): Option[String] = {
if (isTempView) {
None
@@ -965,61 +994,22 @@ class SessionCatalog(
View(desc = metadata, isTempView = isTempView, child = Project(projectList, parsedPlan))
}
- def lookupTempView(table: String): Option[SubqueryAlias] = {
- val formattedTable = format(table)
- getTempView(formattedTable).map { view =>
- SubqueryAlias(formattedTable, view)
- }
- }
-
- def lookupGlobalTempView(db: String, table: String): Option[SubqueryAlias] = {
- val formattedDB = format(db)
- if (formattedDB == globalTempViewManager.database) {
- val formattedTable = format(table)
- getGlobalTempView(formattedTable).map { view =>
- SubqueryAlias(formattedTable, formattedDB, view)
- }
- } else {
- None
- }
- }
-
- /**
- * Return whether the given name parts belong to a temporary or global temporary view.
- */
- def isTempView(nameParts: Seq[String]): Boolean = {
- if (nameParts.length > 2) return false
- import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
- isTempView(nameParts.asTableIdentifier)
- }
-
def isGlobalTempViewDB(dbName: String): Boolean = {
globalTempViewManager.database.equalsIgnoreCase(dbName)
}
- def lookupTempView(name: TableIdentifier): Option[View] = {
- lookupLocalOrGlobalRawTempView(name.database.toSeq :+ name.table).map(getTempViewPlan)
- }
-
/**
- * Return the raw logical plan of a temporary local or global view for the given name.
+ * Return whether the given name parts belong to a temporary or global temporary view.
*/
- def lookupLocalOrGlobalRawTempView(name: Seq[String]): Option[TemporaryViewRelation] = {
- name match {
- case Seq(v) => getRawTempView(v)
- case Seq(db, v) if isGlobalTempViewDB(db) => getRawGlobalTempView(v)
- case _ => None
- }
+ def isTempView(nameParts: Seq[String]): Boolean = {
+ getRawLocalOrGlobalTempView(nameParts).isDefined
}
/**
* Return whether a table with the specified name is a temporary view.
- *
- * Note: The temporary view cache is checked only when database is not
- * explicitly specified.
*/
def isTempView(name: TableIdentifier): Boolean = synchronized {
- lookupTempView(name).isDefined
+ isTempView(toNameParts(name))
}
def isView(nameParts: Seq[String]): Boolean = {
@@ -1135,7 +1125,7 @@ class SessionCatalog(
* updated.
*/
def refreshTable(name: TableIdentifier): Unit = synchronized {
- lookupTempView(name).map(_.refresh).getOrElse {
+ getLocalOrGlobalTempView(name).map(_.refresh).getOrElse {
val qualifiedIdent = qualifyIdentifier(name)
val qualifiedTableName = QualifiedTableName(qualifiedIdent.database.get, qualifiedIdent.table)
tableRelationCache.invalidate(qualifiedTableName)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 7458e201be2..ef4321a4fc7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedTable, ResolvedView, Star, TableAlreadyExistsException, UnresolvedRegex}
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedTable, Star, TableAlreadyExistsException, UnresolvedRegex}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, InvalidUDFClassException}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateMap, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition}
@@ -271,12 +271,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
s"$quoted as it's not a data source v2 relation.")
}
- def expectTableOrPermanentViewNotTempViewError(
- quoted: String, cmd: String, t: TreeNode[_]): Throwable = {
- new AnalysisException(s"$quoted is a temp view. '$cmd' expects a table or permanent view.",
- t.origin.line, t.origin.startPosition)
- }
-
def readNonStreamingTempViewError(quoted: String): Throwable = {
new AnalysisException(s"$quoted is not a temp view of streaming " +
"logical plan, please use batch API such as `DataFrameReader.table` to read it.")
@@ -306,10 +300,22 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
}
def expectTableNotViewError(
- v: ResolvedView, cmd: String, mismatchHint: Option[String], t: TreeNode[_]): Throwable = {
- val viewStr = if (v.isTemp) "temp view" else "view"
+ nameParts: Seq[String],
+ isTemp: Boolean,
+ cmd: String,
+ mismatchHint: Option[String],
+ t: TreeNode[_]): Throwable = {
+ val viewStr = if (isTemp) "temp view" else "view"
val hintStr = mismatchHint.map(" " + _).getOrElse("")
- new AnalysisException(s"${v.identifier.quoted} is a $viewStr. '$cmd' expects a table.$hintStr",
+ new AnalysisException(s"${nameParts.quoted} is a $viewStr. '$cmd' expects a table.$hintStr",
+ t.origin.line, t.origin.startPosition)
+ }
+
+ def expectViewNotTempViewError(
+ nameParts: Seq[String],
+ cmd: String,
+ t: TreeNode[_]): Throwable = {
+ new AnalysisException(s"${nameParts.quoted} is a temp view. '$cmd' expects a permanent view.",
t.origin.line, t.origin.startPosition)
}
@@ -320,6 +326,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
t.origin.line, t.origin.startPosition)
}
+ def expectTableOrPermanentViewNotTempViewError(
+ nameParts: Seq[String], cmd: String, t: TreeNode[_]): Throwable = {
+ new AnalysisException(
+ s"${nameParts.quoted} is a temp view. '$cmd' expects a table or permanent view.",
+ t.origin.line, t.origin.startPosition)
+ }
+
def expectPersistentFuncError(
name: String, cmd: String, mismatchHint: Option[String], t: TreeNode[_]): Throwable = {
val hintStr = mismatchHint.map(" " + _).getOrElse("")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 5a2c1b891ad..43b71657a2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -204,13 +204,12 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case DropTable(ResolvedV1TableIdentifier(ident), ifExists, purge) =>
DropTableCommand(ident, ifExists, isView = false, purge = purge)
+ case DropTable(_: ResolvedPersistentView, ifExists, purge) =>
+ throw QueryCompilationErrors.cannotDropViewWithDropTableError
+
// v1 DROP TABLE supports temp view.
- case DropTable(r: ResolvedView, ifExists, purge) =>
- if (!r.isTemp) {
- throw QueryCompilationErrors.cannotDropViewWithDropTableError
- }
- val ResolvedViewIdentifier(ident) = r
- DropTableCommand(ident, ifExists, isView = false, purge = purge)
+ case DropTable(ResolvedTempView(ident, _), ifExists, purge) =>
+ DropTableCommand(ident.asTableIdentifier, ifExists, isView = false, purge = purge)
case DropView(ResolvedViewIdentifier(ident), ifExists) =>
DropTableCommand(ident, ifExists, isView = true, purge = false)
@@ -535,14 +534,13 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
object ResolvedViewIdentifier {
def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
- case ResolvedView(ident, isTemp) =>
- if (isTemp) {
- Some(ident.asTableIdentifier)
- } else {
- // TODO: we should get the v1 identifier from v1 view directly, similar to `V1Table`. But
- // there is no general view representation in DS v2 yet.
- Some(catalogManager.v1SessionCatalog.qualifyIdentifier(ident.asTableIdentifier))
- }
+ case ResolvedPersistentView(catalog, ident, _) =>
+ assert(isSessionCatalog(catalog))
+ assert(ident.namespace().length == 1)
+ Some(TableIdentifier(ident.name, Some(ident.namespace.head), Some(catalog.name)))
+
+ case ResolvedTempView(ident, _) => Some(ident.asTableIdentifier)
+
case _ => None
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 657ed87e609..e3a303d4c0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -23,12 +23,12 @@ import scala.util.control.NonFatal
import org.apache.spark.sql._
import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedNonPersistentFunc, ResolvedPersistentFunc, ResolvedTable, ResolvedView, UnresolvedFunc, UnresolvedIdentifier, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
+import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, SubqueryAlias, TableSpec, View}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, IdentifierHelper, MultipartIdentifierHelper, TransformHelper}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, TransformHelper}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
@@ -137,7 +137,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}
private def makeTable(nameParts: Seq[String]): Table = {
- sessionCatalog.lookupLocalOrGlobalRawTempView(nameParts).map { tempView =>
+ sessionCatalog.getRawLocalOrGlobalTempView(nameParts).map { tempView =>
new Table(
name = tempView.tableMeta.identifier.table,
catalog = null,
@@ -312,29 +312,13 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
case ResolvedTable(_, _, table, _) =>
val (partitionColumnNames, bucketSpecOpt) = table.partitioning.toSeq.convertTransforms
val bucketColumnNames = bucketSpecOpt.map(_.bucketColumnNames).getOrElse(Nil)
- table.schema.map { field =>
- new Column(
- name = field.name,
- description = field.getComment().orNull,
- dataType = field.dataType.simpleString,
- nullable = field.nullable,
- isPartition = partitionColumnNames.contains(field.name),
- isBucket = bucketColumnNames.contains(field.name))
- }
+ schemaToColumns(table.schema(), partitionColumnNames.contains, bucketColumnNames.contains)
- case ResolvedView(identifier, _) =>
- val catalog = sparkSession.sessionState.catalog
- val table = identifier.asTableIdentifier
- val schema = catalog.getTempViewOrPermanentTableMetadata(table).schema
- schema.map { field =>
- new Column(
- name = field.name,
- description = field.getComment().orNull,
- dataType = field.dataType.simpleString,
- nullable = field.nullable,
- isPartition = false,
- isBucket = false)
- }
+ case ResolvedPersistentView(_, _, schema) =>
+ schemaToColumns(schema)
+
+ case ResolvedTempView(_, schema) =>
+ schemaToColumns(schema)
case _ => throw QueryCompilationErrors.tableOrViewNotFound(ident)
}
@@ -342,6 +326,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
CatalogImpl.makeDataset(columns, sparkSession)
}
+ private def schemaToColumns(
+ schema: StructType,
+ isPartCol: String => Boolean = _ => false,
+ isBucketCol: String => Boolean = _ => false): Seq[Column] = {
+ schema.map { field =>
+ new Column(
+ name = field.name,
+ description = field.getComment().orNull,
+ dataType = field.dataType.simpleString,
+ nullable = field.nullable,
+ isPartition = isPartCol(field.name),
+ isBucket = isBucketCol(field.name))
+ }
+ }
+
private def getNamespace(catalog: CatalogPlugin, ns: Seq[String]): Database = catalog match {
case catalog: SupportsNamespaces =>
val metadata = catalog.loadNamespaceMetadata(ns.toArray)
@@ -744,7 +743,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
// if `tableName` is not 2 part name, then we directly uncache it from the cache manager.
try {
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
- sessionCatalog.lookupTempView(tableIdent).map(uncacheView).getOrElse {
+ sessionCatalog.getLocalOrGlobalTempView(tableIdent).map(uncacheView).getOrElse {
sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName),
cascade = true)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index c405bf046b3..f17a82dedc7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1872,8 +1872,8 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
val e = intercept[AnalysisException] {
sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)")
}
- assert(e.message.contains(
- "default.v1 is a view. 'ALTER TABLE ... ADD COLUMNS' expects a table."))
+ assert(e.message.contains(s"${SESSION_CATALOG_NAME}.default.v1 is a view. " +
+ "'ALTER TABLE ... ADD COLUMNS' expects a table."))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org