Posted to commits@spark.apache.org by we...@apache.org on 2023/12/26 07:17:45 UTC

(spark) branch master updated: [SPARK-46444][SQL] V2SessionCatalog#createTable should not load the table

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d8fb91e61352 [SPARK-46444][SQL] V2SessionCatalog#createTable should not load the table
d8fb91e61352 is described below

commit d8fb91e61352e57e733e7d7e4978c8ce555454b1
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Dec 26 15:17:30 2023 +0800

    [SPARK-46444][SQL] V2SessionCatalog#createTable should not load the table
    
    ### What changes were proposed in this pull request?
    
    CREATE TABLE has a performance regression when we switch to the v2 command framework, because `V2SessionCatalog#createTable` does an extra table lookup that does not happen in v1. This PR fixes it by allowing `TableCatalog#createTable` to return null; Spark will call `loadTable` to get the new table metadata in the case of CTAS. This PR also fixes `alterTable` in the same way.
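    
    As a caller-side illustration of the new contract (a sketch, not part of this patch; `createAndLoad` is a hypothetical helper whose parameters mirror the `TableCatalog` API), this is the pattern `CreateTableAsSelectExec` uses below:
    
    ```scala
    import org.apache.spark.sql.connector.catalog.{Column, Identifier, Table, TableCatalog}
    import org.apache.spark.sql.connector.expressions.Transform
    
    // `createTable` may now return null when computing the new table's metadata
    // is expensive; call `loadTable` only when the table is actually needed (CTAS).
    def createAndLoad(
        catalog: TableCatalog,
        ident: Identifier,
        columns: Array[Column],
        partitioning: Array[Transform],
        properties: java.util.Map[String, String]): Table = {
      Option(catalog.createTable(ident, columns, partitioning, properties))
        .getOrElse(catalog.loadTable(ident))
    }
    ```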
    
    ### Why are the changes needed?
    
    It fixes a performance regression in v2. The performance of a single command may not matter much, but in a cluster running many Spark applications it is important to reduce the number of RPCs to the metastore.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #44377 from cloud-fan/create-table.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../src/main/resources/error/error-classes.json    |   4 +-
 docs/sql-error-conditions.md                       |   4 +-
 .../spark/sql/connector/catalog/TableCatalog.java  |   8 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |   6 +-
 .../datasources/v2/V2SessionCatalog.scala          |  37 ++---
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |   8 +-
 .../spark/sql/connector/DataSourceV2Suite.scala    |  12 +-
 .../datasources/v2/V2SessionCatalogSuite.scala     | 181 +++++++++++++--------
 8 files changed, 157 insertions(+), 103 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 8970045d4ab3..700b1ed07513 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -895,7 +895,9 @@
   },
   "DATA_SOURCE_TABLE_SCHEMA_MISMATCH" : {
     "message" : [
-      "The schema of the data source table <tableSchema> does not match the actual schema <actualSchema>. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema."
+      "The schema of the data source table does not match the expected schema. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema.",
+      "Data Source schema: <dsSchema>",
+      "Expected schema: <expectedSchema>"
     ],
     "sqlState" : "42K03"
   },
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 0722cae5815e..a8d2b6c894bc 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -496,7 +496,9 @@ Failed to find the data source: `<provider>`. Please find packages at `https://s
 
 [SQLSTATE: 42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
 
-The schema of the data source table `<tableSchema>` does not match the actual schema `<actualSchema>`. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema.
+The schema of the data source table does not match the expected schema. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema.
+Data Source schema: `<dsSchema>`
+Expected schema: `<expectedSchema>`
 
 ### DATETIME_OVERFLOW
 
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index 6642adc33548..74700789dde0 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -187,7 +187,9 @@ public interface TableCatalog extends CatalogPlugin {
    * @param columns the columns of the new table.
    * @param partitions transforms to use for partitioning data in the table
    * @param properties a string map of table properties
-   * @return metadata for the new table
+   * @return metadata for the new table. This can be null if getting the metadata for the new table
+   *         is expensive. Spark will call {@link #loadTable(Identifier)} if needed (e.g. CTAS).
+   *
    * @throws TableAlreadyExistsException If a table or view already exists for the identifier
    * @throws UnsupportedOperationException If a requested partition transform is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
@@ -221,7 +223,9 @@ public interface TableCatalog extends CatalogPlugin {
    *
    * @param ident a table identifier
    * @param changes changes to apply to the table
-   * @return updated metadata for the table
+   * @return updated metadata for the table. This can be null if getting the metadata for the
+   *         updated table is expensive. Spark always discards the returned table here.
+   *
    * @throws NoSuchTableException If the table doesn't exist or is a view
    * @throws IllegalArgumentException If any change is rejected by the implementation.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index bb2b7e7ae066..ee41cbe2f50e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3926,11 +3926,11 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
   }
 
   def dataSourceTableSchemaMismatchError(
-      tableSchema: StructType, actualSchema: StructType): Throwable = {
+      dsSchema: StructType, expectedSchema: StructType): Throwable = {
     new AnalysisException(
       errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
       messageParameters = Map(
-        "tableSchema" -> toSQLType(tableSchema),
-        "actualSchema" -> toSQLType(actualSchema)))
+        "dsSchema" -> toSQLType(dsSchema),
+        "expectedSchema" -> toSQLType(expectedSchema)))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index a7694f5d829d..e7445e970fa5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -198,10 +198,21 @@ class V2SessionCatalog(catalog: SessionCatalog)
             s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}")
           // Infer the schema and partitions and store them in the catalog.
           (tableProvider.inferSchema(dsOptions), tableProvider.inferPartitioning(dsOptions))
-        } else if (partitions.isEmpty) {
-          (schema, tableProvider.inferPartitioning(dsOptions))
         } else {
-          (schema, partitions)
+          val partitioning = if (partitions.isEmpty) {
+            tableProvider.inferPartitioning(dsOptions)
+          } else {
+            partitions
+          }
+          val table = tableProvider.getTable(schema, partitions, dsOptions)
+          // Check if the schema of the created table matches the given schema.
+          val tableSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
+            table.columns().asSchema)
+          if (!DataType.equalsIgnoreNullability(tableSchema, schema)) {
+            throw QueryCompilationErrors.dataSourceTableSchemaMismatchError(
+              tableSchema, schema)
+          }
+          (schema, partitioning)
         }
 
       case _ =>
@@ -233,21 +244,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
         throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
 
-    val table = loadTable(ident)
-
-    // Check if the schema of the created table matches the given schema.
-    // TODO: move this check in loadTable to match the behavior with
-    // existing file data sources.
-    if (schema.nonEmpty) {
-      val tableSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
-        table.columns().asSchema)
-      if (!DataType.equalsIgnoreNullability(tableSchema, schema)) {
-        throw QueryCompilationErrors.dataSourceTableSchemaMismatchError(
-          table.columns().asSchema, schema)
-      }
-    }
-
-    table
+    null // Return null to save the `loadTable` call for CREATE TABLE without AS SELECT.
   }
 
   private def toOptions(properties: Map[String, String]): Map[String, String] = {
@@ -288,7 +285,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
         throw QueryCompilationErrors.noSuchTableError(ident)
     }
 
-    loadTable(ident)
+    null // Return null to save the `loadTable` call for ALTER TABLE.
   }
 
   override def purgeTable(ident: Identifier): Boolean = {
@@ -332,8 +329,6 @@ class V2SessionCatalog(catalog: SessionCatalog)
       throw QueryCompilationErrors.tableAlreadyExistsError(newIdent)
     }
 
-    // Load table to make sure the table exists
-    loadTable(oldIdent)
     catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 97c1f7ced508..c65c15fb0ef2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -82,9 +82,9 @@ case class CreateTableAsSelectExec(
       }
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
-    val table = catalog.createTable(
+    val table = Option(catalog.createTable(
       ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
-      partitioning.toArray, properties.asJava)
+      partitioning.toArray, properties.asJava)).getOrElse(catalog.loadTable(ident))
     writeToTable(catalog, table, writeOptions, ident, query)
   }
 }
@@ -162,9 +162,9 @@ case class ReplaceTableAsSelectExec(
     } else if (!orCreate) {
       throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
     }
-    val table = catalog.createTable(
+    val table = Option(catalog.createTable(
       ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
-      partitioning.toArray, properties.asJava)
+      partitioning.toArray, properties.asJava)).getOrElse(catalog.loadTable(ident))
     writeToTable(catalog, table, writeOptions, ident, query)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index 3f3dc82da5ad..ea263b36c76c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -734,8 +734,8 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
         },
         errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
         parameters = Map(
-          "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
-          "actualSchema" -> "\"STRUCT<x: INT, y: INT>\""))
+          "dsSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+          "expectedSchema" -> "\"STRUCT<x: INT, y: INT>\""))
     }
   }
 
@@ -772,8 +772,8 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
         },
         errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
         parameters = Map(
-          "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
-          "actualSchema" -> "\"STRUCT<col1: INT, col2: INT>\""))
+          "dsSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+          "expectedSchema" -> "\"STRUCT<col1: INT, col2: INT>\""))
     }
   }
 
@@ -790,8 +790,8 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
         },
         errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
         parameters = Map(
-          "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
-          "actualSchema" -> "\"STRUCT<i: STRING, j: STRING>\""))
+          "dsSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+          "expectedSchema" -> "\"STRUCT<i: STRING, j: STRING>\""))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index f9da55ed6ba3..e5473222d429 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -125,7 +125,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
     assert(parsed == Seq("db", "test_table"))
@@ -143,7 +144,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
+    catalog.createTable(testIdent, schema, emptyTrans, properties)
+    val table = catalog.loadTable(testIdent)
 
     val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
     assert(parsed == Seq("db", "test_table"))
@@ -158,7 +160,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
       .map(part => quoteIdentifier(part)).mkString(".")
@@ -185,26 +188,30 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(!catalog.tableExists(testIdent))
 
     // default location
-    val t1 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table]
+    catalog.createTable(testIdent, schema, emptyTrans, properties)
+    val t1 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t1.catalogTable.location ===
       spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
     catalog.dropTable(testIdent)
 
     // relative path
     properties.put(TableCatalog.PROP_LOCATION, "relative/path")
-    val t2 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table]
+    catalog.createTable(testIdent, schema, emptyTrans, properties)
+    val t2 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
     catalog.dropTable(testIdent)
 
     // absolute path without scheme
     properties.put(TableCatalog.PROP_LOCATION, "/absolute/path")
-    val t3 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table]
+    catalog.createTable(testIdent, schema, emptyTrans, properties)
+    val t3 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t3.catalogTable.location.toString === "file:///absolute/path")
     catalog.dropTable(testIdent)
 
     // absolute path with scheme
     properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path")
-    val t4 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table]
+    catalog.createTable(testIdent, schema, emptyTrans, properties)
+    val t4 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t4.catalogTable.location.toString === "file:/absolute/path")
     catalog.dropTable(testIdent)
   }
@@ -226,12 +233,11 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("loadTable") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
     val loaded = catalog.loadTable(testIdent)
 
-    assert(table.name == loaded.name)
-    assert(table.schema == loaded.schema)
-    assert(table.properties == loaded.properties)
+    assert(loaded.name == testIdent.toString)
+    assert(loaded.schema == schema)
   }
 
   test("loadTable: table does not exist") {
@@ -247,7 +253,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("invalidateTable") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
     catalog.invalidateTable(testIdent)
 
     val loaded = catalog.loadTable(testIdent)
@@ -268,11 +275,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add property") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(filterV2TableProperties(table.properties) == Map())
 
-    val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1"))
+    catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1"))
+    val updated = catalog.loadTable(testIdent)
     assert(filterV2TableProperties(updated.properties) == Map("prop-1" -> "1"))
 
     val loaded = catalog.loadTable(testIdent)
@@ -287,11 +296,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val properties = new util.HashMap[String, String]()
     properties.put("prop-1", "1")
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
+    catalog.createTable(testIdent, schema, emptyTrans, properties)
+    val table = catalog.loadTable(testIdent)
 
     assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1"))
 
-    val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-2", "2"))
+    catalog.alterTable(testIdent, TableChange.setProperty("prop-2", "2"))
+    val updated = catalog.loadTable(testIdent)
     assert(filterV2TableProperties(updated.properties) == Map("prop-1" -> "1", "prop-2" -> "2"))
 
     val loaded = catalog.loadTable(testIdent)
@@ -306,11 +317,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val properties = new util.HashMap[String, String]()
     properties.put("prop-1", "1")
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
+    catalog.createTable(testIdent, schema, emptyTrans, properties)
+    val table = catalog.loadTable(testIdent)
 
     assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1"))
 
-    val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+    catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+    val updated = catalog.loadTable(testIdent)
     assert(filterV2TableProperties(updated.properties) == Map())
 
     val loaded = catalog.loadTable(testIdent)
@@ -322,11 +335,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: remove missing property") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(filterV2TableProperties(table.properties) == Map())
 
-    val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+    catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+    val updated = catalog.loadTable(testIdent)
     assert(filterV2TableProperties(updated.properties) == Map())
 
     val loaded = catalog.loadTable(testIdent)
@@ -338,11 +353,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add top-level column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
-    val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType))
+    catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType))
+    val updated = catalog.loadTable(testIdent)
 
     assert(updated.schema == schema.add("ts", TimestampType))
   }
@@ -350,12 +367,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add required column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.addColumn(Array("ts"), TimestampType, false))
+    val updated = catalog.loadTable(testIdent)
 
     assert(updated.schema == schema.add("ts", TimestampType, nullable = false))
   }
@@ -363,12 +382,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add column with comment") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.addColumn(Array("ts"), TimestampType, false, "comment text"))
+    val updated = catalog.loadTable(testIdent)
 
     val field = StructField("ts", TimestampType, nullable = false).withComment("comment text")
     assert(updated.schema == schema.add(field))
@@ -380,12 +401,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == tableSchema)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.addColumn(Array("point", "z"), DoubleType))
+    val updated = catalog.loadTable(testIdent)
 
     val expectedSchema = schema.add("point", pointStruct.add("z", DoubleType))
 
@@ -395,7 +418,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add column to primitive field fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
@@ -413,7 +437,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add field to missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
@@ -429,11 +454,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: update column data type") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
-    val updated = catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType))
+    catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType))
+    val updated = catalog.loadTable(testIdent)
 
     val expectedSchema = new StructType().add("id", LongType).add("data", StringType)
     assert(updated.schema == expectedSchema)
@@ -445,12 +472,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val originalSchema = new StructType()
         .add("id", IntegerType, nullable = false)
         .add("data", StringType)
-    val table = catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == originalSchema)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.updateColumnNullability(Array("id"), true))
+    val updated = catalog.loadTable(testIdent)
 
     val expectedSchema = new StructType().add("id", IntegerType).add("data", StringType)
     assert(updated.schema == expectedSchema)
@@ -459,7 +488,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: update missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
@@ -475,12 +505,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add comment") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.updateColumnComment(Array("id"), "comment text"))
+    val updated = catalog.loadTable(testIdent)
 
     val expectedSchema = new StructType()
         .add("id", IntegerType, nullable = true, "comment text")
@@ -491,7 +523,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: replace comment") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
@@ -501,8 +534,9 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
         .add("id", IntegerType, nullable = true, "replacement comment")
         .add("data", StringType)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.updateColumnComment(Array("id"), "replacement comment"))
+    val updated = catalog.loadTable(testIdent)
 
     assert(updated.schema == expectedSchema)
   }
@@ -510,7 +544,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add comment to missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
@@ -526,11 +561,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: rename top-level column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
-    val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id"))
+    catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id"))
+    val updated = catalog.loadTable(testIdent)
 
     val expectedSchema = new StructType().add("some_id", IntegerType).add("data", StringType)
 
@@ -543,12 +580,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == tableSchema)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.renameColumn(Array("point", "x"), "first"))
+    val updated = catalog.loadTable(testIdent)
 
     val newPointStruct = new StructType().add("first", DoubleType).add("y", DoubleType)
     val expectedSchema = schema.add("point", newPointStruct)
@@ -562,12 +601,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == tableSchema)
 
-    val updated = catalog.alterTable(testIdent,
-      TableChange.renameColumn(Array("point"), "p"))
+    catalog.alterTable(testIdent, TableChange.renameColumn(Array("point"), "p"))
+    val updated = catalog.loadTable(testIdent)
 
     val newPointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val expectedSchema = schema.add("p", newPointStruct)
@@ -578,7 +618,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: rename missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
@@ -597,13 +638,15 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == tableSchema)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.renameColumn(Array("point", "x"), "first"),
       TableChange.renameColumn(Array("point", "y"), "second"))
+    val updated = catalog.loadTable(testIdent)
 
     val newPointStruct = new StructType().add("first", DoubleType).add("second", DoubleType)
     val expectedSchema = schema.add("point", newPointStruct)
@@ -614,12 +657,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: delete top-level column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
-    val updated = catalog.alterTable(testIdent,
-      TableChange.deleteColumn(Array("id"), false))
+    catalog.alterTable(testIdent, TableChange.deleteColumn(Array("id"), false))
+    val updated = catalog.loadTable(testIdent)
 
     val expectedSchema = new StructType().add("data", StringType)
     assert(updated.schema == expectedSchema)
@@ -631,12 +675,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == tableSchema)
 
-    val updated = catalog.alterTable(testIdent,
-      TableChange.deleteColumn(Array("point", "y"), false))
+    catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "y"), false))
+    val updated = catalog.loadTable(testIdent)
 
     val newPointStruct = new StructType().add("x", DoubleType)
     val expectedSchema = schema.add("point", newPointStruct)
@@ -647,7 +692,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: delete missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
@@ -669,7 +715,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == tableSchema)
 
@@ -700,23 +747,27 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(!catalog.tableExists(testIdent))
 
     // default location
-    val t1 = catalog.createTable(testIdent, schema, emptyTrans, emptyProps).asInstanceOf[V1Table]
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val t1 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t1.catalogTable.location ===
       spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
 
     // relative path
-    val t2 = catalog.alterTable(testIdent,
-      TableChange.setProperty(TableCatalog.PROP_LOCATION, "relative/path")).asInstanceOf[V1Table]
+    catalog.alterTable(testIdent,
+      TableChange.setProperty(TableCatalog.PROP_LOCATION, "relative/path"))
+    val t2 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
 
     // absolute path without scheme
-    val t3 = catalog.alterTable(testIdent,
-      TableChange.setProperty(TableCatalog.PROP_LOCATION, "/absolute/path")).asInstanceOf[V1Table]
+    catalog.alterTable(testIdent,
+      TableChange.setProperty(TableCatalog.PROP_LOCATION, "/absolute/path"))
+    val t3 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t3.catalogTable.location.toString === "file:///absolute/path")
 
     // absolute path with scheme
-    val t4 = catalog.alterTable(testIdent, TableChange.setProperty(
-      TableCatalog.PROP_LOCATION, "file:/absolute/path")).asInstanceOf[V1Table]
+    catalog.alterTable(testIdent, TableChange.setProperty(
+      TableCatalog.PROP_LOCATION, "file:/absolute/path"))
+    val t4 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t4.catalogTable.location.toString === "file:/absolute/path")
   }
 
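As an implementer-side sketch of the contract introduced above (not part of this patch; the class name `NoLoadCatalog` is hypothetical), a catalog built on `DelegatingCatalogExtension` can now skip the metadata round trip on create:

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{Column, DelegatingCatalogExtension, Identifier, Table}
import org.apache.spark.sql.connector.expressions.Transform

// Hypothetical catalog extension: create the table through the delegate but
// return null instead of re-reading the metadata. Spark calls
// loadTable(ident) later only if it actually needs the table (e.g. CTAS).
class NoLoadCatalog extends DelegatingCatalogExtension {
  override def createTable(
      ident: Identifier,
      columns: Array[Column],
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = {
    super.createTable(ident, columns, partitions, properties)
    null
  }
}
```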

