You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/06/21 03:18:53 UTC
[spark] branch master updated: [SPARK-39263][SQL] Make GetTable, TableExists and DatabaseExists be compatible with 3 layer namespace
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ca5f7e6c35d [SPARK-39263][SQL] Make GetTable, TableExists and DatabaseExists be compatible with 3 layer namespace
ca5f7e6c35d is described below
commit ca5f7e6c35d49e9599c39fcd0828b3e557848d11
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Tue Jun 21 11:18:36 2022 +0800
[SPARK-39263][SQL] Make GetTable, TableExists and DatabaseExists be compatible with 3 layer namespace
### What changes were proposed in this pull request?
Make GetTable, TableExists and DatabaseExists be compatible with 3 layer namespace
### Why are the changes needed?
This is a part of effort to make catalog API be compatible with 3 layer namespace
### Does this PR introduce _any_ user-facing change?
Yes. The API change here is backward compatible and it extends the API to further support 3 layer namespace (e.g. catalog.database.table).
### How was this patch tested?
UT
Closes #36641 from amaliujia/catalogapi2.
Authored-by: Rui Wang <ru...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/catalyst/catalog/SessionCatalog.scala | 2 +-
.../apache/spark/sql/internal/CatalogImpl.scala | 55 ++++++++++++++++---
.../apache/spark/sql/internal/CatalogSuite.scala | 64 +++++++++++++++++++++-
3 files changed, 112 insertions(+), 9 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0152f49c798..54959b523c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -966,7 +966,7 @@ class SessionCatalog(
}
def isGlobalTempViewDB(dbName: String): Boolean = {
- globalTempViewManager.database.equals(dbName)
+ globalTempViewManager.database.equalsIgnoreCase(dbName)
}
def lookupTempView(name: TableIdentifier): Option[View] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 4b6ea33f3e6..f89a87c3011 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -23,12 +23,13 @@ import scala.util.control.NonFatal
import org.apache.spark.sql._
import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, ResolvedView, UnresolvedDBObjectName, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable, ResolvedView, UnresolvedDBObjectName, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowTables, SubqueryAlias, TableSpec, View}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, SupportsNamespaces, TableCatalog}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.types.StructType
@@ -250,8 +251,26 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* table/view. This throws an `AnalysisException` when no `Table` can be found.
*/
override def getTable(tableName: String): Table = {
- val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
- getTable(tableIdent.database.orNull, tableIdent.table)
+ // calling `sqlParser.parseTableIdentifier` to parse tableName. If it contains only table name
+ // and optionally contains a database name(thus a TableIdentifier), then we look up the table in
+ // sessionCatalog. Otherwise we try `sqlParser.parseMultipartIdentifier` to have a sequence of
+ // string as the qualified identifier and resolve the table through SQL analyzer.
+ try {
+ val ident = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+ if (tableExists(ident.database.orNull, ident.table)) {
+ makeTable(ident)
+ } else {
+ getTable3LNamespace(tableName)
+ }
+ } catch {
+ case e: org.apache.spark.sql.catalyst.parser.ParseException =>
+ getTable3LNamespace(tableName)
+ }
+ }
+
+ private def getTable3LNamespace(tableName: String): Table = {
+ val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+ makeTable(ident)
}
/**
@@ -287,7 +306,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* Checks if the database with the specified name exists.
*/
override def databaseExists(dbName: String): Boolean = {
- sessionCatalog.databaseExists(dbName)
+ // To maintain backwards compatibility, we first treat the input is a simple dbName and check
+ // if sessionCatalog contains it. If no, we try to parse it as 3 part name. If the parased
+ // identifier contains both catalog name and database name, we then search the database in the
+ // catalog.
+ if (!sessionCatalog.databaseExists(dbName)) {
+ val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+ val plan = sparkSession.sessionState.executePlan(UnresolvedNamespace(ident)).analyzed
+ plan match {
+ case ResolvedNamespace(catalog: SupportsNamespaces, _) =>
+ catalog.namespaceExists(ident.slice(1, ident.size).toArray)
+ case _ => true
+ }
+ } else {
+ true
+ }
}
/**
@@ -295,8 +328,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* view or a table/view.
*/
override def tableExists(tableName: String): Boolean = {
- val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
- tableExists(tableIdent.database.orNull, tableIdent.table)
+ try {
+ val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+ tableExists(tableIdent.database.orNull, tableIdent.table)
+ } catch {
+ case e: org.apache.spark.sql.catalyst.parser.ParseException =>
+ val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+ val catalog =
+ sparkSession.sessionState.catalogManager.catalog(ident(0)).asTableCatalog
+ catalog.tableExists(Identifier.of(Array(ident(1)), ident(2)))
+ }
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 06db60676ef..4844884f693 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.connector.FakeV2Provider
-import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
@@ -681,4 +681,66 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert(spark.catalog.listTables("default").collect().map(_.name).toSet ==
Set("my_table1", "my_table2", "my_temp_table"))
}
+
+ test("three layer namespace compatibility - get table") {
+ val catalogName = "testcat"
+ val dbName = "default"
+ val tableName = "my_table"
+ val tableSchema = new StructType().add("i", "int")
+ val description = "this is a test table"
+
+ spark.catalog.createTable(
+ tableName = Array(catalogName, dbName, tableName).mkString("."),
+ source = classOf[FakeV2Provider].getName,
+ schema = tableSchema,
+ description = description,
+ options = Map.empty[String, String])
+
+ val t = spark.catalog.getTable(Array(catalogName, dbName, tableName).mkString("."))
+ val expectedTable =
+ new Table(
+ tableName,
+ catalogName,
+ Array(dbName),
+ description,
+ CatalogTableType.MANAGED.name,
+ false)
+ assert(expectedTable.toString == t.toString)
+
+ // test when both sessionCatalog and testcat contains tables with same name, and we expect
+ // the table in sessionCatalog is returned when use 2 part name.
+ createTable("my_table")
+ val t2 = spark.catalog.getTable(Array(dbName, tableName).mkString("."))
+ assert(t2.catalog == CatalogManager.SESSION_CATALOG_NAME)
+ }
+
+ test("three layer namespace compatibility - table exists") {
+ val catalogName = "testcat"
+ val dbName = "my_db"
+ val tableName = "my_table"
+ val tableSchema = new StructType().add("i", "int")
+
+ assert(!spark.catalog.tableExists(Array(catalogName, dbName, tableName).mkString(".")))
+
+ spark.catalog.createTable(
+ tableName = Array(catalogName, dbName, tableName).mkString("."),
+ source = classOf[FakeV2Provider].getName,
+ schema = tableSchema,
+ description = "",
+ options = Map.empty[String, String])
+
+ assert(spark.catalog.tableExists(Array(catalogName, dbName, tableName).mkString(".")))
+ }
+
+ test("three layer namespace compatibility - database exists") {
+ val catalogName = "testcat"
+ val dbName = "my_db"
+ assert(!spark.catalog.databaseExists(Array(catalogName, dbName).mkString(".")))
+
+ sql(s"CREATE NAMESPACE ${catalogName}.${dbName}")
+ assert(spark.catalog.databaseExists(Array(catalogName, dbName).mkString(".")))
+
+ val catalogName2 = "catalog_not_exists"
+ assert(!spark.catalog.databaseExists(Array(catalogName2, dbName).mkString(".")))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org