Posted to commits@spark.apache.org by we...@apache.org on 2020/02/26 16:30:24 UTC

[spark] branch branch-3.0 updated: [SPARK-30782][SQL] Column resolution doesn't respect current catalog/namespace for v2 tables

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0759e5e  [SPARK-30782][SQL] Column resolution doesn't respect current catalog/namespace for v2 tables
0759e5e is described below

commit 0759e5e7e58a689810d063188c598a5747096895
Author: Terry Kim <yu...@gmail.com>
AuthorDate: Thu Feb 27 00:21:38 2020 +0800

    [SPARK-30782][SQL] Column resolution doesn't respect current catalog/namespace for v2 tables
    
    ### What changes were proposed in this pull request?
    
    This PR fixes an issue where qualified column names fail to resolve for v2 tables when the current catalog/namespace is used.
    
    For v1 tables, a column qualified with the current database already resolves:
    ```SQL
    SELECT default.t.id FROM t;
    ```
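    
    A minimal sketch of the setup this v1 example assumes: a table t in the
    default database of the session catalog (the table schema and format here
    are illustrative, not taken from the patch):
    ```SQL
    CREATE TABLE t (id INT) USING parquet;
    -- default.t.id resolves because default.t matches the table's qualifier.
    SELECT default.t.id FROM t;
    ```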
    
    For v2 tables, the following fails:
    ```SQL
    USE testcat.ns1.ns2;
    SELECT testcat.ns1.ns2.t.id FROM t;
    
    org.apache.spark.sql.AnalysisException: cannot resolve '`testcat.ns1.ns2.t.id`' given input columns: [t.id, t.point]; line 1 pos 7;
    ```
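    
    For reference, a sketch of a setup that reproduces the failure (column
    types are taken from the updated test below; the catalog and table names
    are illustrative):
    ```SQL
    CREATE TABLE testcat.ns1.ns2.t (id bigint, point struct<x: bigint, y: bigint>) USING foo;
    INSERT INTO testcat.ns1.ns2.t VALUES (1, (10, 20));
    USE testcat.ns1.ns2;
    SELECT testcat.ns1.ns2.t.id FROM t;  -- fails without this patch
    ```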
    
    ### Why are the changes needed?
    
    It is a bug: qualified column names cannot be matched when the current catalog/namespace is used.
    
    ### Does this PR introduce any user-facing change?
    
    Yes, now the following works:
    ```SQL
    USE testcat.ns1.ns2;
    SELECT testcat.ns1.ns2.t.id FROM t;
    ```
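    
    As the updated tests below exercise, shorter qualifiers (dropping leading
    name parts) also resolve once the current catalog/namespace is set:
    ```SQL
    USE testcat.ns1.ns2;
    SELECT ns1.ns2.t.id FROM t;  -- namespace-qualified
    SELECT ns2.t.id FROM t;      -- partially qualified
    SELECT t.id FROM t;          -- table-qualified only
    ```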
    
    ### How was this patch tested?
    
    Added new tests in DataSourceV2SQLSuite.
    
    Closes #27532 from imback82/qualifed_col_respect_current.
    
    Authored-by: Terry Kim <yu...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 73305475c10f1218bd2060e8575ab4072d0cc50b)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  8 ++--
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 54 +++++++++++++++-------
 2 files changed, 42 insertions(+), 20 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index aec7174..3d79799 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -807,8 +807,10 @@ class Analyzer(
     def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
       case u: UnresolvedRelation =>
         lookupV2Relation(u.multipartIdentifier)
-          .map(SubqueryAlias(u.multipartIdentifier, _))
-          .getOrElse(u)
+          .map { rel =>
+            val ident = rel.identifier.get
+            SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
+          }.getOrElse(u)
 
       case u @ UnresolvedTable(NonSessionCatalogAndIdentifier(catalog, ident)) =>
         CatalogV2Util.loadTable(catalog, ident)
@@ -933,7 +935,7 @@ class Analyzer(
               v1SessionCatalog.getRelation(v1Table.v1Table)
             case table =>
               SubqueryAlias(
-                identifier,
+                ident.asMultipartIdentifier,
                 DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
           }
           val key = catalog.name +: ident.namespace :+ ident.name
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index f642114..4ff2093 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -685,12 +685,21 @@ class DataSourceV2SQLSuite
       sql(s"CREATE TABLE $t (id bigint, point struct<x: bigint, y: bigint>) USING foo")
       sql(s"INSERT INTO $t VALUES (1, (10, 20))")
 
-      checkAnswer(
-        sql(s"SELECT testcat.ns1.ns2.tbl.id, testcat.ns1.ns2.tbl.point.x FROM $t"),
-        Row(1, 10))
-      checkAnswer(sql(s"SELECT ns1.ns2.tbl.id, ns1.ns2.tbl.point.x FROM $t"), Row(1, 10))
-      checkAnswer(sql(s"SELECT ns2.tbl.id, ns2.tbl.point.x FROM $t"), Row(1, 10))
-      checkAnswer(sql(s"SELECT tbl.id, tbl.point.x FROM $t"), Row(1, 10))
+      def check(tbl: String): Unit = {
+        checkAnswer(
+          sql(s"SELECT testcat.ns1.ns2.tbl.id, testcat.ns1.ns2.tbl.point.x FROM $tbl"),
+          Row(1, 10))
+        checkAnswer(sql(s"SELECT ns1.ns2.tbl.id, ns1.ns2.tbl.point.x FROM $tbl"), Row(1, 10))
+        checkAnswer(sql(s"SELECT ns2.tbl.id, ns2.tbl.point.x FROM $tbl"), Row(1, 10))
+        checkAnswer(sql(s"SELECT tbl.id, tbl.point.x FROM $tbl"), Row(1, 10))
+      }
+
+      // Test with qualified table name "testcat.ns1.ns2.tbl".
+      check(t)
+
+      // Test if current catalog and namespace is respected in column resolution.
+      sql("USE testcat.ns1.ns2")
+      check("tbl")
 
       val ex = intercept[AnalysisException] {
         sql(s"SELECT ns1.ns2.ns3.tbl.id from $t")
@@ -700,19 +709,30 @@ class DataSourceV2SQLSuite
   }
 
   test("qualified column names for v1 tables") {
-    // unset this config to use the default v2 session catalog.
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
-
-    withTable("t") {
-      sql("CREATE TABLE t USING json AS SELECT 1 AS i")
-      checkAnswer(sql("select t.i from spark_catalog.default.t"), Row(1))
-      checkAnswer(sql("select default.t.i from spark_catalog.default.t"), Row(1))
+    Seq(true, false).foreach { useV1Table =>
+      val format = if (useV1Table) "json" else v2Format
+      if (useV1Table) {
+        // unset this config to use the default v2 session catalog.
+        spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+      } else {
+        spark.conf.set(
+          V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[InMemoryTableSessionCatalog].getName)
+      }
 
-      // catalog name cannot be used for v1 tables.
-      val ex = intercept[AnalysisException] {
-        sql(s"select spark_catalog.default.t.i from spark_catalog.default.t")
+      withTable("t") {
+        sql(s"CREATE TABLE t USING $format AS SELECT 1 AS i")
+        checkAnswer(sql("select i from t"), Row(1))
+        checkAnswer(sql("select t.i from t"), Row(1))
+        checkAnswer(sql("select default.t.i from t"), Row(1))
+        checkAnswer(sql("select t.i from spark_catalog.default.t"), Row(1))
+        checkAnswer(sql("select default.t.i from spark_catalog.default.t"), Row(1))
+
+        // catalog name cannot be used for tables in the session catalog.
+        val ex = intercept[AnalysisException] {
+          sql(s"select spark_catalog.default.t.i from spark_catalog.default.t")
+        }
+        assert(ex.getMessage.contains("cannot resolve '`spark_catalog.default.t.i`"))
       }
-      assert(ex.getMessage.contains("cannot resolve '`spark_catalog.default.t.i`"))
     }
   }
 

