You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ru...@apache.org on 2022/06/29 02:01:39 UTC

[spark] branch master updated: [SPARK-39615][SQL] Make listColumns be compatible with 3 layer namespace

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f15f2c6ad7 [SPARK-39615][SQL] Make listColumns be compatible with 3 layer namespace
1f15f2c6ad7 is described below

commit 1f15f2c6ad7ff8e593d39dd264b4a6efa89d67af
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Wed Jun 29 10:00:59 2022 +0800

    [SPARK-39615][SQL] Make listColumns be compatible with 3 layer namespace
    
    ### What changes were proposed in this pull request?
    Make `listColumns` compatible with the 3-layer namespace (`catalog.database.table`).
    
    ### Why are the changes needed?
    For 3-layer namespace compatibility.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. `Catalog.listColumns(tableName)` now also accepts a fully qualified 3-part table name.
    
    ### How was this patch tested?
    Added a unit test.
    
    Closes #37000 from zhengruifeng/sql_3L_list_cols.
    
    Authored-by: Ruifeng Zheng <ru...@apache.org>
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
 .../apache/spark/sql/internal/CatalogImpl.scala    | 59 ++++++++++++++++++++--
 .../apache/spark/sql/internal/CatalogSuite.scala   | 46 +++++++++++++++++
 2 files changed, 102 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 49cb9a3e897..97226736691 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowTables, SubqueryAlias, TableSpec, View}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, SupportsNamespaces, TableCatalog}
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, IdentifierHelper, TransformHelper}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.types.StructType
@@ -208,8 +208,23 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   @throws[AnalysisException]("table does not exist")
   override def listColumns(tableName: String): Dataset[Column] = {
-    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-    listColumns(tableIdent)
+    // calling `sqlParser.parseTableIdentifier` to parse tableName. If it contains only table name
+    // and optionally contains a database name(thus a TableIdentifier), then we look up the table in
+    // sessionCatalog. Otherwise we try `sqlParser.parseMultipartIdentifier` to have a sequence of
+    // string as the qualified identifier and resolve the table through SQL analyzer.
+    try {
+      val ident = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+      if (tableExists(ident.database.orNull, ident.table)) {
+        listColumns(ident)
+      } else {
+        val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+        listColumns(ident)
+      }
+    } catch {
+      case e: org.apache.spark.sql.catalyst.parser.ParseException =>
+        val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+        listColumns(ident)
+    }
   }
 
   /**
@@ -238,6 +253,44 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     CatalogImpl.makeDataset(columns, sparkSession)
   }
 
+  private def listColumns(ident: Seq[String]): Dataset[Column] = {
+    val plan = UnresolvedTableOrView(ident, "Catalog.listColumns", true)
+
+    val columns = sparkSession.sessionState.executePlan(plan).analyzed match {
+      case ResolvedTable(_, _, table, _) =>
+        val (partitionColumnNames, bucketSpecOpt) = table.partitioning.toSeq.convertTransforms
+        val bucketColumnNames = bucketSpecOpt.map(_.bucketColumnNames).getOrElse(Nil)
+        table.schema.map { field =>
+          new Column(
+            name = field.name,
+            description = field.getComment().orNull,
+            dataType = field.dataType.simpleString,
+            nullable = field.nullable,
+            isPartition = partitionColumnNames.contains(field.name),
+            isBucket = bucketColumnNames.contains(field.name))
+        }
+
+      case ResolvedView(identifier, _) =>
+        val catalog = sparkSession.sessionState.catalog
+        val table = identifier.asTableIdentifier
+        val schema = catalog.getTempViewOrPermanentTableMetadata(table).schema
+        schema.map { field =>
+          new Column(
+            name = field.name,
+            description = field.getComment().orNull,
+            dataType = field.dataType.simpleString,
+            nullable = field.nullable,
+            isPartition = false,
+            isBucket = false)
+        }
+
+      case _ => throw QueryCompilationErrors.tableOrViewNotFound(ident)
+    }
+
+    CatalogImpl.makeDataset(columns, sparkSession)
+  }
+
+
   /**
    * Gets the database with the specified name. This throws an `AnalysisException` when no
    * `Database` can be found.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index a1a946ddd71..13f6965a8e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -273,6 +273,52 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     testListColumns("tab1", dbName = Some("db1"))
   }
 
+  test("SPARK-39615: three layer namespace compatibility - listColumns") {
+    val answers = Map(
+      "col1" -> ("int", true, false, true),
+      "col2" -> ("string", true, false, false),
+      "a" -> ("int", true, true, false),
+      "b" -> ("string", true, true, false)
+    )
+
+    assert(spark.catalog.currentCatalog() === "spark_catalog")
+    createTable("my_table1")
+
+    val columns1 = spark.catalog.listColumns("my_table1").collect()
+    assert(answers ===
+      columns1.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+
+    val columns2 = spark.catalog.listColumns("default.my_table1").collect()
+    assert(answers ===
+      columns2.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+
+    val columns3 = spark.catalog.listColumns("spark_catalog.default.my_table1").collect()
+    assert(answers ===
+      columns3.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+
+    createDatabase("my_db1")
+    createTable("my_table2", Some("my_db1"))
+
+    val columns4 = spark.catalog.listColumns("my_db1.my_table2").collect()
+    assert(answers ===
+      columns4.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+
+    val columns5 = spark.catalog.listColumns("spark_catalog.my_db1.my_table2").collect()
+    assert(answers ===
+      columns5.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+
+    val catalogName = "testcat"
+    val dbName = "my_db2"
+    val tableName = "my_table2"
+    val tableSchema = new StructType().add("i", "int").add("j", "string")
+    val description = "this is a test managed table"
+    createTable(tableName, dbName, catalogName, classOf[FakeV2Provider].getName, tableSchema,
+      Map.empty[String, String], description)
+
+    val columns6 = spark.catalog.listColumns("testcat.my_db2.my_table2").collect()
+    assert(Map("i" -> "int", "j" -> "string") === columns6.map(c => c.name -> c.dataType).toMap)
+  }
+
   test("Database.toString") {
     assert(new Database("cool_db", "cool_desc", "cool_path").toString ==
       "Database[name='cool_db', description='cool_desc', path='cool_path']")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org