You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/12/26 07:17:45 UTC
(spark) branch master updated: [SPARK-46444][SQL] V2SessionCatalog#createTable should not load the table
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d8fb91e61352 [SPARK-46444][SQL] V2SessionCatalog#createTable should not load the table
d8fb91e61352 is described below
commit d8fb91e61352e57e733e7d7e4978c8ce555454b1
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Dec 26 15:17:30 2023 +0800
[SPARK-46444][SQL] V2SessionCatalog#createTable should not load the table
### What changes were proposed in this pull request?
Switching CREATE TABLE to the v2 command framework causes a performance regression, because `V2SessionCatalog#createTable` performs an extra table lookup that does not happen in v1. This PR fixes it by allowing `TableCatalog#createTable` to return null; Spark then calls `loadTable` to get the new table metadata only when it needs it, e.g. for CTAS. This PR also fixes `alterTable` in the same way.
### Why are the changes needed?
To fix a performance regression in v2. The performance of a single command may not matter, but in a cluster running many Spark applications it is important to reduce the number of RPCs to the metastore.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #44377 from cloud-fan/create-table.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../src/main/resources/error/error-classes.json | 4 +-
docs/sql-error-conditions.md | 4 +-
.../spark/sql/connector/catalog/TableCatalog.java | 8 +-
.../spark/sql/errors/QueryCompilationErrors.scala | 6 +-
.../datasources/v2/V2SessionCatalog.scala | 37 ++---
.../datasources/v2/WriteToDataSourceV2Exec.scala | 8 +-
.../spark/sql/connector/DataSourceV2Suite.scala | 12 +-
.../datasources/v2/V2SessionCatalogSuite.scala | 181 +++++++++++++--------
8 files changed, 157 insertions(+), 103 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 8970045d4ab3..700b1ed07513 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -895,7 +895,9 @@
},
"DATA_SOURCE_TABLE_SCHEMA_MISMATCH" : {
"message" : [
- "The schema of the data source table <tableSchema> does not match the actual schema <actualSchema>. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema."
+ "The schema of the data source table does not match the expected schema. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema.",
+ "Data Source schema: <dsSchema>",
+ "Expected schema: <expectedSchema>"
],
"sqlState" : "42K03"
},
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 0722cae5815e..a8d2b6c894bc 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -496,7 +496,9 @@ Failed to find the data source: `<provider>`. Please find packages at `https://s
[SQLSTATE: 42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
-The schema of the data source table `<tableSchema>` does not match the actual schema `<actualSchema>`. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema.
+The schema of the data source table does not match the expected schema. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema.
+Data Source schema: `<dsSchema>`
+Expected schema: `<expectedSchema>`
### DATETIME_OVERFLOW
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index 6642adc33548..74700789dde0 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -187,7 +187,9 @@ public interface TableCatalog extends CatalogPlugin {
* @param columns the columns of the new table.
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
- * @return metadata for the new table
+ * @return metadata for the new table. This can be null if getting the metadata for the new table
+ * is expensive. Spark will call {@link #loadTable(Identifier)} if needed (e.g. CTAS).
+ *
* @throws TableAlreadyExistsException If a table or view already exists for the identifier
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
@@ -221,7 +223,9 @@ public interface TableCatalog extends CatalogPlugin {
*
* @param ident a table identifier
* @param changes changes to apply to the table
- * @return updated metadata for the table
+ * @return updated metadata for the table. This can be null if getting the metadata for the
+ * updated table is expensive. Spark always discards the returned table here.
+ *
* @throws NoSuchTableException If the table doesn't exist or is a view
* @throws IllegalArgumentException If any change is rejected by the implementation.
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index bb2b7e7ae066..ee41cbe2f50e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3926,11 +3926,11 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
}
def dataSourceTableSchemaMismatchError(
- tableSchema: StructType, actualSchema: StructType): Throwable = {
+ dsSchema: StructType, expectedSchema: StructType): Throwable = {
new AnalysisException(
errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
messageParameters = Map(
- "tableSchema" -> toSQLType(tableSchema),
- "actualSchema" -> toSQLType(actualSchema)))
+ "dsSchema" -> toSQLType(dsSchema),
+ "expectedSchema" -> toSQLType(expectedSchema)))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index a7694f5d829d..e7445e970fa5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -198,10 +198,21 @@ class V2SessionCatalog(catalog: SessionCatalog)
s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}")
// Infer the schema and partitions and store them in the catalog.
(tableProvider.inferSchema(dsOptions), tableProvider.inferPartitioning(dsOptions))
- } else if (partitions.isEmpty) {
- (schema, tableProvider.inferPartitioning(dsOptions))
} else {
- (schema, partitions)
+ val partitioning = if (partitions.isEmpty) {
+ tableProvider.inferPartitioning(dsOptions)
+ } else {
+ partitions
+ }
+ val table = tableProvider.getTable(schema, partitions, dsOptions)
+ // Check if the schema of the created table matches the given schema.
+ val tableSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
+ table.columns().asSchema)
+ if (!DataType.equalsIgnoreNullability(tableSchema, schema)) {
+ throw QueryCompilationErrors.dataSourceTableSchemaMismatchError(
+ tableSchema, schema)
+ }
+ (schema, partitioning)
}
case _ =>
@@ -233,21 +244,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
throw QueryCompilationErrors.tableAlreadyExistsError(ident)
}
- val table = loadTable(ident)
-
- // Check if the schema of the created table matches the given schema.
- // TODO: move this check in loadTable to match the behavior with
- // existing file data sources.
- if (schema.nonEmpty) {
- val tableSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
- table.columns().asSchema)
- if (!DataType.equalsIgnoreNullability(tableSchema, schema)) {
- throw QueryCompilationErrors.dataSourceTableSchemaMismatchError(
- table.columns().asSchema, schema)
- }
- }
-
- table
+ null // Return null to save the `loadTable` call for CREATE TABLE without AS SELECT.
}
private def toOptions(properties: Map[String, String]): Map[String, String] = {
@@ -288,7 +285,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
throw QueryCompilationErrors.noSuchTableError(ident)
}
- loadTable(ident)
+ null // Return null to save the `loadTable` call for ALTER TABLE.
}
override def purgeTable(ident: Identifier): Boolean = {
@@ -332,8 +329,6 @@ class V2SessionCatalog(catalog: SessionCatalog)
throw QueryCompilationErrors.tableAlreadyExistsError(newIdent)
}
- // Load table to make sure the table exists
- loadTable(oldIdent)
catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 97c1f7ced508..c65c15fb0ef2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -82,9 +82,9 @@ case class CreateTableAsSelectExec(
}
throw QueryCompilationErrors.tableAlreadyExistsError(ident)
}
- val table = catalog.createTable(
+ val table = Option(catalog.createTable(
ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
- partitioning.toArray, properties.asJava)
+ partitioning.toArray, properties.asJava)).getOrElse(catalog.loadTable(ident))
writeToTable(catalog, table, writeOptions, ident, query)
}
}
@@ -162,9 +162,9 @@ case class ReplaceTableAsSelectExec(
} else if (!orCreate) {
throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
}
- val table = catalog.createTable(
+ val table = Option(catalog.createTable(
ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
- partitioning.toArray, properties.asJava)
+ partitioning.toArray, properties.asJava)).getOrElse(catalog.loadTable(ident))
writeToTable(catalog, table, writeOptions, ident, query)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index 3f3dc82da5ad..ea263b36c76c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -734,8 +734,8 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
},
errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
parameters = Map(
- "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
- "actualSchema" -> "\"STRUCT<x: INT, y: INT>\""))
+ "dsSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+ "expectedSchema" -> "\"STRUCT<x: INT, y: INT>\""))
}
}
@@ -772,8 +772,8 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
},
errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
parameters = Map(
- "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
- "actualSchema" -> "\"STRUCT<col1: INT, col2: INT>\""))
+ "dsSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+ "expectedSchema" -> "\"STRUCT<col1: INT, col2: INT>\""))
}
}
@@ -790,8 +790,8 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
},
errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
parameters = Map(
- "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
- "actualSchema" -> "\"STRUCT<i: STRING, j: STRING>\""))
+ "dsSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+ "expectedSchema" -> "\"STRUCT<i: STRING, j: STRING>\""))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index f9da55ed6ba3..e5473222d429 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -125,7 +125,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
assert(!catalog.tableExists(testIdent))
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
assert(parsed == Seq("db", "test_table"))
@@ -143,7 +144,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
assert(!catalog.tableExists(testIdent))
- val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
+ catalog.createTable(testIdent, schema, emptyTrans, properties)
+ val table = catalog.loadTable(testIdent)
val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
assert(parsed == Seq("db", "test_table"))
@@ -158,7 +160,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
assert(!catalog.tableExists(testIdent))
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
.map(part => quoteIdentifier(part)).mkString(".")
@@ -185,26 +188,30 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
assert(!catalog.tableExists(testIdent))
// default location
- val t1 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table]
+ catalog.createTable(testIdent, schema, emptyTrans, properties)
+ val t1 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
assert(t1.catalogTable.location ===
spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
catalog.dropTable(testIdent)
// relative path
properties.put(TableCatalog.PROP_LOCATION, "relative/path")
- val t2 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table]
+ catalog.createTable(testIdent, schema, emptyTrans, properties)
+ val t2 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
catalog.dropTable(testIdent)
// absolute path without scheme
properties.put(TableCatalog.PROP_LOCATION, "/absolute/path")
- val t3 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table]
+ catalog.createTable(testIdent, schema, emptyTrans, properties)
+ val t3 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
assert(t3.catalogTable.location.toString === "file:///absolute/path")
catalog.dropTable(testIdent)
// absolute path with scheme
properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path")
- val t4 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table]
+ catalog.createTable(testIdent, schema, emptyTrans, properties)
+ val t4 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
assert(t4.catalogTable.location.toString === "file:/absolute/path")
catalog.dropTable(testIdent)
}
@@ -226,12 +233,11 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("loadTable") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
val loaded = catalog.loadTable(testIdent)
- assert(table.name == loaded.name)
- assert(table.schema == loaded.schema)
- assert(table.properties == loaded.properties)
+ assert(loaded.name == testIdent.toString)
+ assert(loaded.schema == schema)
}
test("loadTable: table does not exist") {
@@ -247,7 +253,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("invalidateTable") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
catalog.invalidateTable(testIdent)
val loaded = catalog.loadTable(testIdent)
@@ -268,11 +275,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: add property") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(filterV2TableProperties(table.properties) == Map())
- val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1"))
+ catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1"))
+ val updated = catalog.loadTable(testIdent)
assert(filterV2TableProperties(updated.properties) == Map("prop-1" -> "1"))
val loaded = catalog.loadTable(testIdent)
@@ -287,11 +296,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
val properties = new util.HashMap[String, String]()
properties.put("prop-1", "1")
- val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
+ catalog.createTable(testIdent, schema, emptyTrans, properties)
+ val table = catalog.loadTable(testIdent)
assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1"))
- val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-2", "2"))
+ catalog.alterTable(testIdent, TableChange.setProperty("prop-2", "2"))
+ val updated = catalog.loadTable(testIdent)
assert(filterV2TableProperties(updated.properties) == Map("prop-1" -> "1", "prop-2" -> "2"))
val loaded = catalog.loadTable(testIdent)
@@ -306,11 +317,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
val properties = new util.HashMap[String, String]()
properties.put("prop-1", "1")
- val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
+ catalog.createTable(testIdent, schema, emptyTrans, properties)
+ val table = catalog.loadTable(testIdent)
assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1"))
- val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+ catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+ val updated = catalog.loadTable(testIdent)
assert(filterV2TableProperties(updated.properties) == Map())
val loaded = catalog.loadTable(testIdent)
@@ -322,11 +335,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: remove missing property") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(filterV2TableProperties(table.properties) == Map())
- val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+ catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+ val updated = catalog.loadTable(testIdent)
assert(filterV2TableProperties(updated.properties) == Map())
val loaded = catalog.loadTable(testIdent)
@@ -338,11 +353,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: add top-level column") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == schema)
- val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType))
+ catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType))
+ val updated = catalog.loadTable(testIdent)
assert(updated.schema == schema.add("ts", TimestampType))
}
@@ -350,12 +367,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: add required column") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == schema)
- val updated = catalog.alterTable(testIdent,
+ catalog.alterTable(testIdent,
TableChange.addColumn(Array("ts"), TimestampType, false))
+ val updated = catalog.loadTable(testIdent)
assert(updated.schema == schema.add("ts", TimestampType, nullable = false))
}
@@ -363,12 +382,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: add column with comment") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == schema)
- val updated = catalog.alterTable(testIdent,
+ catalog.alterTable(testIdent,
TableChange.addColumn(Array("ts"), TimestampType, false, "comment text"))
+ val updated = catalog.loadTable(testIdent)
val field = StructField("ts", TimestampType, nullable = false).withComment("comment text")
assert(updated.schema == schema.add(field))
@@ -380,12 +401,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == tableSchema)
- val updated = catalog.alterTable(testIdent,
+ catalog.alterTable(testIdent,
TableChange.addColumn(Array("point", "z"), DoubleType))
+ val updated = catalog.loadTable(testIdent)
val expectedSchema = schema.add("point", pointStruct.add("z", DoubleType))
@@ -395,7 +418,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: add column to primitive field fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == schema)
@@ -413,7 +437,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: add field to missing column fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == schema)
@@ -429,11 +454,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: update column data type") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == schema)
- val updated = catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType))
+ catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType))
+ val updated = catalog.loadTable(testIdent)
val expectedSchema = new StructType().add("id", LongType).add("data", StringType)
assert(updated.schema == expectedSchema)
@@ -445,12 +472,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
val originalSchema = new StructType()
.add("id", IntegerType, nullable = false)
.add("data", StringType)
- val table = catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == originalSchema)
- val updated = catalog.alterTable(testIdent,
+ catalog.alterTable(testIdent,
TableChange.updateColumnNullability(Array("id"), true))
+ val updated = catalog.loadTable(testIdent)
val expectedSchema = new StructType().add("id", IntegerType).add("data", StringType)
assert(updated.schema == expectedSchema)
@@ -459,7 +488,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: update missing column fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == schema)
@@ -475,12 +505,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: add comment") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == schema)
- val updated = catalog.alterTable(testIdent,
+ catalog.alterTable(testIdent,
TableChange.updateColumnComment(Array("id"), "comment text"))
+ val updated = catalog.loadTable(testIdent)
val expectedSchema = new StructType()
.add("id", IntegerType, nullable = true, "comment text")
@@ -491,7 +523,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: replace comment") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == schema)
@@ -501,8 +534,9 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
.add("id", IntegerType, nullable = true, "replacement comment")
.add("data", StringType)
- val updated = catalog.alterTable(testIdent,
+ catalog.alterTable(testIdent,
TableChange.updateColumnComment(Array("id"), "replacement comment"))
+ val updated = catalog.loadTable(testIdent)
assert(updated.schema == expectedSchema)
}
@@ -510,7 +544,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: add comment to missing column fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == schema)
@@ -526,11 +561,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: rename top-level column") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == schema)
- val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id"))
+ catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id"))
+ val updated = catalog.loadTable(testIdent)
val expectedSchema = new StructType().add("some_id", IntegerType).add("data", StringType)
@@ -543,12 +580,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == tableSchema)
- val updated = catalog.alterTable(testIdent,
+ catalog.alterTable(testIdent,
TableChange.renameColumn(Array("point", "x"), "first"))
+ val updated = catalog.loadTable(testIdent)
val newPointStruct = new StructType().add("first", DoubleType).add("y", DoubleType)
val expectedSchema = schema.add("point", newPointStruct)
@@ -562,12 +601,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == tableSchema)
- val updated = catalog.alterTable(testIdent,
- TableChange.renameColumn(Array("point"), "p"))
+ catalog.alterTable(testIdent, TableChange.renameColumn(Array("point"), "p"))
+ val updated = catalog.loadTable(testIdent)
val newPointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
val expectedSchema = schema.add("p", newPointStruct)
@@ -578,7 +618,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: rename missing column fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == schema)
@@ -597,13 +638,15 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == tableSchema)
- val updated = catalog.alterTable(testIdent,
+ catalog.alterTable(testIdent,
TableChange.renameColumn(Array("point", "x"), "first"),
TableChange.renameColumn(Array("point", "y"), "second"))
+ val updated = catalog.loadTable(testIdent)
val newPointStruct = new StructType().add("first", DoubleType).add("second", DoubleType)
val expectedSchema = schema.add("point", newPointStruct)
@@ -614,12 +657,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: delete top-level column") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == schema)
- val updated = catalog.alterTable(testIdent,
- TableChange.deleteColumn(Array("id"), false))
+ catalog.alterTable(testIdent, TableChange.deleteColumn(Array("id"), false))
+ val updated = catalog.loadTable(testIdent)
val expectedSchema = new StructType().add("data", StringType)
assert(updated.schema == expectedSchema)
@@ -631,12 +675,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == tableSchema)
- val updated = catalog.alterTable(testIdent,
- TableChange.deleteColumn(Array("point", "y"), false))
+ catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "y"), false))
+ val updated = catalog.loadTable(testIdent)
val newPointStruct = new StructType().add("x", DoubleType)
val expectedSchema = schema.add("point", newPointStruct)
@@ -647,7 +692,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
test("alterTable: delete missing column fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == schema)
@@ -669,7 +715,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+ catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+ val table = catalog.loadTable(testIdent)
assert(table.schema == tableSchema)
@@ -700,23 +747,27 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
assert(!catalog.tableExists(testIdent))
// default location
- val t1 = catalog.createTable(testIdent, schema, emptyTrans, emptyProps).asInstanceOf[V1Table]
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ val t1 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
assert(t1.catalogTable.location ===
spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
// relative path
- val t2 = catalog.alterTable(testIdent,
- TableChange.setProperty(TableCatalog.PROP_LOCATION, "relative/path")).asInstanceOf[V1Table]
+ catalog.alterTable(testIdent,
+ TableChange.setProperty(TableCatalog.PROP_LOCATION, "relative/path"))
+ val t2 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
// absolute path without scheme
- val t3 = catalog.alterTable(testIdent,
- TableChange.setProperty(TableCatalog.PROP_LOCATION, "/absolute/path")).asInstanceOf[V1Table]
+ catalog.alterTable(testIdent,
+ TableChange.setProperty(TableCatalog.PROP_LOCATION, "/absolute/path"))
+ val t3 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
assert(t3.catalogTable.location.toString === "file:///absolute/path")
// absolute path with scheme
- val t4 = catalog.alterTable(testIdent, TableChange.setProperty(
- TableCatalog.PROP_LOCATION, "file:/absolute/path")).asInstanceOf[V1Table]
+ catalog.alterTable(testIdent, TableChange.setProperty(
+ TableCatalog.PROP_LOCATION, "file:/absolute/path"))
+ val t4 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
assert(t4.catalogTable.location.toString === "file:/absolute/path")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org