You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/02/23 08:43:17 UTC

[spark] branch branch-3.1 updated: [SPARK-34490][SQL] Analysis should fail if the view refers a dropped table

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new ac7cbdb  [SPARK-34490][SQL] Analysis should fail if the view refers a dropped table
ac7cbdb is described below

commit ac7cbdb4abd9e010f9a17dd22d254903ba64fd35
Author: Linhong Liu <li...@databricks.com>
AuthorDate: Tue Feb 23 15:51:02 2021 +0800

    [SPARK-34490][SQL] Analysis should fail if the view refers a dropped table
    
    When resolving a view, we use the captured view names in `AnalysisContext` to
    distinguish whether a relation name refers to a view or a table. But if that
    resolution fails, other rules (e.g. `ResolveTables`) will try to resolve the relation
    again, this time without `AnalysisContext`, so the result may be incorrect. For example,
    if the view refers to a dropped table while a view with the same name exists, the
    dropped table is resolved as that view instead of failing analysis with a
    "Table or view not found" error.
    
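    Why are the changes needed: this is a bug fix.
    
    Does this PR introduce any user-facing change: no.
    
    How was this patch tested: with newly added test cases.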
    
    Closes #31606 from linhongliu-db/fix-temp-view-master.
    
    Lead-authored-by: Linhong Liu <li...@databricks.com>
    Co-authored-by: Linhong Liu <67...@users.noreply.github.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit be675a052c38a36ce5e33ba56bdc69cc8972b3e8)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 27 +++++++++++++++-------
 .../catalyst/analysis/TableLookupCacheSuite.scala  | 13 ++++++++---
 .../spark/sql/execution/SQLViewTestSuite.scala     | 20 ++++++++++++++++
 3 files changed, 49 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e9e8ba8..bf80031 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -872,16 +872,16 @@ class Analyzer(override val catalogManager: CatalogManager)
   object ResolveTempViews extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
       case u @ UnresolvedRelation(ident, _, isStreaming) =>
-        lookupTempView(ident, isStreaming).getOrElse(u)
+        lookupTempView(ident, isStreaming, performCheck = true).getOrElse(u)
       case i @ InsertIntoStatement(UnresolvedRelation(ident, _, false), _, _, _, _, _) =>
-        lookupTempView(ident)
+        lookupTempView(ident, performCheck = true)
           .map(view => i.copy(table = view))
           .getOrElse(i)
       // TODO (SPARK-27484): handle streaming write commands when we have them.
       case write: V2WriteCommand =>
         write.table match {
           case UnresolvedRelation(ident, _, false) =>
-            lookupTempView(ident).map(EliminateSubqueryAliases(_)).map {
+            lookupTempView(ident, performCheck = true).map(EliminateSubqueryAliases(_)).map {
               case r: DataSourceV2Relation => write.withNewTable(r)
               case _ => throw new AnalysisException("Cannot write into temp view " +
                 s"${ident.quoted} as it's not a data source v2 relation.")
@@ -906,7 +906,9 @@ class Analyzer(override val catalogManager: CatalogManager)
     }
 
     def lookupTempView(
-        identifier: Seq[String], isStreaming: Boolean = false): Option[LogicalPlan] = {
+        identifier: Seq[String],
+        isStreaming: Boolean = false,
+        performCheck: Boolean = false): Option[LogicalPlan] = {
       // Permanent View can't refer to temp views, no need to lookup at all.
       if (isResolvingView && !referredTempViewNames.contains(identifier)) return None
 
@@ -920,7 +922,7 @@ class Analyzer(override val catalogManager: CatalogManager)
         throw new AnalysisException(s"${identifier.quoted} is not a temp view of streaming " +
           s"logical plan, please use batch API such as `DataFrameReader.table` to read it.")
       }
-      tmpView.map(ResolveRelations.resolveViews)
+      tmpView.map(ResolveRelations.resolveViews(_, performCheck))
     }
   }
 
@@ -1074,7 +1076,7 @@ class Analyzer(override val catalogManager: CatalogManager)
     // look at `AnalysisContext.catalogAndNamespace` when resolving relations with single-part name.
     // If `AnalysisContext.catalogAndNamespace` is non-empty, analyzer will expand single-part names
     // with it, instead of current catalog and namespace.
-    def resolveViews(plan: LogicalPlan): LogicalPlan = plan match {
+    def resolveViews(plan: LogicalPlan, performCheck: Boolean = false): LogicalPlan = plan match {
       // The view's child should be a logical plan parsed from the `desc.viewText`, the variable
       // `viewText` should be defined, or else we throw an error on the generation of the View
       // operator.
@@ -1093,9 +1095,18 @@ class Analyzer(override val catalogManager: CatalogManager)
             executeSameContext(child)
           }
         }
+        // Fail the analysis eagerly because outside AnalysisContext, the unresolved operators
+        // inside a view maybe resolved incorrectly.
+        // But for commands like `DropViewCommand`, resolving view is unnecessary even though
+        // there is unresolved node. So use the `performCheck` flag to skip the analysis check
+        // for these commands.
+        // TODO(SPARK-34504): avoid unnecessary view resolving and remove the `performCheck` flag
+        if (performCheck) {
+          checkAnalysis(newChild)
+        }
         view.copy(child = newChild)
       case p @ SubqueryAlias(_, view: View) =>
-        p.copy(child = resolveViews(view))
+        p.copy(child = resolveViews(view, performCheck))
       case _ => plan
     }
 
@@ -1133,7 +1144,7 @@ class Analyzer(override val catalogManager: CatalogManager)
 
       case u: UnresolvedRelation =>
         lookupRelation(u.multipartIdentifier, u.options, u.isStreaming)
-          .map(resolveViews).getOrElse(u)
+          .map(resolveViews(_, performCheck = true)).getOrElse(u)
 
       case u @ UnresolvedTable(identifier, cmd) =>
         lookupTableOrView(identifier).map {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
index 3e9a8b7..ec94805 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.io.File
 
+import scala.collection.JavaConverters._
+
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
@@ -27,8 +29,8 @@ import org.scalatest.matchers.must.Matchers
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, ExternalCatalog, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.connector.InMemoryTableCatalog
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, V1Table}
+import org.apache.spark.sql.connector.{InMemoryTable, InMemoryTableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table}
 import org.apache.spark.sql.types._
 
 class TableLookupCacheSuite extends AnalysisTest with Matchers {
@@ -46,7 +48,12 @@ class TableLookupCacheSuite extends AnalysisTest with Matchers {
       ignoreIfExists = false)
     val v2Catalog = new InMemoryTableCatalog {
       override def loadTable(ident: Identifier): Table = {
-        V1Table(externalCatalog.getTable("default", ident.name))
+        val catalogTable = externalCatalog.getTable("default", ident.name)
+        new InMemoryTable(
+          catalogTable.identifier.table,
+          catalogTable.schema,
+          Array.empty,
+          Map.empty[String, String].asJava)
       }
       override def name: String = CatalogManager.SESSION_CATALOG_NAME
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index 68e1a68..84a20bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -258,6 +258,26 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
       checkViewOutput(viewName, Seq(Row(2)))
     }
   }
+
+  test("SPARK-34490 - query should fail if the view refers a dropped table") {
+    withTable("t") {
+      Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
+      val viewName = createView("testView", "SELECT * FROM t")
+      withView(viewName) {
+        // Always create a temp view in this case, not use `createView` on purpose
+        sql("CREATE TEMP VIEW t AS SELECT 1 AS c1")
+        withTempView("t") {
+          checkViewOutput(viewName, Seq(Row(2), Row(3), Row(1)))
+          // Manually drop table `t` to see if the query will fail
+          sql("DROP TABLE IF EXISTS default.t")
+          val e = intercept[AnalysisException] {
+            sql(s"SELECT * FROM $viewName").collect()
+          }.getMessage
+          assert(e.contains("Table or view not found: t"))
+        }
+      }
+    }
+  }
 }
 
 class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org