You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/26 16:30:24 UTC
[spark] branch branch-3.0 updated: [SPARK-30782][SQL] Column
resolution doesn't respect current catalog/namespace for v2 tables
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 0759e5e [SPARK-30782][SQL] Column resolution doesn't respect current catalog/namespace for v2 tables
0759e5e is described below
commit 0759e5e7e58a689810d063188c598a5747096895
Author: Terry Kim <yu...@gmail.com>
AuthorDate: Thu Feb 27 00:21:38 2020 +0800
[SPARK-30782][SQL] Column resolution doesn't respect current catalog/namespace for v2 tables
### What changes were proposed in this pull request?
This PR proposes to fix an issue where qualified columns are not matched for v2 tables when the current catalog/namespace is used.
For v1 tables, you can currently perform the following:
```SQL
SELECT default.t.id FROM t;
```
For v2 tables, the following fails:
```SQL
USE testcat.ns1.ns2;
SELECT testcat.ns1.ns2.t.id FROM t;
org.apache.spark.sql.AnalysisException: cannot resolve '`testcat.ns1.ns2.t.id`' given input columns: [t.id, t.point]; line 1 pos 7;
```
### Why are the changes needed?
It is a bug because qualified column names cannot be resolved when the current catalog/namespace is used.
### Does this PR introduce any user-facing change?
Yes, now the following works:
```SQL
USE testcat.ns1.ns2;
SELECT testcat.ns1.ns2.t.id FROM t;
```
### How was this patch tested?
Added new tests
Closes #27532 from imback82/qualifed_col_respect_current.
Authored-by: Terry Kim <yu...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 73305475c10f1218bd2060e8575ab4072d0cc50b)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 8 ++--
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 54 +++++++++++++++-------
2 files changed, 42 insertions(+), 20 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index aec7174..3d79799 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -807,8 +807,10 @@ class Analyzer(
def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
case u: UnresolvedRelation =>
lookupV2Relation(u.multipartIdentifier)
- .map(SubqueryAlias(u.multipartIdentifier, _))
- .getOrElse(u)
+ .map { rel =>
+ val ident = rel.identifier.get
+ SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
+ }.getOrElse(u)
case u @ UnresolvedTable(NonSessionCatalogAndIdentifier(catalog, ident)) =>
CatalogV2Util.loadTable(catalog, ident)
@@ -933,7 +935,7 @@ class Analyzer(
v1SessionCatalog.getRelation(v1Table.v1Table)
case table =>
SubqueryAlias(
- identifier,
+ ident.asMultipartIdentifier,
DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
}
val key = catalog.name +: ident.namespace :+ ident.name
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index f642114..4ff2093 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -685,12 +685,21 @@ class DataSourceV2SQLSuite
sql(s"CREATE TABLE $t (id bigint, point struct<x: bigint, y: bigint>) USING foo")
sql(s"INSERT INTO $t VALUES (1, (10, 20))")
- checkAnswer(
- sql(s"SELECT testcat.ns1.ns2.tbl.id, testcat.ns1.ns2.tbl.point.x FROM $t"),
- Row(1, 10))
- checkAnswer(sql(s"SELECT ns1.ns2.tbl.id, ns1.ns2.tbl.point.x FROM $t"), Row(1, 10))
- checkAnswer(sql(s"SELECT ns2.tbl.id, ns2.tbl.point.x FROM $t"), Row(1, 10))
- checkAnswer(sql(s"SELECT tbl.id, tbl.point.x FROM $t"), Row(1, 10))
+ def check(tbl: String): Unit = {
+ checkAnswer(
+ sql(s"SELECT testcat.ns1.ns2.tbl.id, testcat.ns1.ns2.tbl.point.x FROM $tbl"),
+ Row(1, 10))
+ checkAnswer(sql(s"SELECT ns1.ns2.tbl.id, ns1.ns2.tbl.point.x FROM $tbl"), Row(1, 10))
+ checkAnswer(sql(s"SELECT ns2.tbl.id, ns2.tbl.point.x FROM $tbl"), Row(1, 10))
+ checkAnswer(sql(s"SELECT tbl.id, tbl.point.x FROM $tbl"), Row(1, 10))
+ }
+
+ // Test with qualified table name "testcat.ns1.ns2.tbl".
+ check(t)
+
+ // Test if current catalog and namespace is respected in column resolution.
+ sql("USE testcat.ns1.ns2")
+ check("tbl")
val ex = intercept[AnalysisException] {
sql(s"SELECT ns1.ns2.ns3.tbl.id from $t")
@@ -700,19 +709,30 @@ class DataSourceV2SQLSuite
}
test("qualified column names for v1 tables") {
- // unset this config to use the default v2 session catalog.
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
-
- withTable("t") {
- sql("CREATE TABLE t USING json AS SELECT 1 AS i")
- checkAnswer(sql("select t.i from spark_catalog.default.t"), Row(1))
- checkAnswer(sql("select default.t.i from spark_catalog.default.t"), Row(1))
+ Seq(true, false).foreach { useV1Table =>
+ val format = if (useV1Table) "json" else v2Format
+ if (useV1Table) {
+ // unset this config to use the default v2 session catalog.
+ spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+ } else {
+ spark.conf.set(
+ V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[InMemoryTableSessionCatalog].getName)
+ }
- // catalog name cannot be used for v1 tables.
- val ex = intercept[AnalysisException] {
- sql(s"select spark_catalog.default.t.i from spark_catalog.default.t")
+ withTable("t") {
+ sql(s"CREATE TABLE t USING $format AS SELECT 1 AS i")
+ checkAnswer(sql("select i from t"), Row(1))
+ checkAnswer(sql("select t.i from t"), Row(1))
+ checkAnswer(sql("select default.t.i from t"), Row(1))
+ checkAnswer(sql("select t.i from spark_catalog.default.t"), Row(1))
+ checkAnswer(sql("select default.t.i from spark_catalog.default.t"), Row(1))
+
+ // catalog name cannot be used for tables in the session catalog.
+ val ex = intercept[AnalysisException] {
+ sql(s"select spark_catalog.default.t.i from spark_catalog.default.t")
+ }
+ assert(ex.getMessage.contains("cannot resolve '`spark_catalog.default.t.i`"))
}
- assert(ex.getMessage.contains("cannot resolve '`spark_catalog.default.t.i`"))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org