Posted to commits@spark.apache.org by we...@apache.org on 2017/11/12 22:16:02 UTC

spark git commit: [SPARK-22488][BACKPORT-2.2][SQL] Fix the view resolution issue in the SparkSession internal table() API

Repository: spark
Updated Branches:
  refs/heads/branch-2.2 114dc4247 -> 00cb9d0b6


[SPARK-22488][BACKPORT-2.2][SQL] Fix the view resolution issue in the SparkSession internal table() API

## What changes were proposed in this pull request?

The current internal `table()` API of `SparkSession` bypasses the Analyzer and directly calls the `sessionState.catalog.lookupRelation` API. This skips the view resolution logic in our Analyzer rule `ResolveRelations`. This internal API is widely used by various DDL commands as well as by other public and internal APIs.

Because the view's unqualified table references are never resolved, users might hit a confusing error when the current default database differs from the one the view was created in:
```
Table or view not found: t1; line 1 pos 14
org.apache.spark.sql.AnalysisException: Table or view not found: t1; line 1 pos 14
	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
```
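
A minimal sketch of the failure, mirroring the regression test added in `SQLViewSuite` below:
```
// Create a view in `default`, then read it while another database is current.
sql("USE default")
sql("CREATE TABLE t1 USING parquet AS SELECT 1 AS c0")
sql("CREATE VIEW v1 AS SELECT * FROM t1")
sql("CREATE DATABASE IF NOT EXISTS db2")
sql("USE db2")
// Before this fix: fails with "Table or view not found: t1", because the
// view's unqualified reference to `t1` is looked up in `db2`.
spark.table("default.v1")
```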

This PR fixes the issue by making `table()` return an `UnresolvedRelation`, so the Analyzer's `ResolveRelations` rule resolves the table, including any view resolution.
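
Concretely, `table()` now hands the plan to the Analyzer instead of looking up the relation in the catalog itself, as shown in the `SparkSession.scala` hunk below:
```
private[sql] def table(tableIdent: TableIdentifier): DataFrame = {
  // Before: Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent)),
  // which bypassed the Analyzer and therefore skipped ResolveRelations.
  Dataset.ofRows(self, UnresolvedRelation(tableIdent))
}
```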

## How was this patch tested?
Added a test case and modified the existing test cases.

Author: gatorsmile <ga...@gmail.com>

Closes #19723 from gatorsmile/backport22488.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00cb9d0b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00cb9d0b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00cb9d0b

Branch: refs/heads/branch-2.2
Commit: 00cb9d0b61f9f94906dd51de3c564eda61d791e6
Parents: 114dc42
Author: gatorsmile <ga...@gmail.com>
Authored: Sun Nov 12 23:15:58 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sun Nov 12 23:15:58 2017 +0100

----------------------------------------------------------------------
 R/pkg/tests/fulltests/test_sparkSQL.R               |  2 +-
 .../scala/org/apache/spark/sql/SparkSession.scala   |  3 ++-
 .../apache/spark/sql/execution/command/cache.scala  |  4 +---
 .../spark/sql/execution/GlobalTempViewSuite.scala   | 16 +++++++++++-----
 .../apache/spark/sql/execution/SQLViewSuite.scala   | 15 +++++++++++++++
 .../spark/sql/execution/command/DDLSuite.scala      |  5 +++--
 .../spark/sql/sources/FilteredScanSuite.scala       |  2 +-
 .../apache/spark/sql/hive/CachedTableSuite.scala    | 14 +++++++++-----
 8 files changed, 43 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/00cb9d0b/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 50c60fe..f774554 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -713,7 +713,7 @@ test_that("test cache, uncache and clearCache", {
   expect_true(dropTempView("table1"))
 
   expect_error(uncacheTable("foo"),
-      "Error in uncacheTable : no such table - Table or view 'foo' not found in database 'default'")
+      "Error in uncacheTable : analysis error - Table or view not found: foo")
 })
 
 test_that("insertInto() on a registered table", {

http://git-wip-us.apache.org/repos/asf/spark/blob/00cb9d0b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index cce8a1c..96882c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalog.Catalog
 import org.apache.spark.sql.catalyst._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
@@ -614,7 +615,7 @@ class SparkSession private(
   }
 
   private[sql] def table(tableIdent: TableIdentifier): DataFrame = {
-    Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
+    Dataset.ofRows(self, UnresolvedRelation(tableIdent))
   }
 
   /* ----------------- *

http://git-wip-us.apache.org/repos/asf/spark/blob/00cb9d0b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 336f14d..cfcf3ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -56,10 +56,8 @@ case class UncacheTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val tableId = tableIdent.quotedString
-    try {
+    if (!ifExists || sparkSession.catalog.tableExists(tableId)) {
       sparkSession.catalog.uncacheTable(tableId)
-    } catch {
-      case _: NoSuchTableException if ifExists => // don't throw
     }
     Seq.empty[Row]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/00cb9d0b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
index a3d75b2..cc943e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
@@ -35,23 +35,27 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext {
   private var globalTempDB: String = _
 
   test("basic semantic") {
+    val expectedErrorMsg = "not found"
     try {
       sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 'a'")
 
       // If there is no database in table name, we should try local temp view first, if not found,
       // try table/view in current database, which is "default" in this case. So we expect
       // NoSuchTableException here.
-      intercept[NoSuchTableException](spark.table("src"))
+      var e = intercept[AnalysisException](spark.table("src")).getMessage
+      assert(e.contains(expectedErrorMsg))
 
       // Use qualified name to refer to the global temp view explicitly.
       checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
 
       // Table name without database will never refer to a global temp view.
-      intercept[NoSuchTableException](sql("DROP VIEW src"))
+      e = intercept[AnalysisException](sql("DROP VIEW src")).getMessage
+      assert(e.contains(expectedErrorMsg))
 
       sql(s"DROP VIEW $globalTempDB.src")
       // The global temp view should be dropped successfully.
-      intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+      e = intercept[AnalysisException](spark.table(s"$globalTempDB.src")).getMessage
+      assert(e.contains(expectedErrorMsg))
 
       // We can also use Dataset API to create global temp view
       Seq(1 -> "a").toDF("i", "j").createGlobalTempView("src")
@@ -59,7 +63,8 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext {
 
       // Use qualified name to rename a global temp view.
       sql(s"ALTER VIEW $globalTempDB.src RENAME TO src2")
-      intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+      e = intercept[AnalysisException](spark.table(s"$globalTempDB.src")).getMessage
+      assert(e.contains(expectedErrorMsg))
       checkAnswer(spark.table(s"$globalTempDB.src2"), Row(1, "a"))
 
       // Use qualified name to alter a global temp view.
@@ -68,7 +73,8 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext {
 
       // We can also use Catalog API to drop global temp view
       spark.catalog.dropGlobalTempView("src2")
-      intercept[NoSuchTableException](spark.table(s"$globalTempDB.src2"))
+      e = intercept[AnalysisException](spark.table(s"$globalTempDB.src2")).getMessage
+      assert(e.contains(expectedErrorMsg))
 
       // We can also use Dataset API to replace global temp view
       Seq(2 -> "b").toDF("i", "j").createOrReplaceGlobalTempView("src")

http://git-wip-us.apache.org/repos/asf/spark/blob/00cb9d0b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index 6761f05..08a4a21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -679,4 +679,19 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
       assert(spark.table("v").schema.head.name == "cBa")
     }
   }
+
+  test("sparkSession API view resolution with different default database") {
+    withDatabase("db2") {
+      withView("v1") {
+        withTable("t1") {
+          sql("USE default")
+          sql("CREATE TABLE t1 USING parquet AS SELECT 1 AS c0")
+          sql("CREATE VIEW v1 AS SELECT * FROM t1")
+          sql("CREATE DATABASE IF NOT EXISTS db2")
+          sql("USE db2")
+          checkAnswer(spark.table("default.v1"), Row(1))
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/00cb9d0b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 5109c64..a3abb7d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -800,10 +800,11 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       spark.range(10).createOrReplaceTempView("tab1")
       sql("ALTER TABLE tab1 RENAME TO tab2")
       checkAnswer(spark.table("tab2"), spark.range(10).toDF())
-      intercept[NoSuchTableException] { spark.table("tab1") }
+      val e = intercept[AnalysisException](spark.table("tab1")).getMessage
+      assert(e.contains("Table or view not found"))
       sql("ALTER VIEW tab2 RENAME TO tab1")
       checkAnswer(spark.table("tab1"), spark.range(10).toDF())
-      intercept[NoSuchTableException] { spark.table("tab2") }
+      intercept[AnalysisException] { spark.table("tab2") }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/00cb9d0b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 5a0388e..c902b0a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -326,7 +326,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
         assert(ColumnsRequired.set === requiredColumnNames)
 
         val table = spark.table("oneToTenFiltered")
-        val relation = table.queryExecution.logical.collectFirst {
+        val relation = table.queryExecution.analyzed.collectFirst {
           case LogicalRelation(r, _, _) => r
         }.get
 

http://git-wip-us.apache.org/repos/asf/spark/blob/00cb9d0b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index d3cbf89..48ab4eb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -102,14 +102,18 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
   }
 
   test("uncache of nonexistant tables") {
+    val expectedErrorMsg = "Table or view not found: nonexistantTable"
     // make sure table doesn't exist
-    intercept[NoSuchTableException](spark.table("nonexistantTable"))
-    intercept[NoSuchTableException] {
+    var e = intercept[AnalysisException](spark.table("nonexistantTable")).getMessage
+    assert(e.contains(expectedErrorMsg))
+    e = intercept[AnalysisException] {
       spark.catalog.uncacheTable("nonexistantTable")
-    }
-    intercept[NoSuchTableException] {
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
+    e = intercept[AnalysisException] {
       sql("UNCACHE TABLE nonexistantTable")
-    }
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
     sql("UNCACHE TABLE IF EXISTS nonexistantTable")
   }
 

