Posted to commits@spark.apache.org by we...@apache.org on 2022/06/21 03:18:53 UTC

[spark] branch master updated: [SPARK-39263][SQL] Make GetTable, TableExists and DatabaseExists be compatible with 3 layer namespace

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ca5f7e6c35d [SPARK-39263][SQL] Make GetTable, TableExists and DatabaseExists be compatible with 3 layer namespace
ca5f7e6c35d is described below

commit ca5f7e6c35d49e9599c39fcd0828b3e557848d11
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Tue Jun 21 11:18:36 2022 +0800

    [SPARK-39263][SQL] Make GetTable, TableExists and DatabaseExists be compatible with 3 layer namespace
    
    ### What changes were proposed in this pull request?
    
    Make GetTable, TableExists and DatabaseExists compatible with a 3-layer namespace.
    
    ### Why are the changes needed?
    
    This is part of an effort to make the catalog API compatible with a 3-layer namespace.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. The API change is backward compatible; it extends the API to additionally support a 3-layer namespace (e.g. catalog.database.table).
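
    A minimal usage sketch of the extended APIs, assuming a v2 catalog named
    `testcat` is registered in the session (the catalog, database and table
    names here are illustrative, not part of this patch):

        // 3-part names now resolve through the catalog manager.
        spark.catalog.databaseExists("testcat.my_db")
        spark.catalog.tableExists("testcat.my_db.my_table")
        spark.catalog.getTable("testcat.my_db.my_table")

        // 1- and 2-part names keep their existing behavior and are
        // resolved against the session catalog first.
        spark.catalog.tableExists("default.my_table")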

    ### How was this patch tested?
    
    Unit tests (see the updated `CatalogSuite`).
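
    For reference, a suite like the one touched here can typically be run with
    (assuming a standard Spark development checkout):

        build/sbt "sql/testOnly org.apache.spark.sql.internal.CatalogSuite"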
    
    Closes #36641 from amaliujia/catalogapi2.
    
    Authored-by: Rui Wang <ru...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/catalog/SessionCatalog.scala      |  2 +-
 .../apache/spark/sql/internal/CatalogImpl.scala    | 55 ++++++++++++++++---
 .../apache/spark/sql/internal/CatalogSuite.scala   | 64 +++++++++++++++++++++-
 3 files changed, 112 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0152f49c798..54959b523c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -966,7 +966,7 @@ class SessionCatalog(
   }
 
   def isGlobalTempViewDB(dbName: String): Boolean = {
-    globalTempViewManager.database.equals(dbName)
+    globalTempViewManager.database.equalsIgnoreCase(dbName)
   }
 
   def lookupTempView(name: TableIdentifier): Option[View] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 4b6ea33f3e6..f89a87c3011 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -23,12 +23,13 @@ import scala.util.control.NonFatal
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
 import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, ResolvedView, UnresolvedDBObjectName, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable, ResolvedView, UnresolvedDBObjectName, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowTables, SubqueryAlias, TableSpec, View}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, SupportsNamespaces, TableCatalog}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.types.StructType
@@ -250,8 +251,26 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * table/view. This throws an `AnalysisException` when no `Table` can be found.
    */
   override def getTable(tableName: String): Table = {
-    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-    getTable(tableIdent.database.orNull, tableIdent.table)
+    // Call `sqlParser.parseTableIdentifier` to parse tableName. If it contains only a table name,
+    // optionally qualified by a database name (thus a TableIdentifier), we look up the table in
+    // sessionCatalog. Otherwise we try `sqlParser.parseMultipartIdentifier` to get a sequence of
+    // strings as the qualified identifier and resolve the table through the SQL analyzer.
+    try {
+      val ident = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+      if (tableExists(ident.database.orNull, ident.table)) {
+        makeTable(ident)
+      } else {
+        getTable3LNamespace(tableName)
+      }
+    } catch {
+      case e: org.apache.spark.sql.catalyst.parser.ParseException =>
+        getTable3LNamespace(tableName)
+    }
+  }
+
+  private def getTable3LNamespace(tableName: String): Table = {
+    val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+    makeTable(ident)
   }
 
   /**
@@ -287,7 +306,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * Checks if the database with the specified name exists.
    */
   override def databaseExists(dbName: String): Boolean = {
-    sessionCatalog.databaseExists(dbName)
+    // To maintain backward compatibility, we first treat the input as a simple dbName and check
+    // whether sessionCatalog contains it. If not, we try to parse it as a 3-part name. If the
+    // parsed identifier contains both a catalog name and a database name, we then search for the
+    // database in that catalog.
+    if (!sessionCatalog.databaseExists(dbName)) {
+      val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+      val plan = sparkSession.sessionState.executePlan(UnresolvedNamespace(ident)).analyzed
+      plan match {
+        case ResolvedNamespace(catalog: SupportsNamespaces, _) =>
+          catalog.namespaceExists(ident.slice(1, ident.size).toArray)
+        case _ => true
+      }
+    } else {
+      true
+    }
   }
 
   /**
@@ -295,8 +328,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * view or a table/view.
    */
   override def tableExists(tableName: String): Boolean = {
-    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-    tableExists(tableIdent.database.orNull, tableIdent.table)
+    try {
+      val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+      tableExists(tableIdent.database.orNull, tableIdent.table)
+    } catch {
+      case e: org.apache.spark.sql.catalyst.parser.ParseException =>
+        val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+        val catalog =
+          sparkSession.sessionState.catalogManager.catalog(ident(0)).asTableCatalog
+        catalog.tableExists(Identifier.of(Array(ident(1)), ident(2)))
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 06db60676ef..4844884f693 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.connector.FakeV2Provider
-import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryCatalog}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
@@ -681,4 +681,66 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     assert(spark.catalog.listTables("default").collect().map(_.name).toSet ==
       Set("my_table1", "my_table2", "my_temp_table"))
   }
+
+  test("three layer namespace compatibility - get table") {
+    val catalogName = "testcat"
+    val dbName = "default"
+    val tableName = "my_table"
+    val tableSchema = new StructType().add("i", "int")
+    val description = "this is a test table"
+
+    spark.catalog.createTable(
+      tableName = Array(catalogName, dbName, tableName).mkString("."),
+      source = classOf[FakeV2Provider].getName,
+      schema = tableSchema,
+      description = description,
+      options = Map.empty[String, String])
+
+    val t = spark.catalog.getTable(Array(catalogName, dbName, tableName).mkString("."))
+    val expectedTable =
+      new Table(
+        tableName,
+        catalogName,
+        Array(dbName),
+        description,
+        CatalogTableType.MANAGED.name,
+        false)
+    assert(expectedTable.toString == t.toString)
+
+    // Test when both sessionCatalog and testcat contain tables with the same name; we expect
+    // the table in sessionCatalog to be returned when using a 2-part name.
+    createTable("my_table")
+    val t2 = spark.catalog.getTable(Array(dbName, tableName).mkString("."))
+    assert(t2.catalog == CatalogManager.SESSION_CATALOG_NAME)
+  }
+
+  test("three layer namespace compatibility - table exists") {
+    val catalogName = "testcat"
+    val dbName = "my_db"
+    val tableName = "my_table"
+    val tableSchema = new StructType().add("i", "int")
+
+    assert(!spark.catalog.tableExists(Array(catalogName, dbName, tableName).mkString(".")))
+
+    spark.catalog.createTable(
+      tableName = Array(catalogName, dbName, tableName).mkString("."),
+      source = classOf[FakeV2Provider].getName,
+      schema = tableSchema,
+      description = "",
+      options = Map.empty[String, String])
+
+    assert(spark.catalog.tableExists(Array(catalogName, dbName, tableName).mkString(".")))
+  }
+
+  test("three layer namespace compatibility - database exists") {
+    val catalogName = "testcat"
+    val dbName = "my_db"
+    assert(!spark.catalog.databaseExists(Array(catalogName, dbName).mkString(".")))
+
+    sql(s"CREATE NAMESPACE ${catalogName}.${dbName}")
+    assert(spark.catalog.databaseExists(Array(catalogName, dbName).mkString(".")))
+
+    val catalogName2 = "catalog_not_exists"
+    assert(!spark.catalog.databaseExists(Array(catalogName2, dbName).mkString(".")))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org