Posted to commits@spark.apache.org by we...@apache.org on 2023/03/08 12:06:21 UTC

[spark] branch branch-3.4 updated: [SPARK-42684][SQL] v2 catalog should not allow column default value by default

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new ee3daecfe7c [SPARK-42684][SQL] v2 catalog should not allow column default value by default
ee3daecfe7c is described below

commit ee3daecfe7c4a2115ffc94f2b85cd9800ea74196
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Mar 8 20:05:32 2023 +0800

    [SPARK-42684][SQL] v2 catalog should not allow column default value by default
    
    ### What changes were proposed in this pull request?
    
    Following the approach taken for generated columns, column default values should also be gated behind a catalog capability: v2 catalogs must explicitly declare SUPPORT_COLUMN_DEFAULT_VALUE to support them.
    
    ### Why are the changes needed?
    
    Column default values need dedicated handling; if a catalog simply ignores them, query results can be wrong.
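    
    For example (illustrative only): given CREATE TABLE t(c1 INT, c2 INT DEFAULT 42),
    INSERT INTO t(c1) VALUES (1) is expected to store 42 in c2; a catalog that
    silently drops the default-value metadata would return NULL for c2 instead.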
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New tests.
    
    Closes #40299 from cloud-fan/default.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 69dd20b5e45c7e3533efbfdc1974f59931c1b781)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../catalog/DelegatingCatalogExtension.java        |  6 ++
 .../connector/catalog/TableCatalogCapability.java  | 39 ++++++++----
 .../spark/sql/catalyst/util/GeneratedColumn.scala  |  7 ++-
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  | 69 ++++++++++++++--------
 .../sql/connector/catalog/CatalogV2Util.scala      |  3 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  | 31 ++++++----
 .../sql/catalyst/catalog/SessionCatalogSuite.scala |  2 +-
 .../connector/catalog/InMemoryTableCatalog.scala   |  5 +-
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 37 ++++--------
 .../spark/sql/execution/command/tables.scala       |  5 +-
 .../execution/datasources/DataSourceStrategy.scala |  9 +--
 .../datasources/v2/DataSourceV2Strategy.scala      | 20 ++++---
 .../datasources/v2/V2SessionCatalog.scala          |  8 ++-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 34 ++++++++++-
 14 files changed, 178 insertions(+), 97 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
index 8bbfe535295..534e1b86eca 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector.catalog;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.catalyst.analysis.*;
@@ -52,6 +53,11 @@ public abstract class DelegatingCatalogExtension implements CatalogExtension {
   @Override
   public final void initialize(String name, CaseInsensitiveStringMap options) {}
 
+  @Override
+  public Set<TableCatalogCapability> capabilities() {
+    return asTableCatalog().capabilities();
+  }
+
   @Override
   public String[] defaultNamespace() {
     return delegate.defaultNamespace();
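
Note: with this override, catalog plugins that extend DelegatingCatalogExtension
inherit the delegate's capabilities rather than the empty default set. A minimal
sketch (MyExtension is a hypothetical plugin; no override is needed for DEFAULT
to keep working, since capabilities() now forwards to the underlying catalog):

    import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension

    // Hypothetical catalog extension: capabilities() is inherited and forwards
    // to the delegate, so SUPPORT_COLUMN_DEFAULT_VALUE comes from V2SessionCatalog.
    class MyExtension extends DelegatingCatalogExtension
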
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
index 84a2a0f7648..5ccb15ff1f0 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
@@ -33,16 +33,31 @@ import org.apache.spark.annotation.Evolving;
 public enum TableCatalogCapability {
 
   /**
-  * Signals that the TableCatalog supports defining generated columns upon table creation in SQL.
-  * <p>
-  * Without this capability, any create/replace table statements with a generated column defined
-  * in the table schema will throw an exception during analysis.
-  * <p>
-  * A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS (expr)}
-  * <p>
-  * Generation expression are included in the column definition for APIs like
-  * {@link TableCatalog#createTable}.
-  * See {@link Column#generationExpression()}.
-  */
-  SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS
+   * Signals that the TableCatalog supports defining generated columns upon table creation in SQL.
+   * <p>
+   * Without this capability, any create/replace table statements with a generated column defined
+   * in the table schema will throw an exception during analysis.
+   * <p>
+   * A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS (expr)}
+   * <p>
+   * Generation expressions are included in the column definition for APIs like
+   * {@link TableCatalog#createTable}.
+   * See {@link Column#generationExpression()}.
+   */
+  SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS,
+
+  /**
+   * Signals that the TableCatalog supports defining a column default value as an expression in
+   * CREATE/REPLACE/ALTER TABLE.
+   * <p>
+   * Without this capability, any CREATE/REPLACE/ALTER TABLE statement with a column default value
+   * defined in the table schema will throw an exception during analysis.
+   * <p>
+   * A column default value is defined with syntax: {@code colName colType DEFAULT expr}
+   * <p>
+   * Column default value expression is included in the column definition for APIs like
+   * {@link TableCatalog#createTable}.
+   * See {@link Column#defaultValue()}.
+   */
+  SUPPORT_COLUMN_DEFAULT_VALUE
 }
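
Note: a connector opts in by overriding TableCatalog#capabilities(). A minimal
Scala sketch, mirroring the overrides added to InMemoryTableCatalog and
V2SessionCatalog later in this patch (MyCatalog is a hypothetical subclass of
the test catalog):

    import java.util
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, TableCatalogCapability}

    class MyCatalog extends BasicInMemoryTableCatalog {
      // Without this declaration, any CREATE/REPLACE/ALTER TABLE statement that
      // defines a column DEFAULT fails analysis with UNSUPPORTED_FEATURE.TABLE_OPERATION.
      override def capabilities(): util.Set[TableCatalogCapability] =
        Set(TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE).asJava
    }
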
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
index 6ff5df98d3c..9a1ce5b0295 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
-import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog, TableCatalogCapability}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, TableCatalog, TableCatalogCapability}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
@@ -182,12 +182,13 @@ object GeneratedColumn {
   def validateGeneratedColumns(
       schema: StructType,
       catalog: TableCatalog,
-      ident: Seq[String],
+      ident: Identifier,
       statementType: String): Unit = {
     if (hasGeneratedColumns(schema)) {
       if (!catalog.capabilities().contains(
         TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS)) {
-        throw QueryCompilationErrors.generatedColumnsUnsupported(ident)
+        throw QueryCompilationErrors.unsupportedTableOperationError(
+          catalog, ident, "generated columns")
       }
       GeneratedColumn.verifyGeneratedColumns(schema, statementType)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index be7d74b0782..d0287cc602b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
-import org.apache.spark.sql.connector.catalog.{CatalogManager, FunctionCatalog, Identifier}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, FunctionCatalog, Identifier, TableCatalog, TableCatalogCapability}
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -90,39 +90,15 @@ object ResolveDefaultColumns {
    * EXISTS_DEFAULT metadata for such columns where the value is not present in storage.
    *
    * @param tableSchema   represents the names and types of the columns of the statement to process.
-   * @param tableProvider provider of the target table to store default values for, if any.
    * @param statementType name of the statement being processed, such as INSERT; useful for errors.
-   * @param addNewColumnToExistingTable true if the statement being processed adds a new column to
-   *                                    a table that already exists.
    * @return a copy of `tableSchema` with field metadata updated with the constant-folded values.
    */
   def constantFoldCurrentDefaultsToExistDefaults(
       tableSchema: StructType,
-      tableProvider: Option[String],
-      statementType: String,
-      addNewColumnToExistingTable: Boolean): StructType = {
+      statementType: String): StructType = {
     if (SQLConf.get.enableDefaultColumns) {
-      val keywords: Array[String] =
-        SQLConf.get.getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)
-          .toLowerCase().split(",").map(_.trim)
-      val allowedTableProviders: Array[String] =
-        keywords.map(_.stripSuffix("*"))
-      val addColumnExistingTableBannedProviders: Array[String] =
-        keywords.filter(_.endsWith("*")).map(_.stripSuffix("*"))
-      val givenTableProvider: String = tableProvider.getOrElse("").toLowerCase()
       val newFields: Seq[StructField] = tableSchema.fields.map { field =>
         if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
-          // Make sure that the target table has a provider that supports default column values.
-          if (!allowedTableProviders.contains(givenTableProvider)) {
-            throw QueryCompilationErrors
-              .defaultReferencesNotAllowedInDataSource(statementType, givenTableProvider)
-          }
-          if (addNewColumnToExistingTable &&
-            givenTableProvider.nonEmpty &&
-            addColumnExistingTableBannedProviders.contains(givenTableProvider)) {
-            throw QueryCompilationErrors
-              .addNewDefaultColumnToExistingTableNotAllowed(statementType, givenTableProvider)
-          }
           val analyzed: Expression = analyze(field, statementType)
           val newMetadata: Metadata = new MetadataBuilder().withMetadata(field.metadata)
             .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, analyzed.sql).build()
@@ -137,6 +113,47 @@ object ResolveDefaultColumns {
     }
   }
 
+  // Fails if the given catalog does not support column default value.
+  def validateCatalogForDefaultValue(
+      schema: StructType,
+      catalog: TableCatalog,
+      ident: Identifier): Unit = {
+    if (SQLConf.get.enableDefaultColumns &&
+      schema.exists(_.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) &&
+      !catalog.capabilities().contains(TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE)) {
+      throw QueryCompilationErrors.unsupportedTableOperationError(
+        catalog, ident, "column default value")
+    }
+  }
+
+  // Fails if the given table provider of the session catalog does not support column default value.
+  def validateTableProviderForDefaultValue(
+      schema: StructType,
+      tableProvider: Option[String],
+      statementType: String,
+      addNewColumnToExistingTable: Boolean): Unit = {
+    if (SQLConf.get.enableDefaultColumns &&
+      schema.exists(_.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY))) {
+      val keywords: Array[String] = SQLConf.get.getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)
+        .toLowerCase().split(",").map(_.trim)
+      val allowedTableProviders: Array[String] = keywords.map(_.stripSuffix("*"))
+      val addColumnExistingTableBannedProviders: Array[String] =
+        keywords.filter(_.endsWith("*")).map(_.stripSuffix("*"))
+      val givenTableProvider: String = tableProvider.getOrElse("").toLowerCase()
+      // Make sure that the target table has a provider that supports default column values.
+      if (!allowedTableProviders.contains(givenTableProvider)) {
+        throw QueryCompilationErrors.defaultReferencesNotAllowedInDataSource(
+          statementType, givenTableProvider)
+      }
+      if (addNewColumnToExistingTable &&
+        givenTableProvider.nonEmpty &&
+        addColumnExistingTableBannedProviders.contains(givenTableProvider)) {
+        throw QueryCompilationErrors.addNewDefaultColumnToExistingTableNotAllowed(
+          statementType, givenTableProvider)
+      }
+    }
+  }
+
   /**
    * Parses and analyzes the DEFAULT column text in `field`, returning an error upon failure.
    *
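
Note: the provider allow-list logic is moved, not changed. A worked illustration
of the keyword parsing (the conf value "csv,json,orc*" is hypothetical; a trailing
"*" allows DEFAULT at CREATE time but bans adding a new DEFAULT column to an
existing table):

    // Assuming SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS is set to "csv,json,orc*":
    val keywords = "csv,json,orc*".toLowerCase().split(",").map(_.trim)
    val allowedTableProviders = keywords.map(_.stripSuffix("*"))
    // -> Array("csv", "json", "orc"): providers that accept DEFAULT in CREATE TABLE
    val addColumnBannedProviders = keywords.filter(_.endsWith("*")).map(_.stripSuffix("*"))
    // -> Array("orc"): ALTER TABLE ADD COLUMNS with DEFAULT is rejected for these
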
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 12a8db92363..e5d9720bb02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -248,8 +248,9 @@ private[sql] object CatalogV2Util {
       val (before, after) = schema.fields.splitAt(fieldIndex + 1)
       StructType(before ++ (field +: after))
     }
-    constantFoldCurrentDefaultsToExistDefaults(
+    validateTableProviderForDefaultValue(
       newSchema, tableProvider, statementType, addNewColumnToExistingTable)
+    constantFoldCurrentDefaultsToExistDefaults(newSchema, statementType)
   }
 
   private def replace(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 9506c99af96..1376408fb21 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -749,13 +749,28 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map.empty)
   }
 
-  def operationOnlySupportedWithV2TableError(
-      nameParts: Seq[String],
+  def unsupportedTableOperationError(
+      catalog: CatalogPlugin,
+      ident: Identifier,
+      operation: String): Throwable = {
+    unsupportedTableOperationError(
+      catalog.name +: ident.namespace :+ ident.name, operation)
+  }
+
+  def unsupportedTableOperationError(
+      ident: TableIdentifier,
+      operation: String): Throwable = {
+    unsupportedTableOperationError(
+      Seq(ident.catalog.get, ident.database.get, ident.table), operation)
+  }
+
+  private def unsupportedTableOperationError(
+      qualifiedTableName: Seq[String],
       operation: String): Throwable = {
     new AnalysisException(
       errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
       messageParameters = Map(
-        "tableName" -> toSQLId(nameParts),
+        "tableName" -> toSQLId(qualifiedTableName),
         "operation" -> operation))
   }
 
@@ -3401,16 +3416,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
     }
   }
 
-  def generatedColumnsUnsupported(nameParts: Seq[String]): AnalysisException = {
-    new AnalysisException(
-      errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
-      messageParameters = Map(
-        "tableName" -> toSQLId(nameParts),
-        "operation" -> "generated columns"
-      )
-    )
-  }
-
   def ambiguousLateralColumnAliasError(name: String, numOfMatches: Int): Throwable = {
     new AnalysisException(
       errorClass = "AMBIGUOUS_LATERAL_COLUMN_ALIAS",
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index a7254865c1e..9959dbf6516 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -177,7 +177,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
       // disabled.
       withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
         val result: StructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          db1tbl3.schema, db1tbl3.provider, "CREATE TABLE", false)
+          db1tbl3.schema, "CREATE TABLE")
         val columnEWithFeatureDisabled: StructField = findField("e", result)
         // No constant-folding has taken place to the EXISTS_DEFAULT metadata.
         assert(!columnEWithFeatureDisabled.metadata.contains("EXISTS_DEFAULT"))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index e82f203742b..8a744c1c198 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -172,7 +172,10 @@ class BasicInMemoryTableCatalog extends TableCatalog {
 class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamespaces {
 
   override def capabilities: java.util.Set[TableCatalogCapability] = {
-    Set(TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS).asJava
+    Set(
+      TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE,
+      TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS
+    ).asJava
   }
 
   protected def allNamespaces: Seq[Seq[String]] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 7b2d5015840..b2b35b40492 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -53,9 +53,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case AddColumns(ResolvedV1TableIdentifier(ident), cols) =>
       cols.foreach { c =>
         if (c.name.length > 1) {
-          throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-            Seq(ident.catalog.get, ident.database.get, ident.table),
-            "ADD COLUMN with qualified column")
+          throw QueryCompilationErrors.unsupportedTableOperationError(
+            ident, "ADD COLUMN with qualified column")
         }
         if (!c.nullable) {
           throw QueryCompilationErrors.addColumnWithV1TableCannotSpecifyNotNullError
@@ -64,24 +63,20 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       AlterTableAddColumnsCommand(ident, cols.map(convertToStructField))
 
     case ReplaceColumns(ResolvedV1TableIdentifier(ident), _) =>
-      throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-        Seq(ident.catalog.get, ident.database.get, ident.table),
-        "REPLACE COLUMNS")
+      throw QueryCompilationErrors.unsupportedTableOperationError(ident, "REPLACE COLUMNS")
 
     case a @ AlterColumn(ResolvedTable(catalog, ident, table: V1Table, _), _, _, _, _, _, _)
         if isSessionCatalog(catalog) =>
       if (a.column.name.length > 1) {
-        throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-          Seq(catalog.name, ident.namespace()(0), ident.name),
-          "ALTER COLUMN with qualified column")
+        throw QueryCompilationErrors.unsupportedTableOperationError(
+          catalog, ident, "ALTER COLUMN with qualified column")
       }
       if (a.nullable.isDefined) {
         throw QueryCompilationErrors.alterColumnWithV1TableCannotSpecifyNotNullError
       }
       if (a.position.isDefined) {
-        throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-          Seq(catalog.name, ident.namespace()(0), ident.name),
-          "ALTER COLUMN ... FIRST | ALTER")
+        throw QueryCompilationErrors.unsupportedTableOperationError(
+          catalog, ident, "ALTER COLUMN ... FIRST | ALTER")
       }
       val builder = new MetadataBuilder
       // Add comment to metadata
@@ -105,14 +100,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn)
 
     case RenameColumn(ResolvedV1TableIdentifier(ident), _, _) =>
-      throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-        Seq(ident.catalog.get, ident.database.get, ident.table),
-        "RENAME COLUMN")
+      throw QueryCompilationErrors.unsupportedTableOperationError(ident, "RENAME COLUMN")
 
     case DropColumns(ResolvedV1TableIdentifier(ident), _, _) =>
-      throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-        Seq(ident.catalog.get, ident.database.get, ident.table),
-        "DROP COLUMN")
+      throw QueryCompilationErrors.unsupportedTableOperationError(ident, "DROP COLUMN")
 
     case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
       AlterTableSetPropertiesCommand(ident, props, isView = false)
@@ -204,9 +195,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) =>
       val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
-        throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-          Seq(ident.catalog.get, ident.database.get, ident.table),
-          "REPLACE TABLE")
+        throw QueryCompilationErrors.unsupportedTableOperationError(
+          ident, "REPLACE TABLE")
       } else {
         c
       }
@@ -214,9 +204,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) =>
       val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
-        throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-          Seq(ident.catalog.get, ident.database.get, ident.table),
-          "REPLACE TABLE AS SELECT")
+        throw QueryCompilationErrors.unsupportedTableOperationError(
+          ident, "REPLACE TABLE AS SELECT")
       } else {
         c
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index f6266bcb33f..351f6d5456d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -289,8 +289,11 @@ case class AlterTableAddColumnsCommand(
       sparkSession: SparkSession, tableProvider: Option[String]): Seq[StructField] = {
     colsToAdd.map { col: StructField =>
       if (col.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+        val schema = StructType(Array(col))
+        ResolveDefaultColumns.validateTableProviderForDefaultValue(
+          schema, tableProvider, "ALTER TABLE ADD COLUMNS", true)
         val foldedStructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          StructType(Array(col)), tableProvider, "ALTER TABLE ADD COLUMNS", true)
+          schema, "ALTER TABLE ADD COLUMNS")
         foldedStructType.fields(0)
       } else {
         col
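
Note: this is the v1 ALTER TABLE path; validation now runs per added column with
addNewColumnToExistingTable = true, so providers marked with a trailing "*" in the
allow-list are rejected here even though they accept DEFAULT at CREATE time. An
illustrative statement that exercises this path (table name is hypothetical):

    sql("ALTER TABLE my_table ADD COLUMNS (c3 INT DEFAULT 42)")
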
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 74539b54117..e3a1f6f6b68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -133,14 +133,15 @@ case class DataSourceAnalysis(analyzer: Analyzer) extends Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
+      ResolveDefaultColumns.validateTableProviderForDefaultValue(
+        tableDesc.schema, tableDesc.provider, "CREATE TABLE", false)
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          tableDesc.schema, tableDesc.provider, "CREATE TABLE", false)
+          tableDesc.schema, "CREATE TABLE")
 
       if (GeneratedColumn.hasGeneratedColumns(newSchema)) {
-        throw QueryCompilationErrors.generatedColumnsUnsupported(
-          Seq(tableDesc.identifier.catalog.get, tableDesc.identifier.database.get,
-            tableDesc.identifier.table))
+        throw QueryCompilationErrors.unsupportedTableOperationError(
+          tableDesc.identifier, "generated columns")
       }
 
       val newTableDesc = tableDesc.copy(schema = newSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 4d84c42bc5b..55dc8b54e00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
@@ -175,11 +176,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
 
     case CreateTable(ResolvedIdentifier(catalog, ident), schema, partitioning,
         tableSpec, ifNotExists) =>
+      ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          schema, tableSpec.provider, "CREATE TABLE", false)
+          schema, "CREATE TABLE")
       GeneratedColumn.validateGeneratedColumns(
-        newSchema, catalog.asTableCatalog, ident.asMultipartIdentifier, "CREATE TABLE")
+        newSchema, catalog.asTableCatalog, ident, "CREATE TABLE")
 
       CreateTableExec(catalog.asTableCatalog, ident, structTypeToV2Columns(newSchema),
         partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
@@ -201,11 +203,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil
 
     case ReplaceTable(ResolvedIdentifier(catalog, ident), schema, parts, tableSpec, orCreate) =>
+      ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          schema, tableSpec.provider, "CREATE TABLE", false)
+          schema, "CREATE TABLE")
       GeneratedColumn.validateGeneratedColumns(
-        newSchema, catalog.asTableCatalog, ident.asMultipartIdentifier, "CREATE TABLE")
+        newSchema, catalog.asTableCatalog, ident, "CREATE TABLE")
 
       val v2Columns = structTypeToV2Columns(newSchema)
       catalog match {
@@ -308,12 +311,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
           }
         case LogicalRelation(_, _, catalogTable, _) =>
           val tableIdentifier = catalogTable.get.identifier
-          throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-            Seq(tableIdentifier.catalog.get, tableIdentifier.database.get, tableIdentifier.table),
+          throw QueryCompilationErrors.unsupportedTableOperationError(
+            tableIdentifier,
             "DELETE")
-        case _ =>
-          throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-            Seq(), "DELETE")
+        case other =>
+          throw SparkException.internalError("Unexpected table relation: " + other)
       }
 
     case ReplaceData(_: DataSourceV2Relation, _, query, r: DataSourceV2Relation, Some(write)) =>
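
Note: the capability check runs before constant folding, so an unsupported catalog
fails fast with a clear error. The last DELETE case previously reported
UNSUPPORTED_FEATURE.TABLE_OPERATION with an empty table name; since reaching it
means the planner produced a relation this strategy does not recognize, it now
raises an internal error instead.
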
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 461e948b029..b4789c98df9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableCatalogCapability, TableChange, V1Table}
 import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.Transform
@@ -51,6 +51,12 @@ class V2SessionCatalog(catalog: SessionCatalog)
   // This class is instantiated by Spark, so `initialize` method will not be called.
   override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}
 
+  override def capabilities(): util.Set[TableCatalogCapability] = {
+    Set(
+      TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE
+    ).asJava
+  }
+
   override def listTables(namespace: Array[String]): Array[Identifier] = {
     namespace match {
       case Array(db) =>
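
Note: V2SessionCatalog declares SUPPORT_COLUMN_DEFAULT_VALUE (but not the
generated-columns capability), so DEFAULT keeps working through the built-in
session catalog, with the per-provider allow-list still enforced separately by
validateTableProviderForDefaultValue. An illustrative statement (assuming the
table's provider is on the allow-list):

    sql("CREATE TABLE t(id INT, status STRING DEFAULT 'unknown') USING parquet")
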
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 5b7acda6fc3..d95a372d43a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1422,6 +1422,38 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
+  test("SPARK-42684: Column default value only allowed with TableCatalogs that " +
+    "SUPPORT_COLUMN_DEFAULT_VALUE") {
+    val tblName = "my_tab"
+    val tableDefinition =
+      s"$tblName(c1 INT, c2 INT DEFAULT 0)"
+    for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) {
+      // InMemoryTableCatalog.capabilities() contains SUPPORT_COLUMN_DEFAULT_VALUE
+      withTable(s"testcat.$tblName") {
+        if (statement == "REPLACE TABLE") {
+          sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo")
+        }
+        // Can create a table with a column default value
+        sql(s"$statement testcat.$tableDefinition")
+        assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName)))
+      }
+      // BasicInMemoryTableCatalog.capabilities() = {}
+      withSQLConf("spark.sql.catalog.dummy" -> classOf[BasicInMemoryTableCatalog].getName) {
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("USE dummy")
+            sql(s"$statement dummy.$tableDefinition")
+          },
+          errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+          parameters = Map(
+            "tableName" -> "`dummy`.`my_tab`",
+            "operation" -> "column default value"
+          )
+        )
+      }
+    }
+  }
+
   test("SPARK-41290: Generated columns only allowed with TableCatalogs that " +
     "SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS") {
     val tblName = "my_tab"
@@ -1446,7 +1478,7 @@ class DataSourceV2SQLSuiteV1Filter
           },
           errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
           parameters = Map(
-            "tableName" -> "`my_tab`",
+            "tableName" -> "`dummy`.`my_tab`",
             "operation" -> "generated columns"
           )
         )


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org