You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/09/19 08:48:01 UTC

[spark] branch master updated: [SPARK-40425][SQL] DROP TABLE does not need to do table lookup

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a134191024 [SPARK-40425][SQL] DROP TABLE does not need to do table lookup
1a134191024 is described below

commit 1a1341910249d545365ba3d6679c6943896dde22
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Sep 19 16:47:39 2022 +0800

    [SPARK-40425][SQL] DROP TABLE does not need to do table lookup
    
    ### What changes were proposed in this pull request?
    
    This PR updates `DropTable`/`DropView` to use `UnresolvedIdentifier` instead of `UnresolvedTableOrView`/`UnresolvedView`. This has several benefits:
    1. Simplify the `ifExists` handling. No need to handle `DropTable` in `ResolveCommandsWithIfExists` anymore.
    2. Avoid one table lookup if we eventually fall back to the v1 command (the v1 `DropTableCommand` will look up the table again).
    3. v2 catalogs can avoid table lookup entirely if possible.
    
    This PR also improves table uncaching to match by table name directly, so that we don't need to look up the table and resolve to table relations.
    
    ### Why are the changes needed?
    
    To avoid an unnecessary table lookup when dropping a table or view.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #37879 from cloud-fan/drop-table.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 core/src/main/resources/error/error-classes.json   |  5 +++
 .../scala/org/apache/spark/SparkFunSuite.scala     | 11 +++--
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  9 +++-
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  6 ---
 .../catalyst/analysis/NoSuchItemException.scala    |  4 ++
 .../sql/catalyst/analysis/ResolveCatalogs.scala    | 12 ++++--
 .../analysis/ResolveCommandsWithIfExists.scala     |  6 +--
 .../sql/catalyst/analysis/v2ResolutionPlans.scala  | 10 ++++-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  8 +---
 .../spark/sql/errors/QueryCompilationErrors.scala  | 13 ++++++
 .../analysis/AnalysisExceptionPositionSuite.scala  |  2 -
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 10 ++---
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 18 ++++----
 .../apache/spark/sql/execution/CacheManager.scala  | 48 ++++++++++++++++++++--
 .../apache/spark/sql/execution/command/ddl.scala   | 37 ++++++++++++-----
 .../datasources/v2/DataSourceV2Strategy.scala      |  7 +++-
 .../execution/datasources/v2/DropTableExec.scala   |  3 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 39 ++++++------------
 .../spark/sql/execution/command/DDLSuite.scala     |  2 +-
 .../execution/command/DropTableParserSuite.scala   | 21 ++++------
 .../sql/execution/command/DropTableSuiteBase.scala |  2 +-
 .../execution/command/PlanResolutionSuite.scala    | 23 ++++++-----
 .../v2/jdbc/JDBCTableCatalogSuite.scala            |  4 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala    |  2 +-
 .../hive/execution/command/DropTableSuite.scala    |  2 +-
 25 files changed, 191 insertions(+), 113 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 22b47c979c6..e16c8c762dc 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -555,6 +555,11 @@
           "AES-<mode> with the padding <padding> by the <functionName> function."
         ]
       },
+      "CATALOG_OPERATION" : {
+        "message" : [
+          "Catalog <catalogName> does not support <operation>."
+        ]
+      },
       "DESC_TABLE_COLUMN_PARTITION" : {
         "message" : [
           "DESC TABLE COLUMN for a specific partition."
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 10ebbe76d6c..4b0edf6f643 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -300,10 +300,15 @@ abstract class SparkFunSuite
       parameters: Map[String, String] = Map.empty,
       matchPVals: Boolean = false,
       queryContext: Array[QueryContext] = Array.empty): Unit = {
-    assert(exception.getErrorClass === errorClass)
+    val mainErrorClass :: tail = errorClass.split("\\.").toList
+    assert(tail.isEmpty || tail.length == 1)
+    // TODO: remove the `errorSubClass` parameter.
+    assert(tail.isEmpty || errorSubClass.isEmpty)
+    assert(exception.getErrorClass === mainErrorClass)
     if (exception.getErrorSubClass != null) {
-      assert(errorSubClass.isDefined)
-      assert(exception.getErrorSubClass === errorSubClass.get)
+      val subClass = errorSubClass.orElse(tail.headOption)
+      assert(subClass.isDefined)
+      assert(exception.getErrorSubClass === subClass.get)
     }
     sqlState.foreach(state => assert(exception.getSqlState === state))
     val expectedParameters = exception.getMessageParameters.asScala
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7c00b469e7b..70ff0039fa2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1091,7 +1091,7 @@ class Analyzer(override val catalogManager: CatalogManager)
         }.getOrElse(u)
 
       case u @ UnresolvedView(identifier, cmd, allowTemp, relationTypeMismatchHint) =>
-        lookupTableOrView(identifier).map {
+        lookupTableOrView(identifier, viewOnly = true).map {
           case _: ResolvedTempView if !allowTemp =>
             throw QueryCompilationErrors.expectViewNotTempViewError(identifier, cmd, u)
           case t: ResolvedTable =>
@@ -1137,12 +1137,17 @@ class Analyzer(override val catalogManager: CatalogManager)
      * Resolves relations to `ResolvedTable` or `Resolved[Temp/Persistent]View`. This is
      * for resolving DDL and misc commands.
      */
-    private def lookupTableOrView(identifier: Seq[String]): Option[LogicalPlan] = {
+    private def lookupTableOrView(
+        identifier: Seq[String],
+        viewOnly: Boolean = false): Option[LogicalPlan] = {
       lookupTempView(identifier).map { tempView =>
         ResolvedTempView(identifier.asIdentifier, tempView.tableMeta.schema)
       }.orElse {
         expandIdentifier(identifier) match {
           case CatalogAndIdentifier(catalog, ident) =>
+            if (viewOnly && !CatalogV2Util.isSessionCatalog(catalog)) {
+              throw QueryCompilationErrors.catalogOperationNotSupported(catalog, "views")
+            }
             CatalogV2Util.loadTable(catalog, ident).map {
               case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) &&
                 v1Table.v1Table.tableType == CatalogTableType.VIEW =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 68ed8991553..587ede9a81b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -131,12 +131,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case u: UnresolvedTable =>
         u.failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")
 
-      case u @ UnresolvedView(NonSessionCatalogAndIdentifier(catalog, ident), cmd, _, _) =>
-        u.failAnalysis(
-          s"Cannot specify catalog `${catalog.name}` for view ${ident.quoted} " +
-            "because view support in v2 catalog has not been implemented yet. " +
-            s"$cmd expects a view.")
-
       case u: UnresolvedView =>
         u.failAnalysis(s"View not found: ${u.multipartIdentifier.quoted}")
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
index 5813c2fa4da..5c2c6d918a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -54,6 +54,10 @@ case class NoSuchTableException(
   def this(tableIdent: Identifier) = {
     this(s"Table ${tableIdent.quoted} not found")
   }
+
+  def this(nameParts: Seq[String]) = {
+    this(s"Table ${nameParts.quoted} not found")
+  }
 }
 
 case class NoSuchPartitionException(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 9893384b709..221f1a0f313 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, LookupCatalog}
 
 /**
  * Resolves the catalog of the name parts for table/view/function/namespace.
@@ -28,8 +28,14 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
   extends Rule[LogicalPlan] with LookupCatalog {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case UnresolvedIdentifier(CatalogAndIdentifier(catalog, identifier)) =>
-      ResolvedIdentifier(catalog, identifier)
+    case UnresolvedIdentifier(nameParts, allowTemp) =>
+      if (allowTemp && catalogManager.v1SessionCatalog.isTempView(nameParts)) {
+        val ident = Identifier.of(nameParts.dropRight(1).toArray, nameParts.last)
+        ResolvedIdentifier(FakeSystemCatalog, ident)
+      } else {
+        val CatalogAndIdentifier(catalog, identifier) = nameParts
+        ResolvedIdentifier(catalog, identifier)
+      }
     case s @ ShowTables(UnresolvedNamespace(Seq()), _, _) =>
       s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
     case s @ ShowTableExtended(UnresolvedNamespace(Seq()), _, _, _) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
index a7370254826..7a2bd1ccc15 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.plans.logical.{DropFunction, DropTable, DropView, LogicalPlan, NoopCommand, UncacheTable}
+import org.apache.spark.sql.catalyst.plans.logical.{DropFunction, LogicalPlan, NoopCommand, UncacheTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
 
@@ -29,10 +29,6 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
 object ResolveCommandsWithIfExists extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
     _.containsPattern(COMMAND)) {
-    case DropTable(u: UnresolvedTableOrView, ifExists, _) if ifExists =>
-      NoopCommand("DROP TABLE", u.multipartIdentifier)
-    case DropView(u: UnresolvedView, ifExists) if ifExists =>
-      NoopCommand("DROP VIEW", u.multipartIdentifier)
     case UncacheTable(u: UnresolvedRelation, ifExists, _) if ifExists =>
       NoopCommand("UNCACHE TABLE", u.multipartIdentifier)
     case DropFunction(u: UnresolvedFunc, ifExists) if ifExists =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index c3fc5533a8e..321eecf42b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * Holds the name of a namespace that has yet to be looked up in a catalog. It will be resolved to
@@ -135,7 +136,8 @@ case class UnresolvedFunc(
  * Holds the name of a table/view/function identifier that we need to determine the catalog. It will
  * be resolved to [[ResolvedIdentifier]] during analysis.
  */
-case class UnresolvedIdentifier(nameParts: Seq[String]) extends LeafNode {
+case class UnresolvedIdentifier(nameParts: Seq[String], allowTemp: Boolean = false)
+  extends LeafNode {
   override lazy val resolved: Boolean = false
   override def output: Seq[Attribute] = Nil
 }
@@ -244,3 +246,9 @@ case class ResolvedIdentifier(
     identifier: Identifier) extends LeafNodeWithoutStats {
   override def output: Seq[Attribute] = Nil
 }
+
+// A fake v2 catalog to hold temp views.
+object FakeSystemCatalog extends CatalogPlugin {
+  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}
+  override def name(): String = "SYSTEM"
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 76de49d86dc..d2ea8df4151 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3673,7 +3673,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
   override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) {
     // DROP TABLE works with either a table or a temporary view.
     DropTable(
-      createUnresolvedTableOrView(ctx.multipartIdentifier(), "DROP TABLE"),
+      UnresolvedIdentifier(visitMultipartIdentifier(ctx.multipartIdentifier()), allowTemp = true),
       ctx.EXISTS != null,
       ctx.PURGE != null)
   }
@@ -3683,11 +3683,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
    */
   override def visitDropView(ctx: DropViewContext): AnyRef = withOrigin(ctx) {
     DropView(
-      createUnresolvedView(
-        ctx.multipartIdentifier(),
-        commandName = "DROP VIEW",
-        allowTemp = true,
-        relationTypeMismatchHint = Some("Please use DROP TABLE instead.")),
+      UnresolvedIdentifier(visitMultipartIdentifier(ctx.multipartIdentifier()), allowTemp = true),
       ctx.EXISTS != null)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index c1d8f0a4a8a..ef1d0dd94fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -604,6 +604,15 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
         "operation" -> operation))
   }
 
+  def catalogOperationNotSupported(catalog: CatalogPlugin, operation: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNSUPPORTED_FEATURE",
+      errorSubClass = "CATALOG_OPERATION",
+      messageParameters = Map(
+        "catalogName" -> toSQLId(Seq(catalog.name())),
+        "operation" -> operation))
+  }
+
   def alterColumnWithV1TableCannotSpecifyNotNullError(): Throwable = {
     new AnalysisException("ALTER COLUMN with v1 tables cannot specify NOT NULL.")
   }
@@ -958,6 +967,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
     new NoSuchTableException(ident)
   }
 
+  def noSuchTableError(nameParts: Seq[String]): Throwable = {
+    new NoSuchTableException(nameParts)
+  }
+
   def noSuchNamespaceError(namespace: Array[String]): Throwable = {
     new NoSuchNamespaceException(namespace)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala
index e50a58f8ce5..785d5ae05cf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala
@@ -34,14 +34,12 @@ class AnalysisExceptionPositionSuite extends AnalysisTest {
   }
 
   test("SPARK-33918: UnresolvedView should retain sql text position") {
-    verifyViewPosition("DROP VIEW unknown", "unknown")
     verifyViewPosition("ALTER VIEW unknown SET TBLPROPERTIES ('k'='v')", "unknown")
     verifyViewPosition("ALTER VIEW unknown UNSET TBLPROPERTIES ('k')", "unknown")
     verifyViewPosition("ALTER VIEW unknown AS SELECT 1", "unknown")
   }
 
   test("SPARK-34057: UnresolvedTableOrView should retain sql text position") {
-    verifyTableOrViewPosition("DROP TABLE unknown", "unknown")
     verifyTableOrViewPosition("DESCRIBE TABLE unknown", "unknown")
     verifyTableOrPermanentViewPosition("ANALYZE TABLE unknown COMPUTE STATISTICS", "unknown")
     verifyTableOrViewPosition("ANALYZE TABLE unknown COMPUTE STATISTICS FOR COLUMNS col", "unknown")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 25bacc3631e..ba2cc4ff15e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -685,15 +685,15 @@ class DDLParserSuite extends AnalysisTest {
     val cmd = "DROP VIEW"
     val hint = Some("Please use DROP TABLE instead.")
     parseCompare(s"DROP VIEW testcat.db.view",
-      DropView(UnresolvedView(Seq("testcat", "db", "view"), cmd, true, hint), ifExists = false))
+      DropView(UnresolvedIdentifier(Seq("testcat", "db", "view"), true), ifExists = false))
     parseCompare(s"DROP VIEW db.view",
-      DropView(UnresolvedView(Seq("db", "view"), cmd, true, hint), ifExists = false))
+      DropView(UnresolvedIdentifier(Seq("db", "view"), true), ifExists = false))
     parseCompare(s"DROP VIEW IF EXISTS db.view",
-      DropView(UnresolvedView(Seq("db", "view"), cmd, true, hint), ifExists = true))
+      DropView(UnresolvedIdentifier(Seq("db", "view"), true), ifExists = true))
     parseCompare(s"DROP VIEW view",
-      DropView(UnresolvedView(Seq("view"), cmd, true, hint), ifExists = false))
+      DropView(UnresolvedIdentifier(Seq("view"), true), ifExists = false))
     parseCompare(s"DROP VIEW IF EXISTS view",
-      DropView(UnresolvedView(Seq("view"), cmd, true, hint), ifExists = true))
+      DropView(UnresolvedIdentifier(Seq("view"), true), ifExists = true))
   }
 
   private def testCreateOrReplaceDdl(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index cca0a56174d..56236f0d2ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -216,19 +216,23 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         c
       }
 
-    case DropTable(ResolvedV1TableIdentifier(ident), ifExists, purge) =>
+    case DropTable(ResolvedV1Identifier(ident), ifExists, purge) =>
       DropTableCommand(ident, ifExists, isView = false, purge = purge)
 
-    case DropTable(_: ResolvedPersistentView, ifExists, purge) =>
-      throw QueryCompilationErrors.cannotDropViewWithDropTableError
-
     // v1 DROP TABLE supports temp view.
-    case DropTable(ResolvedTempView(ident, _), ifExists, purge) =>
-      DropTableCommand(ident.asTableIdentifier, ifExists, isView = false, purge = purge)
+    case DropTable(ResolvedIdentifier(FakeSystemCatalog, ident), _, _) =>
+      DropTempViewCommand(ident)
 
-    case DropView(ResolvedViewIdentifier(ident), ifExists) =>
+    case DropView(ResolvedV1Identifier(ident), ifExists) =>
       DropTableCommand(ident, ifExists, isView = true, purge = false)
 
+    case DropView(r @ ResolvedIdentifier(catalog, ident), _) =>
+      if (catalog == FakeSystemCatalog) {
+        DropTempViewCommand(ident)
+      } else {
+        throw QueryCompilationErrors.catalogOperationNotSupported(catalog, "views")
+      }
+
     case c @ CreateNamespace(DatabaseNameInSessionCatalog(name), _, _) if conf.useV1Command =>
       val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT)
       val location = c.properties.get(SupportsNamespaces.PROP_LOCATION)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 527f78ef10e..e9bbbc717d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -26,7 +26,7 @@ import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
 import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
-import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
+import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint, SubqueryAlias, View}
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
@@ -159,11 +159,51 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       plan: LogicalPlan,
       cascade: Boolean,
       blocking: Boolean = false): Unit = {
+    uncacheQuery(spark, _.sameResult(plan), cascade, blocking)
+  }
+
+  def uncacheTableOrView(spark: SparkSession, name: Seq[String], cascade: Boolean): Unit = {
+    uncacheQuery(
+      spark,
+      isMatchedTableOrView(_, name, spark.sessionState.conf),
+      cascade,
+      blocking = false)
+  }
+
+  private def isMatchedTableOrView(plan: LogicalPlan, name: Seq[String], conf: SQLConf): Boolean = {
+    def isSameName(nameInCache: Seq[String]): Boolean = {
+      nameInCache.length == name.length && nameInCache.zip(name).forall(conf.resolver.tupled)
+    }
+
+    plan match {
+      case SubqueryAlias(ident, LogicalRelation(_, _, Some(catalogTable), _)) =>
+        val v1Ident = catalogTable.identifier
+        isSameName(ident.qualifier :+ ident.name) &&
+          isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+
+      case SubqueryAlias(ident, DataSourceV2Relation(_, _, Some(catalog), Some(v2Ident), _)) =>
+        isSameName(ident.qualifier :+ ident.name) &&
+          isSameName(catalog.name() +: v2Ident.namespace() :+ v2Ident.name())
+
+      case SubqueryAlias(ident, View(catalogTable, _, _)) =>
+        val v1Ident = catalogTable.identifier
+        isSameName(ident.qualifier :+ ident.name) &&
+          isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+
+      case _ => false
+    }
+  }
+
+  def uncacheQuery(
+      spark: SparkSession,
+      isMatchedPlan: LogicalPlan => Boolean,
+      cascade: Boolean,
+      blocking: Boolean): Unit = {
     val shouldRemove: LogicalPlan => Boolean =
       if (cascade) {
-        _.exists(_.sameResult(plan))
+        _.exists(isMatchedPlan)
       } else {
-        _.sameResult(plan)
+        isMatchedPlan
       }
     val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan))
     this.synchronized {
@@ -187,7 +227,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
         //    will keep it as it is. It means the physical plan has been re-compiled already in the
         //    other thread.
         val cacheAlreadyLoaded = cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
-        cd.plan.exists(_.sameResult(plan)) && !cacheAlreadyLoaded
+        cd.plan.exists(isMatchedPlan) && !cacheAlreadyLoaded
       })
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 078358b6c7d..1f71a104707 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -203,7 +203,8 @@ case class DescribeDatabaseCommand(
 }
 
 /**
- * Drops a table/view from the metastore and removes it if it is cached.
+ * Drops a table/view from the metastore and removes it if it is cached. This command does not drop
+ * temp views, which should be handled by [[DropTempViewCommand]].
  *
  * The syntax of this command is:
  * {{{
@@ -219,9 +220,8 @@ case class DropTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val isTempView = catalog.isTempView(tableName)
 
-    if (!isTempView && catalog.tableExists(tableName)) {
+    if (catalog.tableExists(tableName)) {
       // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
       // issue an exception.
       catalog.getTableMetadata(tableName).tableType match {
@@ -231,14 +231,10 @@ case class DropTableCommand(
           throw QueryCompilationErrors.cannotDropViewWithDropTableError()
         case _ =>
       }
-    }
 
-    if (isTempView || catalog.tableExists(tableName)) {
       try {
-        val hasViewText = isTempView &&
-          catalog.getTempViewOrPermanentTableMetadata(tableName).viewText.isDefined
         sparkSession.sharedState.cacheManager.uncacheQuery(
-          sparkSession.table(tableName), cascade = !isTempView || hasViewText)
+          sparkSession.table(tableName), cascade = true)
       } catch {
         case NonFatal(e) => log.warn(e.toString, e)
       }
@@ -247,7 +243,28 @@ case class DropTableCommand(
     } else if (ifExists) {
       // no-op
     } else {
-      throw QueryCompilationErrors.tableOrViewNotFoundError(tableName.identifier)
+      throw QueryCompilationErrors.noSuchTableError(
+        tableName.catalog.toSeq ++ tableName.database :+ tableName.table)
+    }
+    Seq.empty[Row]
+  }
+}
+
+case class DropTempViewCommand(ident: Identifier) extends LeafRunnableCommand {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(ident.namespace().isEmpty || ident.namespace().length == 1)
+    val nameParts = ident.namespace() :+ ident.name()
+    val catalog = sparkSession.sessionState.catalog
+    catalog.getRawLocalOrGlobalTempView(nameParts).foreach { view =>
+      val hasViewText = view.tableMeta.viewText.isDefined
+      sparkSession.sharedState.cacheManager.uncacheTableOrView(
+        sparkSession, nameParts, cascade = hasViewText)
+      view.refresh()
+      if (ident.namespace().isEmpty) {
+        catalog.dropTempView(ident.name())
+      } else {
+        catalog.dropGlobalTempView(ident.name())
+      }
     }
     Seq.empty[Row]
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 35a5f41fb17..39ad51ffbe7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -326,8 +326,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
             "DESC TABLE COLUMN", toPrettySQL(nested))
       }
 
-    case DropTable(r: ResolvedTable, ifExists, purge) =>
-      DropTableExec(r.catalog, r.identifier, ifExists, purge, invalidateTableCache(r)) :: Nil
+    case DropTable(r: ResolvedIdentifier, ifExists, purge) =>
+      val invalidateFunc = () => session.sharedState.cacheManager.uncacheTableOrView(
+        session, r.catalog.name() +: r.identifier.namespace() :+ r.identifier.name(),
+        cascade = true)
+      DropTableExec(r.catalog.asTableCatalog, r.identifier, ifExists, purge, invalidateFunc) :: Nil
 
     case _: NoopCommand =>
       LocalTableScanExec(Nil, Nil) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
index 1e0627fb6df..2125b58813f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
@@ -37,7 +37,8 @@ case class DropTableExec(
       invalidateCache()
       if (purge) catalog.purgeTable(ident) else catalog.dropTable(ident)
     } else if (!ifExists) {
-      throw QueryCompilationErrors.noSuchTableError(ident)
+      throw QueryCompilationErrors.noSuchTableError(
+        catalog.name() +: ident.namespace() :+ ident.name())
     }
 
     Seq.empty
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 11f4fe0649b..7a97efe088c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2089,33 +2089,18 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
   }
 
   test("View commands are not supported in v2 catalogs") {
-    def validateViewCommand(
-        sql: String,
-        catalogName: String,
-        viewName: String,
-        cmdName: String): Unit = {
-      assertAnalysisError(
-        sql,
-        s"Cannot specify catalog `$catalogName` for view $viewName because view support " +
-          s"in v2 catalog has not been implemented yet. $cmdName expects a view.")
-    }
-
-    validateViewCommand("DROP VIEW testcat.v", "testcat", "v", "DROP VIEW")
-    validateViewCommand(
-      "ALTER VIEW testcat.v SET TBLPROPERTIES ('key' = 'val')",
-      "testcat",
-      "v",
-      "ALTER VIEW ... SET TBLPROPERTIES")
-    validateViewCommand(
-      "ALTER VIEW testcat.v UNSET TBLPROPERTIES ('key')",
-      "testcat",
-      "v",
-      "ALTER VIEW ... UNSET TBLPROPERTIES")
-    validateViewCommand(
-      "ALTER VIEW testcat.v AS SELECT 1",
-      "testcat",
-      "v",
-      "ALTER VIEW ... AS")
+    def validateViewCommand(sqlStatement: String): Unit = {
+      val e = intercept[AnalysisException](sql(sqlStatement))
+      checkError(
+        e,
+        errorClass = "UNSUPPORTED_FEATURE.CATALOG_OPERATION",
+        parameters = Map("catalogName" -> "`testcat`", "operation" -> "views"))
+    }
+
+    validateViewCommand("DROP VIEW testcat.v")
+    validateViewCommand("ALTER VIEW testcat.v SET TBLPROPERTIES ('key' = 'val')")
+    validateViewCommand("ALTER VIEW testcat.v UNSET TBLPROPERTIES ('key')")
+    validateViewCommand("ALTER VIEW testcat.v AS SELECT 1")
   }
 
   test("SPARK-33924: INSERT INTO .. PARTITION preserves the partition location") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index c7fa365abbd..14af2b82411 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -962,7 +962,7 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
       sql("DROP VIEW dbx.tab1")
     }
     assert(e.getMessage.contains(
-      "dbx.tab1 is a table. 'DROP VIEW' expects a view. Please use DROP TABLE instead."))
+      "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead"))
   }
 
   protected def testSetProperties(isDatasourceTable: Boolean): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableParserSuite.scala
index 60c7cd8dd6f..7e81ad66436 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableParserSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedTableOrView}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedIdentifier}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
 import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan}
 import org.apache.spark.sql.test.SharedSparkSession
@@ -29,31 +29,26 @@ class DropTableParserSuite extends AnalysisTest with SharedSparkSession {
 
   test("drop table") {
     parseCompare("DROP TABLE testcat.ns1.ns2.tbl",
-      DropTable(
-        UnresolvedTableOrView(Seq("testcat", "ns1", "ns2", "tbl"), "DROP TABLE", true),
+      DropTable(UnresolvedIdentifier(Seq("testcat", "ns1", "ns2", "tbl"), true),
         ifExists = false,
         purge = false))
     parseCompare(s"DROP TABLE db.tab",
       DropTable(
-        UnresolvedTableOrView(Seq("db", "tab"), "DROP TABLE", true),
+        UnresolvedIdentifier(Seq("db", "tab"), true),
         ifExists = false,
         purge = false))
     parseCompare(s"DROP TABLE IF EXISTS db.tab",
       DropTable(
-        UnresolvedTableOrView(Seq("db", "tab"), "DROP TABLE", true),
+        UnresolvedIdentifier(Seq("db", "tab"), true),
         ifExists = true,
         purge = false))
     parseCompare(s"DROP TABLE tab",
-      DropTable(
-        UnresolvedTableOrView(Seq("tab"), "DROP TABLE", true), ifExists = false, purge = false))
+      DropTable(UnresolvedIdentifier(Seq("tab"), true), ifExists = false, purge = false))
     parseCompare(s"DROP TABLE IF EXISTS tab",
-      DropTable(
-        UnresolvedTableOrView(Seq("tab"), "DROP TABLE", true), ifExists = true, purge = false))
+      DropTable(UnresolvedIdentifier(Seq("tab"), true), ifExists = true, purge = false))
     parseCompare(s"DROP TABLE tab PURGE",
-      DropTable(
-        UnresolvedTableOrView(Seq("tab"), "DROP TABLE", true), ifExists = false, purge = true))
+      DropTable(UnresolvedIdentifier(Seq("tab"), true), ifExists = false, purge = true))
     parseCompare(s"DROP TABLE IF EXISTS tab PURGE",
-      DropTable(
-        UnresolvedTableOrView(Seq("tab"), "DROP TABLE", true), ifExists = true, purge = true))
+      DropTable(UnresolvedIdentifier(Seq("tab"), true), ifExists = true, purge = true))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableSuiteBase.scala
index 3c9b39af8ef..c26022addf0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableSuiteBase.scala
@@ -57,7 +57,7 @@ trait DropTableSuiteBase extends QueryTest with DDLCommandTestUtils {
       val errMsg = intercept[AnalysisException] {
         sql(s"DROP TABLE $catalog.ns.tbl")
       }.getMessage
-      assert(errMsg.contains("Table or view not found"))
+      assert(errMsg.contains(s"Table $catalog.ns.tbl not found"))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 7e881655349..e678b866bfe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -713,13 +713,13 @@ class PlanResolutionSuite extends AnalysisTest {
     val tableIdent2 = Identifier.of(Array.empty, "tab")
 
     parseResolveCompare(s"DROP TABLE $tableName1",
-      DropTable(ResolvedTable.create(testCat, tableIdent1, table), ifExists = false, purge = false))
+      DropTable(ResolvedIdentifier(testCat, tableIdent1), ifExists = false, purge = false))
     parseResolveCompare(s"DROP TABLE IF EXISTS $tableName1",
-      DropTable(ResolvedTable.create(testCat, tableIdent1, table), ifExists = true, purge = false))
+      DropTable(ResolvedIdentifier(testCat, tableIdent1), ifExists = true, purge = false))
     parseResolveCompare(s"DROP TABLE $tableName2",
-      DropTable(ResolvedTable.create(testCat, tableIdent2, table), ifExists = false, purge = false))
+      DropTable(ResolvedIdentifier(testCat, tableIdent2), ifExists = false, purge = false))
     parseResolveCompare(s"DROP TABLE IF EXISTS $tableName2",
-      DropTable(ResolvedTable.create(testCat, tableIdent2, table), ifExists = true, purge = false))
+      DropTable(ResolvedIdentifier(testCat, tableIdent2), ifExists = true, purge = false))
   }
 
   test("drop view") {
@@ -728,7 +728,7 @@ class PlanResolutionSuite extends AnalysisTest {
     val viewName2 = "view"
     val viewIdent2 = TableIdentifier("view", Option("default"), Some(SESSION_CATALOG_NAME))
     val tempViewName = "v"
-    val tempViewIdent = TableIdentifier("v")
+    val tempViewIdent = Identifier.of(Array.empty, "v")
 
     parseResolveCompare(s"DROP VIEW $viewName1",
       DropTableCommand(viewIdent1, ifExists = false, isView = true, purge = false))
@@ -739,16 +739,19 @@ class PlanResolutionSuite extends AnalysisTest {
     parseResolveCompare(s"DROP VIEW IF EXISTS $viewName2",
       DropTableCommand(viewIdent2, ifExists = true, isView = true, purge = false))
     parseResolveCompare(s"DROP VIEW $tempViewName",
-      DropTableCommand(tempViewIdent, ifExists = false, isView = true, purge = false))
+      DropTempViewCommand(tempViewIdent))
     parseResolveCompare(s"DROP VIEW IF EXISTS $tempViewName",
-      DropTableCommand(tempViewIdent, ifExists = true, isView = true, purge = false))
+      DropTempViewCommand(tempViewIdent))
   }
 
   test("drop view in v2 catalog") {
-    intercept[AnalysisException] {
+    val e = intercept[AnalysisException] {
       parseAndResolve("DROP VIEW testcat.db.view", checkAnalysis = true)
-    }.getMessage.toLowerCase(Locale.ROOT).contains(
-      "view support in catalog has not been implemented")
+    }
+    checkError(
+      e,
+      errorClass = "UNSUPPORTED_FEATURE.CATALOG_OPERATION",
+      parameters = Map("catalogName" -> "`testcat`", "operation" -> "views"))
   }
 
   // ALTER VIEW view_name SET TBLPROPERTIES ('comment' = new_comment);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index 7aa8adc07ed..7371b6cf0bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -82,9 +82,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
     checkAnswer(sql("SHOW TABLES IN h2.test"), Seq(Row("test", "people", false)))
     Seq(
       "h2.test.not_existing_table" ->
-        "Table or view not found: h2.test.not_existing_table",
+        "Table h2.test.not_existing_table not found",
       "h2.bad_test.not_existing_table" ->
-        "Table or view not found: h2.bad_test.not_existing_table"
+        "Table h2.bad_test.not_existing_table not found"
     ).foreach { case (table, expectedMsg) =>
       val msg = intercept[AnalysisException] {
         sql(s"DROP TABLE $table")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index f1bb8d30eed..ffb6993ccf1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1047,7 +1047,7 @@ class HiveDDLSuite
       sql("CREATE TABLE tab1(c1 int)")
       assertAnalysisError(
         "DROP VIEW tab1",
-        "tab1 is a table. 'DROP VIEW' expects a view. Please use DROP TABLE instead.")
+        "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
index 0ca6184c946..8c6d718f18a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
@@ -26,7 +26,7 @@ class DropTableSuite extends v1.DropTableSuiteBase with CommandSuiteBase {
   test("hive client calls") {
     withNamespaceAndTable("ns", "tbl") { t =>
       sql(s"CREATE TABLE $t (id int) $defaultUsing")
-      checkHiveClientCalls(expected = 15) {
+      checkHiveClientCalls(expected = 11) {
         sql(s"DROP TABLE $t")
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org