You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/03/08 12:06:21 UTC
[spark] branch branch-3.4 updated: [SPARK-42684][SQL] v2 catalog should not allow column default value by default
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new ee3daecfe7c [SPARK-42684][SQL] v2 catalog should not allow column default value by default
ee3daecfe7c is described below
commit ee3daecfe7c4a2115ffc94f2b85cd9800ea74196
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Mar 8 20:05:32 2023 +0800
[SPARK-42684][SQL] v2 catalog should not allow column default value by default
### What changes were proposed in this pull request?
Following the approach already used for generated columns, column default values now also have a dedicated catalog capability: a v2 catalog must explicitly declare SUPPORT_COLUMN_DEFAULT_VALUE in order to support them.
### Why are the changes needed?
Column default values need dedicated handling; if a catalog simply ignores them, query results can be wrong.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New tests in DataSourceV2SQLSuite.
Closes #40299 from cloud-fan/default.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 69dd20b5e45c7e3533efbfdc1974f59931c1b781)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../catalog/DelegatingCatalogExtension.java | 6 ++
.../connector/catalog/TableCatalogCapability.java | 39 ++++++++----
.../spark/sql/catalyst/util/GeneratedColumn.scala | 7 ++-
.../catalyst/util/ResolveDefaultColumnsUtil.scala | 69 ++++++++++++++--------
.../sql/connector/catalog/CatalogV2Util.scala | 3 +-
.../spark/sql/errors/QueryCompilationErrors.scala | 31 ++++++----
.../sql/catalyst/catalog/SessionCatalogSuite.scala | 2 +-
.../connector/catalog/InMemoryTableCatalog.scala | 5 +-
.../catalyst/analysis/ResolveSessionCatalog.scala | 37 ++++--------
.../spark/sql/execution/command/tables.scala | 5 +-
.../execution/datasources/DataSourceStrategy.scala | 9 +--
.../datasources/v2/DataSourceV2Strategy.scala | 20 ++++---
.../datasources/v2/V2SessionCatalog.scala | 8 ++-
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 34 ++++++++++-
14 files changed, 178 insertions(+), 97 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
index 8bbfe535295..534e1b86eca 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
@@ -18,6 +18,7 @@
package org.apache.spark.sql.connector.catalog;
import java.util.Map;
+import java.util.Set;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.catalyst.analysis.*;
@@ -52,6 +53,11 @@ public abstract class DelegatingCatalogExtension implements CatalogExtension {
@Override
public final void initialize(String name, CaseInsensitiveStringMap options) {}
+ @Override
+ public Set<TableCatalogCapability> capabilities() {
+ return asTableCatalog().capabilities();
+ }
+
@Override
public String[] defaultNamespace() {
return delegate.defaultNamespace();
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
index 84a2a0f7648..5ccb15ff1f0 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
@@ -33,16 +33,31 @@ import org.apache.spark.annotation.Evolving;
public enum TableCatalogCapability {
/**
- * Signals that the TableCatalog supports defining generated columns upon table creation in SQL.
- * <p>
- * Without this capability, any create/replace table statements with a generated column defined
- * in the table schema will throw an exception during analysis.
- * <p>
- * A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS (expr)}
- * <p>
- * Generation expression are included in the column definition for APIs like
- * {@link TableCatalog#createTable}.
- * See {@link Column#generationExpression()}.
- */
- SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS
+ * Signals that the TableCatalog supports defining generated columns upon table creation in SQL.
+ * <p>
+ * Without this capability, any create/replace table statements with a generated column defined
+ * in the table schema will throw an exception during analysis.
+ * <p>
+ * A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS (expr)}
+ * <p>
+ * Generation expression are included in the column definition for APIs like
+ * {@link TableCatalog#createTable}.
+ * See {@link Column#generationExpression()}.
+ */
+ SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS,
+
+ /**
+ * Signals that the TableCatalog supports defining column default value as expression in
+ * CREATE/REPLACE/ALTER TABLE.
+ * <p>
+ * Without this capability, any CREATE/REPLACE/ALTER TABLE statement with a column default value
+ * defined in the table schema will throw an exception during analysis.
+ * <p>
+ * A column default value is defined with syntax: {@code colName colType DEFAULT expr}
+ * <p>
+ * Column default value expression is included in the column definition for APIs like
+ * {@link TableCatalog#createTable}.
+ * See {@link Column#defaultValue()}.
+ */
+ SUPPORT_COLUMN_DEFAULT_VALUE
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
index 6ff5df98d3c..9a1ce5b0295 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
-import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog, TableCatalogCapability}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, TableCatalog, TableCatalogCapability}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructField, StructType}
@@ -182,12 +182,13 @@ object GeneratedColumn {
def validateGeneratedColumns(
schema: StructType,
catalog: TableCatalog,
- ident: Seq[String],
+ ident: Identifier,
statementType: String): Unit = {
if (hasGeneratedColumns(schema)) {
if (!catalog.capabilities().contains(
TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS)) {
- throw QueryCompilationErrors.generatedColumnsUnsupported(ident)
+ throw QueryCompilationErrors.unsupportedTableOperationError(
+ catalog, ident, "generated columns")
}
GeneratedColumn.verifyGeneratedColumns(schema, statementType)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index be7d74b0782..d0287cc602b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
-import org.apache.spark.sql.connector.catalog.{CatalogManager, FunctionCatalog, Identifier}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, FunctionCatalog, Identifier, TableCatalog, TableCatalogCapability}
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
@@ -90,39 +90,15 @@ object ResolveDefaultColumns {
* EXISTS_DEFAULT metadata for such columns where the value is not present in storage.
*
* @param tableSchema represents the names and types of the columns of the statement to process.
- * @param tableProvider provider of the target table to store default values for, if any.
* @param statementType name of the statement being processed, such as INSERT; useful for errors.
- * @param addNewColumnToExistingTable true if the statement being processed adds a new column to
- * a table that already exists.
* @return a copy of `tableSchema` with field metadata updated with the constant-folded values.
*/
def constantFoldCurrentDefaultsToExistDefaults(
tableSchema: StructType,
- tableProvider: Option[String],
- statementType: String,
- addNewColumnToExistingTable: Boolean): StructType = {
+ statementType: String): StructType = {
if (SQLConf.get.enableDefaultColumns) {
- val keywords: Array[String] =
- SQLConf.get.getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)
- .toLowerCase().split(",").map(_.trim)
- val allowedTableProviders: Array[String] =
- keywords.map(_.stripSuffix("*"))
- val addColumnExistingTableBannedProviders: Array[String] =
- keywords.filter(_.endsWith("*")).map(_.stripSuffix("*"))
- val givenTableProvider: String = tableProvider.getOrElse("").toLowerCase()
val newFields: Seq[StructField] = tableSchema.fields.map { field =>
if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
- // Make sure that the target table has a provider that supports default column values.
- if (!allowedTableProviders.contains(givenTableProvider)) {
- throw QueryCompilationErrors
- .defaultReferencesNotAllowedInDataSource(statementType, givenTableProvider)
- }
- if (addNewColumnToExistingTable &&
- givenTableProvider.nonEmpty &&
- addColumnExistingTableBannedProviders.contains(givenTableProvider)) {
- throw QueryCompilationErrors
- .addNewDefaultColumnToExistingTableNotAllowed(statementType, givenTableProvider)
- }
val analyzed: Expression = analyze(field, statementType)
val newMetadata: Metadata = new MetadataBuilder().withMetadata(field.metadata)
.putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, analyzed.sql).build()
@@ -137,6 +113,47 @@ object ResolveDefaultColumns {
}
}
+ // Fails if the given catalog does not support column default value.
+ def validateCatalogForDefaultValue(
+ schema: StructType,
+ catalog: TableCatalog,
+ ident: Identifier): Unit = {
+ if (SQLConf.get.enableDefaultColumns &&
+ schema.exists(_.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) &&
+ !catalog.capabilities().contains(TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE)) {
+ throw QueryCompilationErrors.unsupportedTableOperationError(
+ catalog, ident, "column default value")
+ }
+ }
+
+ // Fails if the given table provider of the session catalog does not support column default value.
+ def validateTableProviderForDefaultValue(
+ schema: StructType,
+ tableProvider: Option[String],
+ statementType: String,
+ addNewColumnToExistingTable: Boolean): Unit = {
+ if (SQLConf.get.enableDefaultColumns &&
+ schema.exists(_.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY))) {
+ val keywords: Array[String] = SQLConf.get.getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)
+ .toLowerCase().split(",").map(_.trim)
+ val allowedTableProviders: Array[String] = keywords.map(_.stripSuffix("*"))
+ val addColumnExistingTableBannedProviders: Array[String] =
+ keywords.filter(_.endsWith("*")).map(_.stripSuffix("*"))
+ val givenTableProvider: String = tableProvider.getOrElse("").toLowerCase()
+ // Make sure that the target table has a provider that supports default column values.
+ if (!allowedTableProviders.contains(givenTableProvider)) {
+ throw QueryCompilationErrors.defaultReferencesNotAllowedInDataSource(
+ statementType, givenTableProvider)
+ }
+ if (addNewColumnToExistingTable &&
+ givenTableProvider.nonEmpty &&
+ addColumnExistingTableBannedProviders.contains(givenTableProvider)) {
+ throw QueryCompilationErrors.addNewDefaultColumnToExistingTableNotAllowed(
+ statementType, givenTableProvider)
+ }
+ }
+ }
+
/**
* Parses and analyzes the DEFAULT column text in `field`, returning an error upon failure.
*
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 12a8db92363..e5d9720bb02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -248,8 +248,9 @@ private[sql] object CatalogV2Util {
val (before, after) = schema.fields.splitAt(fieldIndex + 1)
StructType(before ++ (field +: after))
}
- constantFoldCurrentDefaultsToExistDefaults(
+ validateTableProviderForDefaultValue(
newSchema, tableProvider, statementType, addNewColumnToExistingTable)
+ constantFoldCurrentDefaultsToExistDefaults(newSchema, statementType)
}
private def replace(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 9506c99af96..1376408fb21 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -749,13 +749,28 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
messageParameters = Map.empty)
}
- def operationOnlySupportedWithV2TableError(
- nameParts: Seq[String],
+ def unsupportedTableOperationError(
+ catalog: CatalogPlugin,
+ ident: Identifier,
+ operation: String): Throwable = {
+ unsupportedTableOperationError(
+ catalog.name +: ident.namespace :+ ident.name, operation)
+ }
+
+ def unsupportedTableOperationError(
+ ident: TableIdentifier,
+ operation: String): Throwable = {
+ unsupportedTableOperationError(
+ Seq(ident.catalog.get, ident.database.get, ident.table), operation)
+ }
+
+ private def unsupportedTableOperationError(
+ qualifiedTableName: Seq[String],
operation: String): Throwable = {
new AnalysisException(
errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
messageParameters = Map(
- "tableName" -> toSQLId(nameParts),
+ "tableName" -> toSQLId(qualifiedTableName),
"operation" -> operation))
}
@@ -3401,16 +3416,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
}
}
- def generatedColumnsUnsupported(nameParts: Seq[String]): AnalysisException = {
- new AnalysisException(
- errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
- messageParameters = Map(
- "tableName" -> toSQLId(nameParts),
- "operation" -> "generated columns"
- )
- )
- }
-
def ambiguousLateralColumnAliasError(name: String, numOfMatches: Int): Throwable = {
new AnalysisException(
errorClass = "AMBIGUOUS_LATERAL_COLUMN_ALIAS",
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index a7254865c1e..9959dbf6516 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -177,7 +177,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
// disabled.
withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
val result: StructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
- db1tbl3.schema, db1tbl3.provider, "CREATE TABLE", false)
+ db1tbl3.schema, "CREATE TABLE")
val columnEWithFeatureDisabled: StructField = findField("e", result)
// No constant-folding has taken place to the EXISTS_DEFAULT metadata.
assert(!columnEWithFeatureDisabled.metadata.contains("EXISTS_DEFAULT"))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index e82f203742b..8a744c1c198 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -172,7 +172,10 @@ class BasicInMemoryTableCatalog extends TableCatalog {
class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamespaces {
override def capabilities: java.util.Set[TableCatalogCapability] = {
- Set(TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS).asJava
+ Set(
+ TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE,
+ TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS
+ ).asJava
}
protected def allNamespaces: Seq[Seq[String]] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 7b2d5015840..b2b35b40492 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -53,9 +53,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case AddColumns(ResolvedV1TableIdentifier(ident), cols) =>
cols.foreach { c =>
if (c.name.length > 1) {
- throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
- Seq(ident.catalog.get, ident.database.get, ident.table),
- "ADD COLUMN with qualified column")
+ throw QueryCompilationErrors.unsupportedTableOperationError(
+ ident, "ADD COLUMN with qualified column")
}
if (!c.nullable) {
throw QueryCompilationErrors.addColumnWithV1TableCannotSpecifyNotNullError
@@ -64,24 +63,20 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
AlterTableAddColumnsCommand(ident, cols.map(convertToStructField))
case ReplaceColumns(ResolvedV1TableIdentifier(ident), _) =>
- throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
- Seq(ident.catalog.get, ident.database.get, ident.table),
- "REPLACE COLUMNS")
+ throw QueryCompilationErrors.unsupportedTableOperationError(ident, "REPLACE COLUMNS")
case a @ AlterColumn(ResolvedTable(catalog, ident, table: V1Table, _), _, _, _, _, _, _)
if isSessionCatalog(catalog) =>
if (a.column.name.length > 1) {
- throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
- Seq(catalog.name, ident.namespace()(0), ident.name),
- "ALTER COLUMN with qualified column")
+ throw QueryCompilationErrors.unsupportedTableOperationError(
+ catalog, ident, "ALTER COLUMN with qualified column")
}
if (a.nullable.isDefined) {
throw QueryCompilationErrors.alterColumnWithV1TableCannotSpecifyNotNullError
}
if (a.position.isDefined) {
- throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
- Seq(catalog.name, ident.namespace()(0), ident.name),
- "ALTER COLUMN ... FIRST | ALTER")
+ throw QueryCompilationErrors.unsupportedTableOperationError(
+ catalog, ident, "ALTER COLUMN ... FIRST | ALTER")
}
val builder = new MetadataBuilder
// Add comment to metadata
@@ -105,14 +100,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn)
case RenameColumn(ResolvedV1TableIdentifier(ident), _, _) =>
- throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
- Seq(ident.catalog.get, ident.database.get, ident.table),
- "RENAME COLUMN")
+ throw QueryCompilationErrors.unsupportedTableOperationError(ident, "RENAME COLUMN")
case DropColumns(ResolvedV1TableIdentifier(ident), _, _) =>
- throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
- Seq(ident.catalog.get, ident.database.get, ident.table),
- "DROP COLUMN")
+ throw QueryCompilationErrors.unsupportedTableOperationError(ident, "DROP COLUMN")
case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
AlterTableSetPropertiesCommand(ident, props, isView = false)
@@ -204,9 +195,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) =>
val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
- throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
- Seq(ident.catalog.get, ident.database.get, ident.table),
- "REPLACE TABLE")
+ throw QueryCompilationErrors.unsupportedTableOperationError(
+ ident, "REPLACE TABLE")
} else {
c
}
@@ -214,9 +204,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) =>
val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
- throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
- Seq(ident.catalog.get, ident.database.get, ident.table),
- "REPLACE TABLE AS SELECT")
+ throw QueryCompilationErrors.unsupportedTableOperationError(
+ ident, "REPLACE TABLE AS SELECT")
} else {
c
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index f6266bcb33f..351f6d5456d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -289,8 +289,11 @@ case class AlterTableAddColumnsCommand(
sparkSession: SparkSession, tableProvider: Option[String]): Seq[StructField] = {
colsToAdd.map { col: StructField =>
if (col.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+ val schema = StructType(Array(col))
+ ResolveDefaultColumns.validateTableProviderForDefaultValue(
+ schema, tableProvider, "ALTER TABLE ADD COLUMNS", true)
val foldedStructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
- StructType(Array(col)), tableProvider, "ALTER TABLE ADD COLUMNS", true)
+ schema, "ALTER TABLE ADD COLUMNS")
foldedStructType.fields(0)
} else {
col
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 74539b54117..e3a1f6f6b68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -133,14 +133,15 @@ case class DataSourceAnalysis(analyzer: Analyzer) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
+ ResolveDefaultColumns.validateTableProviderForDefaultValue(
+ tableDesc.schema, tableDesc.provider, "CREATE TABLE", false)
val newSchema: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
- tableDesc.schema, tableDesc.provider, "CREATE TABLE", false)
+ tableDesc.schema, "CREATE TABLE")
if (GeneratedColumn.hasGeneratedColumns(newSchema)) {
- throw QueryCompilationErrors.generatedColumnsUnsupported(
- Seq(tableDesc.identifier.catalog.get, tableDesc.identifier.database.get,
- tableDesc.identifier.table))
+ throw QueryCompilationErrors.unsupportedTableOperationError(
+ tableDesc.identifier, "generated columns")
}
val newTableDesc = tableDesc.copy(schema = newSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 4d84c42bc5b..55dc8b54e00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable
import org.apache.commons.lang3.StringUtils
+import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
@@ -175,11 +176,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case CreateTable(ResolvedIdentifier(catalog, ident), schema, partitioning,
tableSpec, ifNotExists) =>
+ ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
val newSchema: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
- schema, tableSpec.provider, "CREATE TABLE", false)
+ schema, "CREATE TABLE")
GeneratedColumn.validateGeneratedColumns(
- newSchema, catalog.asTableCatalog, ident.asMultipartIdentifier, "CREATE TABLE")
+ newSchema, catalog.asTableCatalog, ident, "CREATE TABLE")
CreateTableExec(catalog.asTableCatalog, ident, structTypeToV2Columns(newSchema),
partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
@@ -201,11 +203,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil
case ReplaceTable(ResolvedIdentifier(catalog, ident), schema, parts, tableSpec, orCreate) =>
+ ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
val newSchema: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
- schema, tableSpec.provider, "CREATE TABLE", false)
+ schema, "CREATE TABLE")
GeneratedColumn.validateGeneratedColumns(
- newSchema, catalog.asTableCatalog, ident.asMultipartIdentifier, "CREATE TABLE")
+ newSchema, catalog.asTableCatalog, ident, "CREATE TABLE")
val v2Columns = structTypeToV2Columns(newSchema)
catalog match {
@@ -308,12 +311,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}
case LogicalRelation(_, _, catalogTable, _) =>
val tableIdentifier = catalogTable.get.identifier
- throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
- Seq(tableIdentifier.catalog.get, tableIdentifier.database.get, tableIdentifier.table),
+ throw QueryCompilationErrors.unsupportedTableOperationError(
+ tableIdentifier,
"DELETE")
- case _ =>
- throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
- Seq(), "DELETE")
+ case other =>
+ throw SparkException.internalError("Unexpected table relation: " + other)
}
case ReplaceData(_: DataSourceV2Relation, _, query, r: DataSourceV2Relation, Some(write)) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 461e948b029..b4789c98df9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableCatalogCapability, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.Transform
@@ -51,6 +51,12 @@ class V2SessionCatalog(catalog: SessionCatalog)
// This class is instantiated by Spark, so `initialize` method will not be called.
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}
+ override def capabilities(): util.Set[TableCatalogCapability] = {
+ Set(
+ TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE
+ ).asJava
+ }
+
override def listTables(namespace: Array[String]): Array[Identifier] = {
namespace match {
case Array(db) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 5b7acda6fc3..d95a372d43a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1422,6 +1422,38 @@ class DataSourceV2SQLSuiteV1Filter
}
}
+ test("SPARK-42684: Column default value only allowed with TableCatalogs that " +
+ "SUPPORT_COLUMN_DEFAULT_VALUE") {
+ val tblName = "my_tab"
+ val tableDefinition =
+ s"$tblName(c1 INT, c2 INT DEFAULT 0)"
+ for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) {
+ // InMemoryTableCatalog.capabilities() contains SUPPORT_COLUMN_DEFAULT_VALUE
+ withTable(s"testcat.$tblName") {
+ if (statement == "REPLACE TABLE") {
+ sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo")
+ }
+ // Can create table with a column default value
+ sql(s"$statement testcat.$tableDefinition")
+ assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName)))
+ }
+ // BasicInMemoryTableCatalog.capabilities() = {}
+ withSQLConf("spark.sql.catalog.dummy" -> classOf[BasicInMemoryTableCatalog].getName) {
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql("USE dummy")
+ sql(s"$statement dummy.$tableDefinition")
+ },
+ errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+ parameters = Map(
+ "tableName" -> "`dummy`.`my_tab`",
+ "operation" -> "column default value"
+ )
+ )
+ }
+ }
+ }
+
test("SPARK-41290: Generated columns only allowed with TableCatalogs that " +
"SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS") {
val tblName = "my_tab"
@@ -1446,7 +1478,7 @@ class DataSourceV2SQLSuiteV1Filter
},
errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
parameters = Map(
- "tableName" -> "`my_tab`",
+ "tableName" -> "`dummy`.`my_tab`",
"operation" -> "generated columns"
)
)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org