Posted to commits@spark.apache.org by we...@apache.org on 2022/08/31 15:13:48 UTC

[spark] branch master updated: [SPARK-40219][SQL] Resolved view logical plan should hold the schema to avoid redundant lookup

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e4a73db9383 [SPARK-40219][SQL] Resolved view logical plan should hold the schema to avoid redundant lookup
e4a73db9383 is described below

commit e4a73db93837018f3d6fd417832e06388338b439
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Aug 31 23:13:12 2022 +0800

    [SPARK-40219][SQL] Resolved view logical plan should hold the schema to avoid redundant lookup
    
    ### What changes were proposed in this pull request?
    
    In `CatalogImpl`, we need to look up the view again to get its schema, because `ResolvedView` only carries the view identifier. This PR fixes the issue by refactoring `ResolvedView` (a sketch of the resulting plans follows the list):
    1. Split it into `ResolvedPersistentView` and `ResolvedTempView`
    2. The new logical plans hold the view schema
    3. `ResolvedPersistentView` holds the catalog, to match `ResolvedTable`.
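    
    A minimal sketch of the resulting plans, mirroring the definitions added to
    `v2ResolutionPlans.scala` in the diff below (the diff is authoritative):
    
        // Sketch only: same shape as the case classes added in this commit.
        case class ResolvedPersistentView(
            catalog: CatalogPlugin,     // kept to match ResolvedTable
            identifier: Identifier,
            viewSchema: StructType)     // schema captured at resolution time, so no second lookup
          extends LeafNodeWithoutStats {
          override def output: Seq[Attribute] = Nil
        }
    
        case class ResolvedTempView(identifier: Identifier, viewSchema: StructType)
          extends LeafNodeWithoutStats {
          override def output: Seq[Attribute] = Nil
        }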
    
    This PR also cleans up the temp-view-related methods in `SessionCatalog`:
    1. Group them together under the section `Methods that interact with temp views only`.
    2. Rename two methods so the naming is more consistent (see the signatures below).
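    
    For reference, the two renames, with signatures as they appear in the diff below:
    
        // was lookupLocalOrGlobalRawTempView
        def getRawLocalOrGlobalTempView(name: Seq[String]): Option[TemporaryViewRelation]
        // was lookupTempView(name: TableIdentifier)
        def getLocalOrGlobalTempView(name: TableIdentifier): Option[View]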
    
    ### Why are the changes needed?
    
    Code cleanup, and it avoids a redundant view lookup in `CatalogImpl` (illustrated below).
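    
    For example, `CatalogImpl.listColumns` previously looked the view up again through the session
    catalog just to obtain its schema; with the schema carried on the resolved plan it can match on
    it directly (sketch, mirroring the diff below):
    
        case ResolvedPersistentView(_, _, schema) => schemaToColumns(schema)
        case ResolvedTempView(_, schema) => schemaToColumns(schema)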
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #37658 from cloud-fan/ident.
    
    Lead-authored-by: Wenchen Fan <we...@databricks.com>
    Co-authored-by: Wenchen Fan <cl...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 65 ++++++++--------
 .../sql/catalyst/analysis/v2ResolutionPlans.scala  | 26 +++++--
 .../sql/catalyst/catalog/SessionCatalog.scala      | 88 ++++++++++------------
 .../spark/sql/errors/QueryCompilationErrors.scala  | 33 +++++---
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 26 +++----
 .../apache/spark/sql/internal/CatalogImpl.scala    | 51 ++++++-------
 .../spark/sql/execution/command/DDLSuite.scala     |  4 +-
 7 files changed, 153 insertions(+), 140 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ae177efa05e..2582702d0b4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1037,7 +1037,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       case i @ InsertIntoStatement(table, _, _, _, _, _) if i.query.resolved =>
         val relation = table match {
           case u: UnresolvedRelation if !u.isStreaming =>
-            lookupRelation(u).getOrElse(u)
+            resolveRelation(u).getOrElse(u)
           case other => other
         }
 
@@ -1054,7 +1054,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       case write: V2WriteCommand =>
         write.table match {
           case u: UnresolvedRelation if !u.isStreaming =>
-            lookupRelation(u).map(unwrapRelationPlan).map {
+            resolveRelation(u).map(unwrapRelationPlan).map {
               case v: View => throw QueryCompilationErrors.writeIntoViewNotAllowedError(
                 v.desc.identifier, write)
               case r: DataSourceV2Relation => write.withNewTable(r)
@@ -1069,25 +1069,28 @@ class Analyzer(override val catalogManager: CatalogManager)
         }
 
       case u: UnresolvedRelation =>
-        lookupRelation(u).map(resolveViews).getOrElse(u)
+        resolveRelation(u).map(resolveViews).getOrElse(u)
 
       case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version)
           if timestamp.forall(ts => ts.resolved && !SubqueryExpression.hasSubquery(ts)) =>
-        lookupRelation(u, TimeTravelSpec.create(timestamp, version, conf)).getOrElse(r)
+        resolveRelation(u, TimeTravelSpec.create(timestamp, version, conf)).getOrElse(r)
 
       case u @ UnresolvedTable(identifier, cmd, relationTypeMismatchHint) =>
         lookupTableOrView(identifier).map {
-          case v: ResolvedView =>
+          case v: ResolvedPersistentView =>
+            val nameParts = v.catalog.name() +: v.identifier.asMultipartIdentifier
             throw QueryCompilationErrors.expectTableNotViewError(
-              v, cmd, relationTypeMismatchHint, u)
+              nameParts, isTemp = false, cmd, relationTypeMismatchHint, u)
+          case _: ResolvedTempView =>
+            throw QueryCompilationErrors.expectTableNotViewError(
+              identifier, isTemp = true, cmd, relationTypeMismatchHint, u)
           case table => table
         }.getOrElse(u)
 
       case u @ UnresolvedView(identifier, cmd, allowTemp, relationTypeMismatchHint) =>
         lookupTableOrView(identifier).map {
-          case v: ResolvedView if v.isTemp && !allowTemp =>
-            val name = identifier.quoted
-            u.failAnalysis(s"$name is a temp view. '$cmd' expects a permanent view.")
+          case _: ResolvedTempView if !allowTemp =>
+            throw QueryCompilationErrors.expectViewNotTempViewError(identifier, cmd, u)
           case t: ResolvedTable =>
             throw QueryCompilationErrors.expectViewNotTableError(
               t, cmd, relationTypeMismatchHint, u)
@@ -1096,46 +1099,44 @@ class Analyzer(override val catalogManager: CatalogManager)
 
       case u @ UnresolvedTableOrView(identifier, cmd, allowTempView) =>
         lookupTableOrView(identifier).map {
-          case v: ResolvedView if v.isTemp && !allowTempView =>
+          case _: ResolvedTempView if !allowTempView =>
             throw QueryCompilationErrors.expectTableOrPermanentViewNotTempViewError(
-              identifier.quoted, cmd, u)
+              identifier, cmd, u)
           case other => other
         }.getOrElse(u)
     }
 
-    private def lookupTempView(
-        identifier: Seq[String],
-        isStreaming: Boolean = false,
-        isTimeTravel: Boolean = false): Option[LogicalPlan] = {
+    private def lookupTempView(identifier: Seq[String]): Option[TemporaryViewRelation] = {
       // We are resolving a view and this name is not a temp view when that view was created. We
       // return None earlier here.
       if (isResolvingView && !isReferredTempViewName(identifier)) return None
+      v1SessionCatalog.getRawLocalOrGlobalTempView(identifier)
+    }
 
-      val tmpView = identifier match {
-        case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
-        case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1, part2)
-        case _ => None
-      }
-
-      tmpView.foreach { v =>
-        if (isStreaming && !v.isStreaming) {
+    private def resolveTempView(
+        identifier: Seq[String],
+        isStreaming: Boolean = false,
+        isTimeTravel: Boolean = false): Option[LogicalPlan] = {
+      lookupTempView(identifier).map { v =>
+        val tempViewPlan = v1SessionCatalog.getTempViewRelation(v)
+        if (isStreaming && !tempViewPlan.isStreaming) {
           throw QueryCompilationErrors.readNonStreamingTempViewError(identifier.quoted)
         }
         if (isTimeTravel) {
-          val target = if (v.isStreaming) "streams" else "views"
+          val target = if (tempViewPlan.isStreaming) "streams" else "views"
           throw QueryCompilationErrors.timeTravelUnsupportedError(target)
         }
+        tempViewPlan
       }
-      tmpView
     }
 
     /**
-     * Resolves relations to `ResolvedTable` or `ResolvedView`. This is for resolving DDL and
-     * misc commands.
+     * Resolves relations to `ResolvedTable` or `Resolved[Temp/Persistent]View`. This is
+     * for resolving DDL and misc commands.
      */
     private def lookupTableOrView(identifier: Seq[String]): Option[LogicalPlan] = {
-      lookupTempView(identifier).map { _ =>
-        ResolvedView(identifier.asIdentifier, isTemp = true)
+      lookupTempView(identifier).map { tempView =>
+        ResolvedTempView(identifier.asIdentifier, tempView.tableMeta.schema)
       }.orElse {
         expandIdentifier(identifier) match {
           case CatalogAndIdentifier(catalog, ident) =>
@@ -1144,7 +1145,7 @@ class Analyzer(override val catalogManager: CatalogManager)
                 v1Table.v1Table.tableType == CatalogTableType.VIEW =>
                 val v1Ident = v1Table.catalogTable.identifier
                 val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier)
-                ResolvedView(v2Ident, isTemp = false)
+                ResolvedPersistentView(catalog, v2Ident, v1Table.catalogTable.schema)
               case table =>
                 ResolvedTable.create(catalog.asTableCatalog, ident, table)
             }
@@ -1196,10 +1197,10 @@ class Analyzer(override val catalogManager: CatalogManager)
      * Resolves relations to v1 relation if it's a v1 table from the session catalog, or to v2
      * relation. This is for resolving DML commands and SELECT queries.
      */
-    private def lookupRelation(
+    private def resolveRelation(
         u: UnresolvedRelation,
         timeTravelSpec: Option[TimeTravelSpec] = None): Option[LogicalPlan] = {
-      lookupTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse {
+      resolveTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse {
         expandIdentifier(u.multipartIdentifier) match {
           case CatalogAndIdentifier(catalog, ident) =>
             val key = catalog.name +: ident.namespace :+ ident.name
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index 6095d812d66..c3fc5533a8e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, I
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
-import org.apache.spark.sql.types.{DataType, StructField}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
 
 /**
  * Holds the name of a namespace that has yet to be looked up in a catalog. It will be resolved to
@@ -54,7 +54,7 @@ case class UnresolvedTable(
 
 /**
  * Holds the name of a view that has yet to be looked up. It will be resolved to
- * [[ResolvedView]] during analysis.
+ * [[ResolvedPersistentView]] or [[ResolvedTempView]] during analysis.
  */
 case class UnresolvedView(
     multipartIdentifier: Seq[String],
@@ -68,7 +68,8 @@ case class UnresolvedView(
 
 /**
  * Holds the name of a table or view that has yet to be looked up in a catalog. It will
- * be resolved to [[ResolvedTable]] or [[ResolvedView]] during analysis.
+ * be resolved to [[ResolvedTable]], [[ResolvedPersistentView]] or [[ResolvedTempView]] during
+ * analysis.
  */
 case class UnresolvedTableOrView(
     multipartIdentifier: Seq[String],
@@ -195,11 +196,22 @@ case class ResolvedFieldPosition(position: ColumnPosition) extends FieldPosition
 
 
 /**
- * A plan containing resolved (temp) views.
+ * A plan containing resolved persistent views.
  */
-// TODO: create a generic representation for temp view, v1 view and v2 view, after we add view
-//       support to v2 catalog. For now we only need the identifier to fallback to v1 command.
-case class ResolvedView(identifier: Identifier, isTemp: Boolean) extends LeafNodeWithoutStats {
+// TODO: create a generic representation for views, after we add view support to v2 catalog. For now
+//       we only hold the view schema.
+case class ResolvedPersistentView(
+    catalog: CatalogPlugin,
+    identifier: Identifier,
+    viewSchema: StructType) extends LeafNodeWithoutStats {
+  override def output: Seq[Attribute] = Nil
+}
+
+/**
+ * A plan containing resolved (global) temp views.
+ */
+case class ResolvedTempView(identifier: Identifier, viewSchema: StructType)
+  extends LeafNodeWithoutStats {
   override def output: Seq[Attribute] = Nil
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e882dc18c2e..5d5d8b202c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -696,6 +696,24 @@ class SessionCatalog(
     getRawGlobalTempView(name).map(getTempViewPlan)
   }
 
+  /**
+   * Generate a [[View]] operator from the local or global temporary view stored.
+   */
+  def getLocalOrGlobalTempView(name: TableIdentifier): Option[View] = {
+    getRawLocalOrGlobalTempView(toNameParts(name)).map(getTempViewPlan)
+  }
+
+  /**
+   * Return the raw logical plan of a temporary local or global view for the given name.
+   */
+  def getRawLocalOrGlobalTempView(name: Seq[String]): Option[TemporaryViewRelation] = {
+    name match {
+      case Seq(v) => getRawTempView(v)
+      case Seq(db, v) if isGlobalTempViewDB(db) => getRawGlobalTempView(v)
+      case _ => None
+    }
+  }
+
   /**
    * Drop a local temporary view.
    *
@@ -714,6 +732,22 @@ class SessionCatalog(
     globalTempViewManager.remove(format(name))
   }
 
+  private def toNameParts(ident: TableIdentifier): Seq[String] = {
+    ident.database.toSeq :+ ident.table
+  }
+
+  private def getTempViewPlan(viewInfo: TemporaryViewRelation): View = viewInfo.plan match {
+    case Some(p) => View(desc = viewInfo.tableMeta, isTempView = true, child = p)
+    case None => fromCatalogTable(viewInfo.tableMeta, isTempView = true)
+  }
+
+  /**
+   * Generates a [[SubqueryAlias]] operator from the stored temporary view.
+   */
+  def getTempViewRelation(viewInfo: TemporaryViewRelation): SubqueryAlias = {
+    SubqueryAlias(toNameParts(viewInfo.tableMeta.identifier).map(format), getTempViewPlan(viewInfo))
+  }
+
   // -------------------------------------------------------------
   // | Methods that interact with temporary and metastore tables |
   // -------------------------------------------------------------
@@ -868,11 +902,6 @@ class SessionCatalog(
     }
   }
 
-  private def getTempViewPlan(viewInfo: TemporaryViewRelation): View = viewInfo.plan match {
-    case Some(p) => View(desc = viewInfo.tableMeta, isTempView = true, child = p)
-    case None => fromCatalogTable(viewInfo.tableMeta, isTempView = true)
-  }
-
   private def buildViewDDL(metadata: CatalogTable, isTempView: Boolean): Option[String] = {
     if (isTempView) {
       None
@@ -965,61 +994,22 @@ class SessionCatalog(
     View(desc = metadata, isTempView = isTempView, child = Project(projectList, parsedPlan))
   }
 
-  def lookupTempView(table: String): Option[SubqueryAlias] = {
-    val formattedTable = format(table)
-    getTempView(formattedTable).map { view =>
-      SubqueryAlias(formattedTable, view)
-    }
-  }
-
-  def lookupGlobalTempView(db: String, table: String): Option[SubqueryAlias] = {
-    val formattedDB = format(db)
-    if (formattedDB == globalTempViewManager.database) {
-      val formattedTable = format(table)
-      getGlobalTempView(formattedTable).map { view =>
-        SubqueryAlias(formattedTable, formattedDB, view)
-      }
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Return whether the given name parts belong to a temporary or global temporary view.
-   */
-  def isTempView(nameParts: Seq[String]): Boolean = {
-    if (nameParts.length > 2) return false
-    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-    isTempView(nameParts.asTableIdentifier)
-  }
-
   def isGlobalTempViewDB(dbName: String): Boolean = {
     globalTempViewManager.database.equalsIgnoreCase(dbName)
   }
 
-  def lookupTempView(name: TableIdentifier): Option[View] = {
-    lookupLocalOrGlobalRawTempView(name.database.toSeq :+ name.table).map(getTempViewPlan)
-  }
-
   /**
-   * Return the raw logical plan of a temporary local or global view for the given name.
+   * Return whether the given name parts belong to a temporary or global temporary view.
    */
-  def lookupLocalOrGlobalRawTempView(name: Seq[String]): Option[TemporaryViewRelation] = {
-    name match {
-      case Seq(v) => getRawTempView(v)
-      case Seq(db, v) if isGlobalTempViewDB(db) => getRawGlobalTempView(v)
-      case _ => None
-    }
+  def isTempView(nameParts: Seq[String]): Boolean = {
+    getRawLocalOrGlobalTempView(nameParts).isDefined
   }
 
   /**
    * Return whether a table with the specified name is a temporary view.
-   *
-   * Note: The temporary view cache is checked only when database is not
-   * explicitly specified.
    */
   def isTempView(name: TableIdentifier): Boolean = synchronized {
-    lookupTempView(name).isDefined
+    isTempView(toNameParts(name))
   }
 
   def isView(nameParts: Seq[String]): Boolean = {
@@ -1135,7 +1125,7 @@ class SessionCatalog(
    *      updated.
    */
   def refreshTable(name: TableIdentifier): Unit = synchronized {
-    lookupTempView(name).map(_.refresh).getOrElse {
+    getLocalOrGlobalTempView(name).map(_.refresh).getOrElse {
       val qualifiedIdent = qualifyIdentifier(name)
       val qualifiedTableName = QualifiedTableName(qualifiedIdent.database.get, qualifiedIdent.table)
       tableRelationCache.invalidate(qualifiedTableName)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 7458e201be2..ef4321a4fc7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedTable, ResolvedView, Star, TableAlreadyExistsException, UnresolvedRegex}
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedTable, Star, TableAlreadyExistsException, UnresolvedRegex}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, InvalidUDFClassException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateMap, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition}
@@ -271,12 +271,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       s"$quoted as it's not a data source v2 relation.")
   }
 
-  def expectTableOrPermanentViewNotTempViewError(
-      quoted: String, cmd: String, t: TreeNode[_]): Throwable = {
-    new AnalysisException(s"$quoted is a temp view. '$cmd' expects a table or permanent view.",
-      t.origin.line, t.origin.startPosition)
-  }
-
   def readNonStreamingTempViewError(quoted: String): Throwable = {
     new AnalysisException(s"$quoted is not a temp view of streaming " +
       "logical plan, please use batch API such as `DataFrameReader.table` to read it.")
@@ -306,10 +300,22 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
   }
 
   def expectTableNotViewError(
-      v: ResolvedView, cmd: String, mismatchHint: Option[String], t: TreeNode[_]): Throwable = {
-    val viewStr = if (v.isTemp) "temp view" else "view"
+      nameParts: Seq[String],
+      isTemp: Boolean,
+      cmd: String,
+      mismatchHint: Option[String],
+      t: TreeNode[_]): Throwable = {
+    val viewStr = if (isTemp) "temp view" else "view"
     val hintStr = mismatchHint.map(" " + _).getOrElse("")
-    new AnalysisException(s"${v.identifier.quoted} is a $viewStr. '$cmd' expects a table.$hintStr",
+    new AnalysisException(s"${nameParts.quoted} is a $viewStr. '$cmd' expects a table.$hintStr",
+      t.origin.line, t.origin.startPosition)
+  }
+
+  def expectViewNotTempViewError(
+      nameParts: Seq[String],
+      cmd: String,
+      t: TreeNode[_]): Throwable = {
+    new AnalysisException(s"${nameParts.quoted} is a temp view. '$cmd' expects a permanent view.",
       t.origin.line, t.origin.startPosition)
   }
 
@@ -320,6 +326,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       t.origin.line, t.origin.startPosition)
   }
 
+  def expectTableOrPermanentViewNotTempViewError(
+      nameParts: Seq[String], cmd: String, t: TreeNode[_]): Throwable = {
+    new AnalysisException(
+      s"${nameParts.quoted} is a temp view. '$cmd' expects a table or permanent view.",
+      t.origin.line, t.origin.startPosition)
+  }
+
   def expectPersistentFuncError(
       name: String, cmd: String, mismatchHint: Option[String], t: TreeNode[_]): Throwable = {
     val hintStr = mismatchHint.map(" " + _).getOrElse("")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 5a2c1b891ad..43b71657a2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -204,13 +204,12 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case DropTable(ResolvedV1TableIdentifier(ident), ifExists, purge) =>
       DropTableCommand(ident, ifExists, isView = false, purge = purge)
 
+    case DropTable(_: ResolvedPersistentView, ifExists, purge) =>
+      throw QueryCompilationErrors.cannotDropViewWithDropTableError
+
     // v1 DROP TABLE supports temp view.
-    case DropTable(r: ResolvedView, ifExists, purge) =>
-      if (!r.isTemp) {
-        throw QueryCompilationErrors.cannotDropViewWithDropTableError
-      }
-      val ResolvedViewIdentifier(ident) = r
-      DropTableCommand(ident, ifExists, isView = false, purge = purge)
+    case DropTable(ResolvedTempView(ident, _), ifExists, purge) =>
+      DropTableCommand(ident.asTableIdentifier, ifExists, isView = false, purge = purge)
 
     case DropView(ResolvedViewIdentifier(ident), ifExists) =>
       DropTableCommand(ident, ifExists, isView = true, purge = false)
@@ -535,14 +534,13 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
   object ResolvedViewIdentifier {
     def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
-      case ResolvedView(ident, isTemp) =>
-        if (isTemp) {
-          Some(ident.asTableIdentifier)
-        } else {
-          // TODO: we should get the v1 identifier from v1 view directly, similar to `V1Table`. But
-          //       there is no general view representation in DS v2 yet.
-          Some(catalogManager.v1SessionCatalog.qualifyIdentifier(ident.asTableIdentifier))
-        }
+      case ResolvedPersistentView(catalog, ident, _) =>
+        assert(isSessionCatalog(catalog))
+        assert(ident.namespace().length == 1)
+        Some(TableIdentifier(ident.name, Some(ident.namespace.head), Some(catalog.name)))
+
+      case ResolvedTempView(ident, _) => Some(ident.asTableIdentifier)
+
       case _ => None
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 657ed87e609..e3a303d4c0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -23,12 +23,12 @@ import scala.util.control.NonFatal
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
 import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedNonPersistentFunc, ResolvedPersistentFunc, ResolvedTable, ResolvedView, UnresolvedFunc, UnresolvedIdentifier, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, SubqueryAlias, TableSpec, View}
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, IdentifierHelper, MultipartIdentifierHelper, TransformHelper}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, TransformHelper}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
@@ -137,7 +137,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   private def makeTable(nameParts: Seq[String]): Table = {
-    sessionCatalog.lookupLocalOrGlobalRawTempView(nameParts).map { tempView =>
+    sessionCatalog.getRawLocalOrGlobalTempView(nameParts).map { tempView =>
       new Table(
         name = tempView.tableMeta.identifier.table,
         catalog = null,
@@ -312,29 +312,13 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       case ResolvedTable(_, _, table, _) =>
         val (partitionColumnNames, bucketSpecOpt) = table.partitioning.toSeq.convertTransforms
         val bucketColumnNames = bucketSpecOpt.map(_.bucketColumnNames).getOrElse(Nil)
-        table.schema.map { field =>
-          new Column(
-            name = field.name,
-            description = field.getComment().orNull,
-            dataType = field.dataType.simpleString,
-            nullable = field.nullable,
-            isPartition = partitionColumnNames.contains(field.name),
-            isBucket = bucketColumnNames.contains(field.name))
-        }
+        schemaToColumns(table.schema(), partitionColumnNames.contains, bucketColumnNames.contains)
 
-      case ResolvedView(identifier, _) =>
-        val catalog = sparkSession.sessionState.catalog
-        val table = identifier.asTableIdentifier
-        val schema = catalog.getTempViewOrPermanentTableMetadata(table).schema
-        schema.map { field =>
-          new Column(
-            name = field.name,
-            description = field.getComment().orNull,
-            dataType = field.dataType.simpleString,
-            nullable = field.nullable,
-            isPartition = false,
-            isBucket = false)
-        }
+      case ResolvedPersistentView(_, _, schema) =>
+        schemaToColumns(schema)
+
+      case ResolvedTempView(_, schema) =>
+        schemaToColumns(schema)
 
       case _ => throw QueryCompilationErrors.tableOrViewNotFound(ident)
     }
@@ -342,6 +326,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     CatalogImpl.makeDataset(columns, sparkSession)
   }
 
+  private def schemaToColumns(
+      schema: StructType,
+      isPartCol: String => Boolean = _ => false,
+      isBucketCol: String => Boolean = _ => false): Seq[Column] = {
+    schema.map { field =>
+      new Column(
+        name = field.name,
+        description = field.getComment().orNull,
+        dataType = field.dataType.simpleString,
+        nullable = field.nullable,
+        isPartition = isPartCol(field.name),
+        isBucket = isBucketCol(field.name))
+    }
+  }
+
   private def getNamespace(catalog: CatalogPlugin, ns: Seq[String]): Database = catalog match {
     case catalog: SupportsNamespaces =>
       val metadata = catalog.loadNamespaceMetadata(ns.toArray)
@@ -744,7 +743,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     // if `tableName` is not 2 part name, then we directly uncache it from the cache manager.
     try {
       val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-      sessionCatalog.lookupTempView(tableIdent).map(uncacheView).getOrElse {
+      sessionCatalog.getLocalOrGlobalTempView(tableIdent).map(uncacheView).getOrElse {
         sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName),
           cascade = true)
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index c405bf046b3..f17a82dedc7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1872,8 +1872,8 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
       val e = intercept[AnalysisException] {
         sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)")
       }
-      assert(e.message.contains(
-        "default.v1 is a view. 'ALTER TABLE ... ADD COLUMNS' expects a table."))
+      assert(e.message.contains(s"${SESSION_CATALOG_NAME}.default.v1 is a view. " +
+        "'ALTER TABLE ... ADD COLUMNS' expects a table."))
     }
   }
 

