Posted to commits@spark.apache.org by we...@apache.org on 2021/02/23 08:43:17 UTC
[spark] branch branch-3.1 updated: [SPARK-34490][SQL] Analysis should fail if the view refers a dropped table
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new ac7cbdb [SPARK-34490][SQL] Analysis should fail if the view refers a dropped table
ac7cbdb is described below
commit ac7cbdb4abd9e010f9a17dd22d254903ba64fd35
Author: Linhong Liu <li...@databricks.com>
AuthorDate: Tue Feb 23 15:51:02 2021 +0800
[SPARK-34490][SQL] Analysis should fail if the view refers a dropped table
### What changes were proposed in this pull request?
When resolving a view, we use the captured view name in `AnalysisContext` to
distinguish whether a relation name is a view or a table. But if that resolution
fails, other rules (e.g. `ResolveTables`) will try to resolve the relation again,
this time without `AnalysisContext`, so the resolution may be incorrect. For
example, if the view refers to a dropped table while a view with the same name
exists, the reference to the dropped table will be resolved as that view rather
than failing with an unresolved-relation exception.

### Why are the changes needed?
Bugfix.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Newly added test cases.
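For illustration (not part of the commit message), a minimal sketch of the
misresolution, using an assumed SparkSession `spark` and hypothetical names
`t` and `v` that mirror the new test case:

    // A permanent view over table `t`, plus a temp view that shadows the name `t`.
    spark.sql("CREATE TABLE t (c1 INT) USING parquet")
    spark.sql("CREATE VIEW v AS SELECT * FROM t")
    spark.sql("CREATE TEMP VIEW t AS SELECT 1 AS c1")
    spark.sql("DROP TABLE IF EXISTS default.t")

    // Before this fix: resolving `v` fails inside AnalysisContext, then
    // ResolveTables retries without it and wrongly binds `t` to the temp view.
    // After this fix: analysis fails eagerly with "Table or view not found: t".
    spark.sql("SELECT * FROM v").collect()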
Closes #31606 from linhongliu-db/fix-temp-view-master.
Lead-authored-by: Linhong Liu <li...@databricks.com>
Co-authored-by: Linhong Liu <67...@users.noreply.github.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit be675a052c38a36ce5e33ba56bdc69cc8972b3e8)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 27 +++++++++++++++-------
.../catalyst/analysis/TableLookupCacheSuite.scala | 13 ++++++++---
.../spark/sql/execution/SQLViewTestSuite.scala | 20 ++++++++++++++++
3 files changed, 49 insertions(+), 11 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e9e8ba8..bf80031 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -872,16 +872,16 @@ class Analyzer(override val catalogManager: CatalogManager)
object ResolveTempViews extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case u @ UnresolvedRelation(ident, _, isStreaming) =>
- lookupTempView(ident, isStreaming).getOrElse(u)
+ lookupTempView(ident, isStreaming, performCheck = true).getOrElse(u)
case i @ InsertIntoStatement(UnresolvedRelation(ident, _, false), _, _, _, _, _) =>
- lookupTempView(ident)
+ lookupTempView(ident, performCheck = true)
.map(view => i.copy(table = view))
.getOrElse(i)
// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand =>
write.table match {
case UnresolvedRelation(ident, _, false) =>
- lookupTempView(ident).map(EliminateSubqueryAliases(_)).map {
+ lookupTempView(ident, performCheck = true).map(EliminateSubqueryAliases(_)).map {
case r: DataSourceV2Relation => write.withNewTable(r)
case _ => throw new AnalysisException("Cannot write into temp view " +
s"${ident.quoted} as it's not a data source v2 relation.")
@@ -906,7 +906,9 @@ class Analyzer(override val catalogManager: CatalogManager)
}
def lookupTempView(
- identifier: Seq[String], isStreaming: Boolean = false): Option[LogicalPlan] = {
+ identifier: Seq[String],
+ isStreaming: Boolean = false,
+ performCheck: Boolean = false): Option[LogicalPlan] = {
// Permanent View can't refer to temp views, no need to lookup at all.
if (isResolvingView && !referredTempViewNames.contains(identifier)) return None
@@ -920,7 +922,7 @@ class Analyzer(override val catalogManager: CatalogManager)
throw new AnalysisException(s"${identifier.quoted} is not a temp view of streaming " +
s"logical plan, please use batch API such as `DataFrameReader.table` to read it.")
}
- tmpView.map(ResolveRelations.resolveViews)
+ tmpView.map(ResolveRelations.resolveViews(_, performCheck))
}
}
@@ -1074,7 +1076,7 @@ class Analyzer(override val catalogManager: CatalogManager)
// look at `AnalysisContext.catalogAndNamespace` when resolving relations with single-part name.
// If `AnalysisContext.catalogAndNamespace` is non-empty, analyzer will expand single-part names
// with it, instead of current catalog and namespace.
- def resolveViews(plan: LogicalPlan): LogicalPlan = plan match {
+ def resolveViews(plan: LogicalPlan, performCheck: Boolean = false): LogicalPlan = plan match {
// The view's child should be a logical plan parsed from the `desc.viewText`, the variable
// `viewText` should be defined, or else we throw an error on the generation of the View
// operator.
@@ -1093,9 +1095,18 @@ class Analyzer(override val catalogManager: CatalogManager)
executeSameContext(child)
}
}
+ // Fail the analysis eagerly because outside AnalysisContext, the unresolved operators
+ // inside a view may be resolved incorrectly.
+ // But for commands like `DropViewCommand`, resolving the view is unnecessary even though
+ // there are unresolved nodes. So use the `performCheck` flag to skip the analysis check
+ // for these commands.
+ // TODO(SPARK-34504): avoid unnecessary view resolving and remove the `performCheck` flag
+ if (performCheck) {
+ checkAnalysis(newChild)
+ }
view.copy(child = newChild)
case p @ SubqueryAlias(_, view: View) =>
- p.copy(child = resolveViews(view))
+ p.copy(child = resolveViews(view, performCheck))
case _ => plan
}
@@ -1133,7 +1144,7 @@ class Analyzer(override val catalogManager: CatalogManager)
case u: UnresolvedRelation =>
lookupRelation(u.multipartIdentifier, u.options, u.isStreaming)
- .map(resolveViews).getOrElse(u)
+ .map(resolveViews(_, performCheck = true)).getOrElse(u)
case u @ UnresolvedTable(identifier, cmd) =>
lookupTableOrView(identifier).map {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
index 3e9a8b7..ec94805 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
import java.io.File
+import scala.collection.JavaConverters._
+
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
@@ -27,8 +29,8 @@ import org.scalatest.matchers.must.Matchers
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, ExternalCatalog, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.connector.InMemoryTableCatalog
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, V1Table}
+import org.apache.spark.sql.connector.{InMemoryTable, InMemoryTableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table}
import org.apache.spark.sql.types._
class TableLookupCacheSuite extends AnalysisTest with Matchers {
@@ -46,7 +48,12 @@ class TableLookupCacheSuite extends AnalysisTest with Matchers {
ignoreIfExists = false)
val v2Catalog = new InMemoryTableCatalog {
override def loadTable(ident: Identifier): Table = {
- V1Table(externalCatalog.getTable("default", ident.name))
+ val catalogTable = externalCatalog.getTable("default", ident.name)
+ new InMemoryTable(
+ catalogTable.identifier.table,
+ catalogTable.schema,
+ Array.empty,
+ Map.empty[String, String].asJava)
}
override def name: String = CatalogManager.SESSION_CATALOG_NAME
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index 68e1a68..84a20bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -258,6 +258,26 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
checkViewOutput(viewName, Seq(Row(2)))
}
}
+
+ test("SPARK-34490 - query should fail if the view refers a dropped table") {
+ withTable("t") {
+ Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
+ val viewName = createView("testView", "SELECT * FROM t")
+ withView(viewName) {
+ // Always create a temp view in this case, not use `createView` on purpose
+ sql("CREATE TEMP VIEW t AS SELECT 1 AS c1")
+ withTempView("t") {
+ checkViewOutput(viewName, Seq(Row(2), Row(3), Row(1)))
+ // Manually drop table `t` to see if the query will fail
+ sql("DROP TABLE IF EXISTS default.t")
+ val e = intercept[AnalysisException] {
+ sql(s"SELECT * FROM $viewName").collect()
+ }.getMessage
+ assert(e.contains("Table or view not found: t"))
+ }
+ }
+ }
+ }
}
class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
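For illustration (not part of the commit), a short ScalaTest-style sketch of
the two code paths that the `performCheck` flag separates; the names `spark`,
`v`, and `t` are assumed from the scenario above:

    import org.apache.spark.sql.AnalysisException

    // Query path: lookupRelation now calls resolveViews with performCheck = true,
    // so the dangling reference to the dropped table `t` fails analysis eagerly.
    val e = intercept[AnalysisException] {
      spark.sql("SELECT * FROM v").collect()
    }
    assert(e.getMessage.contains("Table or view not found: t"))

    // Command path: DropViewCommand does not need the view body to be resolved,
    // so performCheck stays false and dropping the broken view still succeeds
    // (see TODO SPARK-34504 for removing the flag entirely).
    spark.sql("DROP VIEW v")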