Posted to commits@spark.apache.org by yu...@apache.org on 2022/08/01 07:27:01 UTC

[spark] branch master updated: [SPARK-39503][SQL][FOLLOWUP] InMemoryCatalog should keep the catalog field when renaming tables

This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 004430054c2 [SPARK-39503][SQL][FOLLOWUP] InMemoryCatalog should keep the catalog field when renaming tables
004430054c2 is described below

commit 004430054c2a1c1599f9451e6c77b64d02de4171
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Aug 1 15:26:44 2022 +0800

    [SPARK-39503][SQL][FOLLOWUP] InMemoryCatalog should keep the catalog field when renaming tables
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/37021. The `renameTable` method should keep the catalog name in `TableIdentifier`. This is necessary because methods like `getTablesByName` won't qualify the table identifiers again.
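    
    A minimal sketch of the identifier issue, runnable in the Scala REPL. `Ident` here is a simplified stand-in for Spark's `TableIdentifier`, not the real class; the point is that re-constructing the identifier from scratch silently drops the catalog field, while `copy` keeps it:
    
        // Hypothetical stand-in for org.apache.spark.sql.catalyst.TableIdentifier.
        case class Ident(table: String, database: Option[String], catalog: Option[String] = None)
    
        val old = Ident("tbl1", Some("db2"), catalog = Some("spark_catalog"))
    
        // Before the fix: a fresh identifier is built, so the catalog field defaults to None.
        val renamedBad = Ident("tblone", Some("db2"))
    
        // After the fix: copy the stored identifier and change only the table name,
        // so the catalog field survives the rename.
        val renamedGood = old.copy(table = "tblone")
    
        assert(renamedBad.catalog.isEmpty)
        assert(renamedGood.catalog.contains("spark_catalog"))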
    
    This PR also cleans up `InMemoryCatalog` a bit. The caller-side `SessionCatalog` creates tables/functions with identifiers that are already qualified with the catalog name, so we don't need to attach the catalog name again in places like `getTable`. We just need to make sure we don't drop the catalog field when updating a table.
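    
    A toy sketch of the invariant this cleanup relies on (hypothetical classes, not Spark's API): the caller qualifies identifiers once, at write time, so reads return entries exactly as stored and updates only need to preserve the field via `copy`:
    
        import scala.collection.mutable
    
        // Hypothetical simplified identifier and table, for illustration only.
        case class Ident(table: String, database: Option[String], catalog: Option[String])
        case class Table(identifier: Ident)
    
        class ToyCatalog {
          private val tables = mutable.Map.empty[String, Table]
    
          // The caller is expected to pass an identifier that already carries the catalog name.
          def createTable(t: Table): Unit = tables(t.identifier.table) = t
    
          // No need to re-attach the catalog here: return exactly what was stored.
          def getTable(name: String): Table = tables(name)
    
          // Updates must not drop the catalog field, hence copy() instead of re-building.
          def renameTable(oldName: String, newName: String): Unit = {
            val old = tables.remove(oldName).get
            tables(newName) = old.copy(identifier = old.identifier.copy(table = newName))
          }
        }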
    
    ### Why are the changes needed?
    
    Make sure the v1 identifiers are always qualified with the catalog name.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. `InMemoryCatalog` is test-only.
    
    ### How was this patch tested?
    
    N/A
    
    Closes #37347 from cloud-fan/follow.
    
    Lead-authored-by: Wenchen Fan <we...@databricks.com>
    Co-authored-by: Wenchen Fan <cl...@gmail.com>
    Signed-off-by: Yuming Wang <yu...@ebay.com>
---
 .../sql/catalyst/catalog/InMemoryCatalog.scala     | 13 +++++-------
 .../catalyst/catalog/ExternalCatalogSuite.scala    | 23 ++++++++++++++++------
 2 files changed, 22 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 4fe56440c11..218a342e669 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -25,8 +25,6 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.CatalystIdentifier._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -282,7 +280,7 @@ class InMemoryCatalog(
     requireTableExists(db, oldName)
     requireTableNotExists(db, newName)
     val oldDesc = catalog(db).tables(oldName)
-    oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))
+    oldDesc.table = oldDesc.table.copy(identifier = oldDesc.table.identifier.copy(table = newName))
 
     if (oldDesc.table.tableType == CatalogTableType.MANAGED) {
       assert(oldDesc.table.storage.locationUri.isDefined,
@@ -344,8 +342,7 @@ class InMemoryCatalog(
 
   override def getTable(db: String, table: String): CatalogTable = synchronized {
     requireTableExists(db, table)
-    val catalogTable = catalog(db).tables(table).table
-    catalogTable.copy(identifier = attachSessionCatalog(catalogTable.identifier))
+    catalog(db).tables(table).table
   }
 
   override def getTablesByName(db: String, tables: Seq[String]): Seq[CatalogTable] = {
@@ -634,15 +631,15 @@ class InMemoryCatalog(
       newName: String): Unit = synchronized {
     requireFunctionExists(db, oldName)
     requireFunctionNotExists(db, newName)
-    val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db)))
+    val oldFunc = getFunction(db, oldName)
+    val newFunc = oldFunc.copy(identifier = oldFunc.identifier.copy(funcName = newName))
     catalog(db).functions.remove(oldName)
     catalog(db).functions.put(newName, newFunc)
   }
 
   override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
     requireFunctionExists(db, funcName)
-    val catalogFunction = catalog(db).functions(funcName)
-    catalogFunction.copy(identifier = attachSessionCatalog(catalogFunction.identifier))
+    catalog(db).functions(funcName)
   }
 
   override def functionExists(db: String, funcName: String): Boolean = synchronized {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index bf9bf38b07e..1b0a154a3f4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalystIdentifier, FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
@@ -278,8 +278,16 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
   }
 
   test("get tables by name") {
-    assert(newBasicCatalog().getTablesByName("db2", Seq("tbl1", "tbl2"))
-      .map(_.identifier.table) == Seq("tbl1", "tbl2"))
+    val catalog = newBasicCatalog()
+    val tables = catalog.getTablesByName("db2", Seq("tbl1", "tbl2"))
+    assert(tables.map(_.identifier.table).sorted == Seq("tbl1", "tbl2"))
+    assert(tables.forall(_.identifier.catalog.isDefined))
+
+    // After renaming a table, the identifier should still be qualified with catalog.
+    catalog.renameTable("db2", "tbl1", "tblone")
+    val tables2 = catalog.getTablesByName("db2", Seq("tbl2", "tblone"))
+    assert(tables2.map(_.identifier.table).sorted == Seq("tbl2", "tblone"))
+    assert(tables2.forall(_.identifier.catalog.isDefined))
   }
 
   test("get tables by name when some tables do not exists") {
@@ -1029,7 +1037,7 @@ abstract class CatalogTestUtils {
       database: Option[String] = None,
       defaultColumns: Boolean = false): CatalogTable = {
     CatalogTable(
-      identifier = TableIdentifier(name, database),
+      identifier = CatalystIdentifier.attachSessionCatalog(TableIdentifier(name, database)),
       tableType = CatalogTableType.EXTERNAL,
       storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().toURI)),
       schema = if (defaultColumns) {
@@ -1073,7 +1081,7 @@ abstract class CatalogTestUtils {
       name: String,
       props: Map[String, String]): CatalogTable = {
     CatalogTable(
-      identifier = TableIdentifier(name, Some(db)),
+      identifier = CatalystIdentifier.attachSessionCatalog(TableIdentifier(name, Some(db))),
       tableType = CatalogTableType.VIEW,
       storage = CatalogStorageFormat.empty,
       schema = new StructType()
@@ -1086,7 +1094,10 @@ abstract class CatalogTestUtils {
   }
 
   def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
-    CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[FunctionResource])
+    CatalogFunction(
+      CatalystIdentifier.attachSessionCatalog(FunctionIdentifier(name, database)),
+      funcClass,
+      Seq.empty[FunctionResource])
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org