You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2022/03/26 04:12:19 UTC

[spark] branch master updated: [SPARK-38336][SQL] Support DEFAULT column values in CREATE/REPLACE TABLE statements

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e95738  [SPARK-38336][SQL] Support DEFAULT column values in CREATE/REPLACE TABLE statements
4e95738 is described below

commit 4e95738fdfc334c25f44689ff8c2db5aa7c726f2
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Sat Mar 26 12:09:56 2022 +0800

    [SPARK-38336][SQL] Support DEFAULT column values in CREATE/REPLACE TABLE statements
    
    ### What changes were proposed in this pull request?
    
    Extend CREATE TABLE and REPLACE TABLE statements to support columns with DEFAULT values. This information will be stored in the column metadata.
    
    ### Why are the changes needed?
    
    This builds the foundation for future work (not included in this PR) to support INSERT INTO statements, which may then omit the default values or refer to them explicitly with the DEFAULT keyword, in which case the Spark analyzer will automatically insert the appropriate corresponding values in the right places.
    
    Example:
    ```
    CREATE TABLE T(a INT DEFAULT 4, b INT NOT NULL DEFAULT 5);
    INSERT INTO T VALUES (1);
    INSERT INTO T VALUES (1, DEFAULT);
    INSERT INTO T VALUES (DEFAULT, 6);
    SELECT * FROM T;
    (1, 5)
    (1, 5)
    (4, 6)
    ```
    
    ### How was this patch tested?
    
    This change is covered by new and existing unit test coverage as well as new INSERT INTO query test cases covering a variety of positive and negative scenarios.
    
    Closes #35855 from dtenedor/default-cols-create-table.
    
    Authored-by: Daniel Tenedorio <da...@databricks.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  34 ++++-
 .../sql/catalyst/util/ResolveDefaultColumns.scala  | 153 +++++++++++++++++++++
 .../spark/sql/errors/QueryParsingErrors.scala      |   4 +
 .../org/apache/spark/sql/internal/SQLConf.scala    |  13 ++
 .../catalyst/catalog/ExternalCatalogSuite.scala    |  44 ++++--
 .../sql/catalyst/catalog/SessionCatalogSuite.scala |  62 +++++++++
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |  56 ++++++--
 .../datasources/v2/DataSourceV2Strategy.scala      |  15 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |  14 +-
 .../org/apache/spark/sql/hive/InsertSuite.scala    |  22 +--
 10 files changed, 369 insertions(+), 48 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 3fcd8d8..01e627f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, Set}
 
 import org.antlr.v4.runtime.{ParserRuleContext, Token}
+import org.antlr.v4.runtime.misc.Interval
 import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
 import org.apache.commons.codec.DecoderException
 import org.apache.commons.codec.binary.Hex
@@ -39,7 +40,7 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, IntervalUtils}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, IntervalUtils, ResolveDefaultColumns}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, stringToTimestamp, stringToTimestampWithoutTimeZone}
 import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -2788,13 +2789,18 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     Option(commentSpec()).map(visitCommentSpec).foreach {
       builder.putString("comment", _)
     }
+    // Add the 'DEFAULT expression' clause in the column definition, if any, to the column metadata.
+    Option(ctx.defaultExpression()).map(visitDefaultExpression).foreach { field =>
+      if (conf.getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)) {
+        // Add default to metadata
+        builder.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, field)
+        builder.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, field)
+      } else {
+        throw QueryParsingErrors.defaultColumnNotEnabledError(ctx)
+      }
+    }
 
-    // Process the 'DEFAULT expression' clause in the column definition, if any.
     val name: String = colName.getText
-    val defaultExpr = Option(ctx.defaultExpression()).map(visitDefaultExpression)
-    if (defaultExpr.isDefined) {
-      throw QueryParsingErrors.defaultColumnNotImplementedYetError(ctx)
-    }
 
     StructField(
       name = name,
@@ -2852,6 +2858,22 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
   }
 
   /**
+   * Create a default string.
+   */
+  override def visitDefaultExpression(ctx: DefaultExpressionContext): String = withOrigin(ctx) {
+    val exprCtx = ctx.expression()
+    // Make sure it can be converted to Catalyst expressions.
+    expression(exprCtx)
+    // Extract the raw expression text so that we can save the user provided text. We don't
+    // use `Expression.sql` to avoid storing incorrect text caused by bugs in any expression's
+    // `sql` method. Note: `exprCtx.getText` returns a string without spaces, so we need to
+    // get the text from the underlying char stream instead.
+    val start = exprCtx.getStart.getStartIndex
+    val end = exprCtx.getStop.getStopIndex
+    exprCtx.getStart.getInputStream.getText(new Interval(start, end))
+  }
+
+  /**
    * Create an optional comment string.
    */
   protected def visitCommentSpecList(ctx: java.util.List[CommentSpecContext]): Option[String] = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumns.scala
new file mode 100644
index 0000000..1e40756
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumns.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This object contains fields to help process DEFAULT columns.
+ */
+object ResolveDefaultColumns {
+  // This column metadata indicates the default value associated with a particular table column that
+  // is in effect at any given time. Its value begins at the time of the initial CREATE/REPLACE
+  // TABLE statement with DEFAULT column definition(s), if any. It then changes whenever an ALTER
+  // TABLE statement SETs the DEFAULT. The intent is for this "current default" to be used by
+  // UPDATE, INSERT and MERGE, which evaluate each default expression for each row.
+  val CURRENT_DEFAULT_COLUMN_METADATA_KEY = "CURRENT_DEFAULT"
+  // This column metadata represents the default value for all existing rows in a table after a
+  // column has been added. This value is determined at time of CREATE TABLE, REPLACE TABLE, or
+  // ALTER TABLE ADD COLUMN, and never changes thereafter. The intent is for this "exist default"
+  // to be used by any scan when the columns in the source row are missing data. For example,
+  // consider the following sequence:
+  // CREATE TABLE t (c1 INT)
+  // INSERT INTO t VALUES (42)
+  // ALTER TABLE t ADD COLUMNS (c2 INT DEFAULT 43)
+  // SELECT c1, c2 FROM t
+  // In this case, the final query is expected to return 42, 43. The ALTER TABLE ADD COLUMNS command
+  // executed after there was already data in the table, so in order to enforce this invariant,
+  // we need either (1) an expensive backfill of value 43 at column c2 into all previous rows, or
+  // (2) indicate to each data source that selected columns missing data are to generate the
+  // corresponding DEFAULT value instead. We choose option (2) for efficiency, and represent this
+  // value as the text representation of a folded constant in the "EXISTS_DEFAULT" column metadata.
+  val EXISTS_DEFAULT_COLUMN_METADATA_KEY = "EXISTS_DEFAULT"
+
+  /**
+   * Finds "current default" expressions in CREATE/REPLACE TABLE columns and constant-folds them.
+   *
+   * The results are stored in the "exists default" metadata of the same columns. For example, in
+   * the event of this statement:
+   *
+   * CREATE TABLE T(a INT, b INT DEFAULT 5 + 5)
+   *
+   * This method constant-folds the "current default" value, stored in the CURRENT_DEFAULT metadata
+   * of the "b" column, to "10", storing the result in the "exists default" value within the
+   * EXISTS_DEFAULT metadata of that same column. Meanwhile the "current default" metadata of this
+   * "b" column retains its original value of "5 + 5".
+   *
+   * The reason for constant-folding the EXISTS_DEFAULT is to make the end-user visible behavior the
+   * same, after executing an ALTER TABLE ADD COLUMNS command with DEFAULT value, as if the system
+   * had performed an exhaustive backfill of the provided value to all previously existing rows in
+   * the table instead. We choose to avoid doing such a backfill because it would be a
+   * time-consuming and costly operation. Instead, we elect to store the EXISTS_DEFAULT in the
+   * column metadata for future reference when querying data out of the data source. In turn, each
+   * data source then takes responsibility to provide the constant-folded value in the
+   * EXISTS_DEFAULT metadata for such columns where the value is not present in storage.
+   *
+   * @param analyzer used for analyzing the result of parsing the column expression stored as text.
+   * @param tableSchema represents the names and types of the columns of the statement to process.
+   * @param statementType name of the statement being processed, such as INSERT; useful for errors.
+   * @return a copy of `tableSchema` with field metadata updated with the constant-folded values.
+   */
+  def constantFoldCurrentDefaultsToExistDefaults(
+      analyzer: Analyzer,
+      tableSchema: StructType,
+      statementType: String): StructType = {
+    if (!SQLConf.get.enableDefaultColumns) {
+      return tableSchema
+    }
+    val newFields: Seq[StructField] = tableSchema.fields.map { field =>
+      if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+        val analyzed: Expression = analyze(analyzer, field, statementType)
+        val newMetadata: Metadata = new MetadataBuilder().withMetadata(field.metadata)
+          .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, analyzed.sql).build()
+        field.copy(metadata = newMetadata)
+      } else {
+        field
+      }
+    }
+    StructType(newFields)
+  }
+
+  /**
+   * Parses and analyzes the DEFAULT column text in `field`, returning an error upon failure.
+   *
+   * @param field represents the DEFAULT column value whose "default" metadata to parse and analyze.
+   * @param statementType which type of statement we are running, such as INSERT; useful for errors.
+   * @return Result of the analysis and constant-folding operation.
+   */
+  def analyze(
+      analyzer: Analyzer,
+      field: StructField,
+      statementType: String): Expression = {
+    // Parse the expression.
+    val colText: String = field.metadata.getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY)
+    lazy val parser = new CatalystSqlParser()
+    val parsed: Expression = try {
+      parser.parseExpression(colText)
+    } catch {
+      case ex: ParseException =>
+        throw new AnalysisException(
+          s"Failed to execute $statementType command because the destination table column " +
+            s"${field.name} has a DEFAULT value of $colText which fails to parse as a valid " +
+            s"expression: ${ex.getMessage}")
+    }
+    // Analyze the parse result.
+    val plan = try {
+      val analyzed = analyzer.execute(Project(Seq(Alias(parsed, field.name)()), OneRowRelation()))
+      analyzer.checkAnalysis(analyzed)
+      ConstantFolding(analyzed)
+    } catch {
+      case ex: AnalysisException =>
+        throw new AnalysisException(
+          s"Failed to execute $statementType command because the destination table column " +
+            s"${field.name} has a DEFAULT value of $colText which fails to resolve as a valid " +
+            s"expression: ${ex.getMessage}")
+    }
+    val analyzed: Expression = plan.collectFirst {
+      case Project(Seq(a: Alias), OneRowRelation()) => a.child
+    }.get
+    // Perform implicit coercion from the provided expression type to the required column type.
+    if (field.dataType == analyzed.dataType) {
+      analyzed
+    } else if (Cast.canUpCast(analyzed.dataType, field.dataType)) {
+      Cast(analyzed, field.dataType)
+    } else {
+      throw new AnalysisException(
+        s"Failed to execute $statementType command because the destination table column " +
+          s"${field.name} has a DEFAULT value with type ${field.dataType}, but the " +
+          s"statement provided a value of incompatible type ${analyzed.dataType}")
+    }
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index d055299..d40f276 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -444,4 +444,8 @@ object QueryParsingErrors {
   def defaultColumnNotImplementedYetError(ctx: ParserRuleContext): Throwable = {
     new ParseException("Support for DEFAULT column values is not implemented yet", ctx)
   }
+
+  def defaultColumnNotEnabledError(ctx: ParserRuleContext): Throwable = {
+    new ParseException("Support for DEFAULT column values is not allowed", ctx)
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1bba8b6..3e1872a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2805,6 +2805,17 @@ object SQLConf {
     .booleanConf
     .createWithDefault(sys.env.get("SPARK_ANSI_SQL_MODE").contains("true"))
 
+  val ENABLE_DEFAULT_COLUMNS =
+    buildConf("spark.sql.defaultColumn.enabled")
+      .internal()
+      .doc("When true, allow CREATE TABLE, REPLACE TABLE, and ALTER COLUMN statements to set or " +
+        "update default values for specific columns. Following INSERT, MERGE, and UPDATE " +
+        "statements may then omit these values and their values will be injected automatically " +
+        "instead.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val ENFORCE_RESERVED_KEYWORDS = buildConf("spark.sql.ansi.enforceReservedKeywords")
     .doc(s"When true and '${ANSI_ENABLED.key}' is true, the Spark SQL parser enforces the ANSI " +
       "reserved keywords and forbids SQL queries that use reserved keywords as alias names " +
@@ -4305,6 +4316,8 @@ class SQLConf extends Serializable with Logging {
 
   def ansiEnabled: Boolean = getConf(ANSI_ENABLED)
 
+  def enableDefaultColumns: Boolean = getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)
+
   def enforceReservedKeywords: Boolean = ansiEnabled && getConf(ENFORCE_RESERVED_KEYWORDS)
 
   def strictIndexOperator: Boolean = ansiEnabled && getConf(ANSI_STRICT_INDEX_OPERATOR)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index f791f77..d26501b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -27,10 +27,10 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -1025,16 +1025,44 @@ abstract class CatalogTestUtils {
 
   def newTable(name: String, db: String): CatalogTable = newTable(name, Some(db))
 
-  def newTable(name: String, database: Option[String] = None): CatalogTable = {
+  def newTable(
+      name: String,
+      database: Option[String] = None,
+      defaultColumns: Boolean = false): CatalogTable = {
     CatalogTable(
       identifier = TableIdentifier(name, database),
       tableType = CatalogTableType.EXTERNAL,
       storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().toURI)),
-      schema = new StructType()
-        .add("col1", "int")
-        .add("col2", "string")
-        .add("a", "int")
-        .add("b", "string"),
+      schema = if (defaultColumns) {
+        new StructType()
+          .add("col1", "int")
+          .add("col2", "string")
+          .add("a", IntegerType, nullable = true,
+            new MetadataBuilder().putString(
+              ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "42").build())
+          .add("b", StringType, nullable = false,
+            new MetadataBuilder().putString(
+              ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "\"abc\"").build())
+          // The default value fails to parse.
+          .add("c", LongType, nullable = false,
+            new MetadataBuilder().putString(
+              ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "_@#$%").build())
+          // The default value fails to resolve.
+          .add("d", LongType, nullable = false,
+            new MetadataBuilder().putString(
+              ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
+              "(select min(x) from badtable)").build())
+          // The default value fails to coerce to the required type.
+          .add("e", BooleanType, nullable = false,
+            new MetadataBuilder().putString(
+              ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "41 + 1").build())
+      } else {
+        new StructType()
+          .add("col1", "int")
+          .add("col2", "string")
+          .add("a", "int")
+          .add("b", "string")
+      },
       provider = Some(defaultProvider),
       partitionColumnNames = Seq("a", "b"),
       bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index b134085..8769a6f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LogicalPlan, Project, Range, SubqueryAlias, View}
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -120,6 +121,67 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
     assert(e.contains(s"`$name` is not a valid name for tables/databases."))
   }
 
+  test("create table with default columns") {
+    withBasicCatalog { catalog =>
+      assert(catalog.externalCatalog.listTables("db1").isEmpty)
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+      catalog.createTable(newTable(
+        "tbl3", Some("db1"), defaultColumns = true), ignoreIfExists = false)
+      catalog.createTable(newTable(
+        "tbl3", Some("db2"), defaultColumns = true), ignoreIfExists = false)
+      assert(catalog.externalCatalog.listTables("db1").toSet == Set("tbl3"))
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3"))
+      // Inspect the default column values.
+      val db1tbl3 = catalog.externalCatalog.getTable("db1", "tbl3")
+      val currentDefault = ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY
+
+      def findField(name: String, schema: StructType): StructField =
+        schema.fields.filter(_.name == name).head
+      val columnA: StructField = findField("a", db1tbl3.schema)
+      val columnB: StructField = findField("b", db1tbl3.schema)
+      val columnC: StructField = findField("c", db1tbl3.schema)
+      val columnD: StructField = findField("d", db1tbl3.schema)
+      val columnE: StructField = findField("e", db1tbl3.schema)
+
+      val defaultValueColumnA: String = columnA.metadata.getString(currentDefault)
+      val defaultValueColumnB: String = columnB.metadata.getString(currentDefault)
+      val defaultValueColumnC: String = columnC.metadata.getString(currentDefault)
+      val defaultValueColumnD: String = columnD.metadata.getString(currentDefault)
+      val defaultValueColumnE: String = columnE.metadata.getString(currentDefault)
+
+      assert(defaultValueColumnA == "42")
+      assert(defaultValueColumnB == "\"abc\"")
+      assert(defaultValueColumnC == "_@#$%")
+      assert(defaultValueColumnD == "(select min(x) from badtable)")
+      assert(defaultValueColumnE == "41 + 1")
+
+      // Analyze the default column values.
+      val analyzer = new Analyzer(new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin))
+      val statementType = "CREATE TABLE"
+      assert(ResolveDefaultColumns.analyze(analyzer, columnA, statementType).sql == "42")
+      assert(ResolveDefaultColumns.analyze(analyzer, columnB, statementType).sql == "'abc'")
+      assert(intercept[AnalysisException] {
+        ResolveDefaultColumns.analyze(analyzer, columnC, statementType)
+      }.getMessage.contains("fails to parse as a valid expression"))
+      assert(intercept[AnalysisException] {
+        ResolveDefaultColumns.analyze(analyzer, columnD, statementType)
+      }.getMessage.contains("fails to resolve as a valid expression"))
+      assert(intercept[AnalysisException] {
+        ResolveDefaultColumns.analyze(analyzer, columnE, statementType)
+      }.getMessage.contains("statement provided a value of incompatible type"))
+
+      // Make sure that constant-folding default values does not take place when the feature is
+      // disabled.
+      withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+        val result: StructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
+          analyzer, db1tbl3.schema, "CREATE TABLE")
+        val columnEWithFeatureDisabled: StructField = findField("e", result)
+        // No constant-folding has taken place to the EXISTS_DEFAULT metadata.
+        assert(!columnEWithFeatureDisabled.metadata.contains("EXISTS_DEFAULT"))
+      }
+    }
+  }
+
   test("create databases using invalid names") {
     withEmptyCatalog { catalog =>
       testInvalidName(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 493e6b7..e6ae073 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -22,12 +22,13 @@ import java.util.Locale
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
-import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.{TableSpec => LogicalTableSpec, _}
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
 import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
+import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder, StringType, StructType, TimestampType}
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 class DDLParserSuite extends AnalysisTest {
@@ -2240,20 +2241,45 @@ class DDLParserSuite extends AnalysisTest {
   }
 
   test("SPARK-38335: Implement parser support for DEFAULT values for columns in tables") {
-    // The following commands will support DEFAULT columns, but this has not been implemented yet.
-    for (sql <- Seq(
-      "ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42",
-      "ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT 42",
-      "ALTER TABLE t1 ALTER COLUMN a.b.c DROP DEFAULT",
-      "ALTER TABLE t1 REPLACE COLUMNS (x STRING DEFAULT 42)",
-      "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING NOT NULL DEFAULT \"abc\") USING parquet",
-      "REPLACE TABLE my_tab(a INT COMMENT 'test', b STRING NOT NULL DEFAULT \"xyz\") USING parquet"
-    )) {
-      val exc = intercept[ParseException] {
-        parsePlan(sql);
-      }
-      assert(exc.getMessage.contains("Support for DEFAULT column values is not implemented yet"));
+    // The following ALTER TABLE commands will support DEFAULT columns, but this has not been
+    // implemented yet.
+    val unsupportedError = "Support for DEFAULT column values is not implemented yet"
+    assert(intercept[ParseException] {
+      parsePlan("ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42")
+    }.getMessage.contains(unsupportedError))
+    assert(intercept[ParseException] {
+      parsePlan("ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT 42")
+    }.getMessage.contains(unsupportedError))
+    assert(intercept[ParseException] {
+      parsePlan("ALTER TABLE t1 ALTER COLUMN a.b.c DROP DEFAULT")
+    }.getMessage.contains(unsupportedError))
+    assert(intercept[ParseException] {
+      parsePlan("ALTER TABLE t1 REPLACE COLUMNS (x STRING DEFAULT 42)")
+    }.getMessage.contains(unsupportedError))
+    // These CREATE/REPLACE TABLE statements should parse successfully.
+    val schemaWithDefaultColumn = new StructType()
+      .add("a", IntegerType, true)
+      .add("b", StringType, false,
+        new MetadataBuilder()
+          .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "\"abc\"")
+          .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "\"abc\"").build())
+    comparePlans(parsePlan(
+      "CREATE TABLE my_tab(a INT, b STRING NOT NULL DEFAULT \"abc\") USING parquet"),
+      CreateTable(UnresolvedDBObjectName(Seq("my_tab"), false), schemaWithDefaultColumn,
+        Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
+          Map.empty[String, String], None, None, None, false), false))
+    comparePlans(parsePlan("REPLACE TABLE my_tab(a INT, " +
+      "b STRING NOT NULL DEFAULT \"abc\") USING parquet"),
+      ReplaceTable(UnresolvedDBObjectName(Seq("my_tab"), false), schemaWithDefaultColumn,
+        Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
+          Map.empty[String, String], None, None, None, false), false))
+    // Make sure that the parser returns an exception when the feature is disabled.
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+      intercept(
+        "CREATE TABLE my_tab(a INT, b STRING NOT NULL DEFAULT \"abc\") USING parquet",
+        "Support for DEFAULT column values is not allowed")
     }
+
     // In each of the following cases, the DEFAULT reference parses as an unresolved attribute
     // reference. We can handle these cases after the parsing stage, at later phases of analysis.
     comparePlans(parsePlan("VALUES (1, 2, DEFAULT) AS val"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index c0b00a4..d179079 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, Expression, NamedExpression, Not, Or, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.{toPrettySQL, V2ExpressionBuilder}
+import org.apache.spark.sql.catalyst.util.{toPrettySQL, ResolveDefaultColumns, V2ExpressionBuilder}
 import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference}
@@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.StorageLevel
 
@@ -168,7 +169,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
 
     case CreateTable(ResolvedDBObjectName(catalog, ident), schema, partitioning,
         tableSpec, ifNotExists) =>
-      CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema,
+      val newSchema: StructType =
+        ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
+          session.sessionState.analyzer, schema, "CREATE TABLE")
+      CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, newSchema,
         partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
 
     case CreateTableAsSelect(ResolvedDBObjectName(catalog, ident), parts, query, tableSpec,
@@ -187,12 +191,15 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil
 
     case ReplaceTable(ResolvedDBObjectName(catalog, ident), schema, parts, tableSpec, orCreate) =>
+      val newSchema: StructType =
+        ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
+          session.sessionState.analyzer, schema, "CREATE TABLE")
       catalog match {
         case staging: StagingTableCatalog =>
-          AtomicReplaceTableExec(staging, ident.asIdentifier, schema, parts,
+          AtomicReplaceTableExec(staging, ident.asIdentifier, newSchema, parts,
             qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
         case _ =>
-          ReplaceTableExec(catalog.asTableCatalog, ident.asIdentifier, schema, parts,
+          ReplaceTableExec(catalog.asTableCatalog, ident.asIdentifier, newSchema, parts,
             qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
       }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index b64ed08..6f14b1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchDatabaseException, NoSuchNamespaceException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, ResolveDefaultColumns}
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
@@ -614,15 +614,21 @@ class DataSourceV2SQLSuite
     val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
     assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
 
-    spark.sql("REPLACE TABLE testcat.table_name (id bigint NOT NULL) USING foo")
+    spark.sql("REPLACE TABLE testcat.table_name (id bigint NOT NULL DEFAULT 41 + 1) USING foo")
     val replaced = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
 
     assert(replaced.asInstanceOf[InMemoryTable].rows.isEmpty,
         "Replaced table should have no rows after committing.")
     assert(replaced.schema().fields.length === 1,
         "Replaced table should have new schema.")
-    assert(replaced.schema().fields(0) === StructField("id", LongType, nullable = false),
-      "Replaced table should have new schema.")
+    val actual = replaced.schema().fields(0)
+    val expected = StructField("id", LongType, nullable = false,
+      new MetadataBuilder().putString(
+        ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "41 + 1")
+        .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "CAST(42 AS BIGINT)")
+        .build())
+    assert(actual === expected,
+      "Replaced table should have new schema with DEFAULT column metadata.")
   }
 
   test("ReplaceTableAsSelect: CREATE OR REPLACE new table has same behavior as CTAS.") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index 9e29386..4dc08a0e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -869,25 +869,25 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
             |SORTED BY (s1)
             |INTO 200 BUCKETS
             |STORED AS PARQUET
-          """.stripMargin
+        """.stripMargin
         } else {
           """
-             |CREATE TABLE test1(
-             |v1 BIGINT,
-             |s1 INT)
-             |USING PARQUET
-             |PARTITIONED BY (pk BIGINT)
-             |CLUSTERED BY (v1)
-             |SORTED BY (s1)
-             |INTO 200 BUCKETS
-          """.stripMargin
+            |CREATE TABLE test1(
+            |v1 BIGINT,
+            |s1 INT)
+            |USING PARQUET
+            |PARTITIONED BY (pk BIGINT)
+            |CLUSTERED BY (v1)
+            |SORTED BY (s1)
+            |INTO 200 BUCKETS
+        """.stripMargin
         }
 
         val insertString =
           """
             |INSERT INTO test1
             |SELECT * FROM VALUES(1,1,1)
-          """.stripMargin
+        """.stripMargin
 
         val dropString = "DROP TABLE IF EXISTS test1"
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org