You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yu...@apache.org on 2022/08/01 07:27:01 UTC
[spark] branch master updated: [SPARK-39503][SQL][FOLLOWUP] InMemoryCatalog should keep the catalog field when renaming tables
This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 004430054c2 [SPARK-39503][SQL][FOLLOWUP] InMemoryCatalog should keep the catalog field when renaming tables
004430054c2 is described below
commit 004430054c2a1c1599f9451e6c77b64d02de4171
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Aug 1 15:26:44 2022 +0800
[SPARK-39503][SQL][FOLLOWUP] InMemoryCatalog should keep the catalog field when renaming tables
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/37021. The `renameTable` method should keep the catalog name in `TableIdentifier`. This is necessary because methods like `getTablesByName` won't qualify the table identifiers again.
This PR also cleans up `InMemoryCatalog` a bit. The caller, `SessionCatalog`, creates tables/functions using identifiers that are already qualified with the catalog name, so we don't need to attach the catalog name again in places like `getTable`. We just need to make sure we don't drop the catalog field when updating tables.
### Why are the changes needed?
To make sure the v1 identifiers are always qualified with the catalog name.
### Does this PR introduce _any_ user-facing change?
No. `InMemoryCatalog` is test-only.
### How was this patch tested?
N/A
Closes #37347 from cloud-fan/follow.
Lead-authored-by: Wenchen Fan <we...@databricks.com>
Co-authored-by: Wenchen Fan <cl...@gmail.com>
Signed-off-by: Yuming Wang <yu...@ebay.com>
---
.../sql/catalyst/catalog/InMemoryCatalog.scala | 13 +++++-------
.../catalyst/catalog/ExternalCatalogSuite.scala | 23 ++++++++++++++++------
2 files changed, 22 insertions(+), 14 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 4fe56440c11..218a342e669 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -25,8 +25,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.CatalystIdentifier._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -282,7 +280,7 @@ class InMemoryCatalog(
requireTableExists(db, oldName)
requireTableNotExists(db, newName)
val oldDesc = catalog(db).tables(oldName)
- oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))
+ oldDesc.table = oldDesc.table.copy(identifier = oldDesc.table.identifier.copy(table = newName))
if (oldDesc.table.tableType == CatalogTableType.MANAGED) {
assert(oldDesc.table.storage.locationUri.isDefined,
@@ -344,8 +342,7 @@ class InMemoryCatalog(
override def getTable(db: String, table: String): CatalogTable = synchronized {
requireTableExists(db, table)
- val catalogTable = catalog(db).tables(table).table
- catalogTable.copy(identifier = attachSessionCatalog(catalogTable.identifier))
+ catalog(db).tables(table).table
}
override def getTablesByName(db: String, tables: Seq[String]): Seq[CatalogTable] = {
@@ -634,15 +631,15 @@ class InMemoryCatalog(
newName: String): Unit = synchronized {
requireFunctionExists(db, oldName)
requireFunctionNotExists(db, newName)
- val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db)))
+ val oldFunc = getFunction(db, oldName)
+ val newFunc = oldFunc.copy(identifier = oldFunc.identifier.copy(funcName = newName))
catalog(db).functions.remove(oldName)
catalog(db).functions.put(newName, newFunc)
}
override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
requireFunctionExists(db, funcName)
- val catalogFunction = catalog(db).functions(funcName)
- catalogFunction.copy(identifier = attachSessionCatalog(catalogFunction.identifier))
+ catalog(db).functions(funcName)
}
override def functionExists(db: String, funcName: String): Boolean = synchronized {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index bf9bf38b07e..1b0a154a3f4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalystIdentifier, FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
@@ -278,8 +278,16 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
}
test("get tables by name") {
- assert(newBasicCatalog().getTablesByName("db2", Seq("tbl1", "tbl2"))
- .map(_.identifier.table) == Seq("tbl1", "tbl2"))
+ val catalog = newBasicCatalog()
+ val tables = catalog.getTablesByName("db2", Seq("tbl1", "tbl2"))
+ assert(tables.map(_.identifier.table).sorted == Seq("tbl1", "tbl2"))
+ assert(tables.forall(_.identifier.catalog.isDefined))
+
+ // After renaming a table, the identifier should still be qualified with catalog.
+ catalog.renameTable("db2", "tbl1", "tblone")
+ val tables2 = catalog.getTablesByName("db2", Seq("tbl2", "tblone"))
+ assert(tables2.map(_.identifier.table).sorted == Seq("tbl2", "tblone"))
+ assert(tables2.forall(_.identifier.catalog.isDefined))
}
test("get tables by name when some tables do not exists") {
@@ -1029,7 +1037,7 @@ abstract class CatalogTestUtils {
database: Option[String] = None,
defaultColumns: Boolean = false): CatalogTable = {
CatalogTable(
- identifier = TableIdentifier(name, database),
+ identifier = CatalystIdentifier.attachSessionCatalog(TableIdentifier(name, database)),
tableType = CatalogTableType.EXTERNAL,
storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().toURI)),
schema = if (defaultColumns) {
@@ -1073,7 +1081,7 @@ abstract class CatalogTestUtils {
name: String,
props: Map[String, String]): CatalogTable = {
CatalogTable(
- identifier = TableIdentifier(name, Some(db)),
+ identifier = CatalystIdentifier.attachSessionCatalog(TableIdentifier(name, Some(db))),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = new StructType()
@@ -1086,7 +1094,10 @@ abstract class CatalogTestUtils {
}
def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
- CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[FunctionResource])
+ CatalogFunction(
+ CatalystIdentifier.attachSessionCatalog(FunctionIdentifier(name, database)),
+ funcClass,
+ Seq.empty[FunctionResource])
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org