Posted to commits@spark.apache.org by ru...@apache.org on 2022/06/29 02:01:39 UTC
[spark] branch master updated: [SPARK-39615][SQL] Make listColumns be compatible with 3 layer namespace
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1f15f2c6ad7 [SPARK-39615][SQL] Make listColumns be compatible with 3 layer namespace
1f15f2c6ad7 is described below
commit 1f15f2c6ad7ff8e593d39dd264b4a6efa89d67af
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Wed Jun 29 10:00:59 2022 +0800
[SPARK-39615][SQL] Make listColumns be compatible with 3 layer namespace
### What changes were proposed in this pull request?
Make `Catalog.listColumns` compatible with 3-layer namespaces, i.e. fully qualified `catalog.database.table` names.
### Why are the changes needed?
For 3-layer namespace compatibility: `listColumns` previously accepted only `table` or `database.table` names.
### Does this PR introduce _any_ user-facing change?
Yes. `Catalog.listColumns` now also resolves fully qualified 3-layer names.
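For example (a minimal sketch; the catalog, database, and table names are illustrative and match the new test below):

```scala
// Previously supported: 1- and 2-layer names, resolved in the session catalog.
spark.catalog.listColumns("my_table1")
spark.catalog.listColumns("default.my_table1")

// Now also supported: fully qualified 3-layer names, including non-session catalogs.
spark.catalog.listColumns("spark_catalog.default.my_table1")
spark.catalog.listColumns("testcat.my_db2.my_table2")
```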
### How was this patch tested?
Added a unit test in `CatalogSuite`.
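The new test can be run on its own with a standard Spark dev setup, e.g.:

```
build/sbt "sql/testOnly org.apache.spark.sql.internal.CatalogSuite -- -z SPARK-39615"
```

(`-z` is ScalaTest's substring filter for selecting individual tests.)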
Closes #37000 from zhengruifeng/sql_3L_list_cols.
Authored-by: Ruifeng Zheng <ru...@apache.org>
Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
.../apache/spark/sql/internal/CatalogImpl.scala | 59 ++++++++++++++++++++--
.../apache/spark/sql/internal/CatalogSuite.scala | 46 +++++++++++++++++
2 files changed, 102 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 49cb9a3e897..97226736691 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowTables, SubqueryAlias, TableSpec, View}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, SupportsNamespaces, TableCatalog}
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, IdentifierHelper, TransformHelper}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.types.StructType
@@ -208,8 +208,23 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
@throws[AnalysisException]("table does not exist")
override def listColumns(tableName: String): Dataset[Column] = {
- val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
- listColumns(tableIdent)
+ // First try `sqlParser.parseTableIdentifier` to parse tableName. If it contains only a table
+ // name, optionally qualified by a database name (and thus forms a TableIdentifier), look the
+ // table up in the session catalog. Otherwise fall back to `sqlParser.parseMultipartIdentifier`
+ // to get the qualified identifier as a sequence of strings and resolve it through the SQL analyzer.
+ try {
+ val ident = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+ if (tableExists(ident.database.orNull, ident.table)) {
+ listColumns(ident)
+ } else {
+ val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+ listColumns(ident)
+ }
+ } catch {
+ case _: org.apache.spark.sql.catalyst.parser.ParseException =>
+ val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+ listColumns(ident)
+ }
}
/**
@@ -238,6 +253,44 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
CatalogImpl.makeDataset(columns, sparkSession)
}
+ private def listColumns(ident: Seq[String]): Dataset[Column] = {
+ val plan = UnresolvedTableOrView(ident, "Catalog.listColumns", true)
+
+ val columns = sparkSession.sessionState.executePlan(plan).analyzed match {
+ case ResolvedTable(_, _, table, _) =>
+ val (partitionColumnNames, bucketSpecOpt) = table.partitioning.toSeq.convertTransforms
+ val bucketColumnNames = bucketSpecOpt.map(_.bucketColumnNames).getOrElse(Nil)
+ table.schema.map { field =>
+ new Column(
+ name = field.name,
+ description = field.getComment().orNull,
+ dataType = field.dataType.simpleString,
+ nullable = field.nullable,
+ isPartition = partitionColumnNames.contains(field.name),
+ isBucket = bucketColumnNames.contains(field.name))
+ }
+
+ case ResolvedView(identifier, _) =>
+ val catalog = sparkSession.sessionState.catalog
+ val table = identifier.asTableIdentifier
+ val schema = catalog.getTempViewOrPermanentTableMetadata(table).schema
+ schema.map { field =>
+ new Column(
+ name = field.name,
+ description = field.getComment().orNull,
+ dataType = field.dataType.simpleString,
+ nullable = field.nullable,
+ isPartition = false,
+ isBucket = false)
+ }
+
+ case _ => throw QueryCompilationErrors.tableOrViewNotFound(ident)
+ }
+
+ CatalogImpl.makeDataset(columns, sparkSession)
+ }
+
+
/**
* Gets the database with the specified name. This throws an `AnalysisException` when no
* `Database` can be found.
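For reference, a rough standalone sketch of the two parse paths used above (not part of the commit; it assumes a started `SparkSession` named `spark`, `sessionState` is an internal API shown only for illustration, and the identifiers come from the tests below):

```scala
val parser = spark.sessionState.sqlParser

// A 1- or 2-part name parses as a TableIdentifier and is looked up in the session catalog.
parser.parseTableIdentifier("default.my_table1")
// table = "my_table1", database = Some("default")

// A 3-part name fails TableIdentifier parsing with a ParseException, so listColumns falls
// back to a multi-part identifier, which the SQL analyzer resolves against the named catalog.
parser.parseMultipartIdentifier("testcat.my_db2.my_table2")
// Seq("testcat", "my_db2", "my_table2")
```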
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index a1a946ddd71..13f6965a8e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -273,6 +273,52 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
testListColumns("tab1", dbName = Some("db1"))
}
+ test("SPARK-39615: three layer namespace compatibility - listColumns") {
+ val answers = Map(
+ "col1" -> ("int", true, false, true),
+ "col2" -> ("string", true, false, false),
+ "a" -> ("int", true, true, false),
+ "b" -> ("string", true, true, false)
+ )
+
+ assert(spark.catalog.currentCatalog() === "spark_catalog")
+ createTable("my_table1")
+
+ val columns1 = spark.catalog.listColumns("my_table1").collect()
+ assert(answers ===
+ columns1.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+
+ val columns2 = spark.catalog.listColumns("default.my_table1").collect()
+ assert(answers ===
+ columns2.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+
+ val columns3 = spark.catalog.listColumns("spark_catalog.default.my_table1").collect()
+ assert(answers ===
+ columns3.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+
+ createDatabase("my_db1")
+ createTable("my_table2", Some("my_db1"))
+
+ val columns4 = spark.catalog.listColumns("my_db1.my_table2").collect()
+ assert(answers ===
+ columns4.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+
+ val columns5 = spark.catalog.listColumns("spark_catalog.my_db1.my_table2").collect()
+ assert(answers ===
+ columns5.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+
+ val catalogName = "testcat"
+ val dbName = "my_db2"
+ val tableName = "my_table2"
+ val tableSchema = new StructType().add("i", "int").add("j", "string")
+ val description = "this is a test managed table"
+ createTable(tableName, dbName, catalogName, classOf[FakeV2Provider].getName, tableSchema,
+ Map.empty[String, String], description)
+
+ val columns6 = spark.catalog.listColumns("testcat.my_db2.my_table2").collect()
+ assert(Map("i" -> "int", "j" -> "string") === columns6.map(c => c.name -> c.dataType).toMap)
+ }
+
test("Database.toString") {
assert(new Database("cool_db", "cool_desc", "cool_path").toString ==
"Database[name='cool_db', description='cool_desc', path='cool_path']")