You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2023/06/12 04:31:29 UTC

[spark] branch master updated: [SPARK-43529][SQL] Support general constant expressions as CREATE/REPLACE TABLE OPTIONS values

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7250941ab5b [SPARK-43529][SQL] Support general constant expressions as CREATE/REPLACE TABLE OPTIONS values
7250941ab5b is described below

commit 7250941ab5b8ea1c1dc720f2b6407404ac7020bb
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Sun Jun 11 21:31:03 2023 -0700

    [SPARK-43529][SQL] Support general constant expressions as CREATE/REPLACE TABLE OPTIONS values
    
    ### What changes were proposed in this pull request?
    
    This PR updates the SQL compiler to support general constnat expressions in the syntax for CREATE/REPLACE TABLE OPTIONS values, rather than restricting to a few types of literals only.
    
    * The analyzer now checks that the provided expressions are in fact `foldable`, and throws an error message otherwise.
    * This error message that users encounter in these cases improves from a general "syntax error at or near <location>" to instead indicate that the syntax is valid, but only constant expressions are supported in these contexts.
    
    ### Why are the changes needed?
    
    This makes it easier to provide OPTIONS lists in SQL, supporting use cases like concatenating strings with `||`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the SQL syntax changes.
    
    ### How was this patch tested?
    
    This PR adds new unit test coverage.
    
    Closes #41191 from dtenedor/expression-properties.
    
    Authored-by: Daniel Tenedorio <da...@databricks.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 core/src/main/resources/error/error-classes.json   |  5 ++
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     | 10 ++-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  1 +
 .../sql/catalyst/analysis/ResolveTableSpec.scala   | 90 ++++++++++++++++++++
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 69 ++++++++++-----
 .../sql/catalyst/plans/logical/v2Commands.scala    | 70 +++++++++++++--
 .../sql/catalyst/rules/RuleIdCollection.scala      |  1 +
 .../apache/spark/sql/catalyst/trees/TreeNode.scala |  6 +-
 .../sql/connector/catalog/CatalogV2Util.scala      |  5 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  | 17 ++++
 .../CreateTablePartitioningValidationSuite.scala   | 18 +---
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 76 +++++++++--------
 .../org/apache/spark/sql/DataFrameWriter.scala     | 11 +--
 .../org/apache/spark/sql/DataFrameWriterV2.scala   |  8 +-
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 13 +--
 .../spark/sql/execution/SparkSqlParser.scala       | 19 ++++-
 .../datasources/v2/DataSourceV2Strategy.scala      | 13 +--
 .../apache/spark/sql/internal/CatalogImpl.scala    | 12 ++-
 .../spark/sql/streaming/DataStreamWriter.scala     |  5 +-
 .../sql/TableOptionsConstantFoldingSuite.scala     | 99 ++++++++++++++++++++++
 .../connector/V2CommandsCaseSensitivitySuite.scala | 14 ++-
 21 files changed, 433 insertions(+), 129 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 7b39ab7266c..a12a8000870 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1389,6 +1389,11 @@
           "<statement> with multiple part function name(<funcName>) is not allowed."
         ]
       },
+      "OPTION_IS_INVALID" : {
+        "message" : [
+          "option or property key <key> is invalid; only <supported> are supported"
+        ]
+      },
       "REPETITIVE_WINDOW_DEFINITION" : {
         "message" : [
           "The definition of window <windowName> is repetitive."
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 89100f2aeec..c7b238bfd2c 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -374,7 +374,7 @@ tableProvider
     ;
 
 createTableClauses
-    :((OPTIONS options=propertyList) |
+    :((OPTIONS options=expressionPropertyList) |
      (PARTITIONED BY partitioning=partitionFieldList) |
      skewSpec |
      bucketSpec |
@@ -405,6 +405,14 @@ propertyValue
     | stringLit
     ;
 
+expressionPropertyList
+    : LEFT_PAREN expressionProperty (COMMA expressionProperty)* RIGHT_PAREN
+    ;
+
+expressionProperty
+    : key=propertyKey (EQ? value=expression)?
+    ;
+
 constantList
     : LEFT_PAREN constant (COMMA constant)* RIGHT_PAREN
     ;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index aa1b9d0e8fd..901ae243225 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -298,6 +298,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       ExtractGenerator ::
       ResolveGenerate ::
       ResolveFunctions ::
+      ResolveTableSpec ::
       ResolveAliases ::
       ResolveSubquery ::
       ResolveSubqueryColumnAliases ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
new file mode 100644
index 00000000000..d4b0a8d25e0
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkThrowable
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.{ArrayType, MapType, StructType}
+
+/**
+ * This object is responsible for processing unresolved table specifications in commands with
+ * OPTIONS lists. The parser produces such lists as maps from strings to unresolved expressions.
+ * After otherwise resolving such expressions in the analyzer, here we convert them to resolved
+ * table specifications wherein these OPTIONS list values are represented as strings instead, for
+ * convenience.
+ */
+object ResolveTableSpec extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperatorsWithPruning(_.containsAnyPattern(COMMAND), ruleId) {
+      case t: CreateTable =>
+        resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+      case t: CreateTableAsSelect =>
+        resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+      case t: ReplaceTable =>
+        resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+      case t: ReplaceTableAsSelect =>
+        resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+    }
+  }
+
+  /** Helper method to resolve the table specification within a logical plan. */
+  private def resolveTableSpec(
+      input: LogicalPlan, tableSpec: TableSpec, optionsListExpressions: OptionsListExpressions,
+      withNewSpec: TableSpec => LogicalPlan): LogicalPlan = tableSpec match {
+    case u: UnresolvedTableSpec if optionsListExpressions.allOptionsResolved =>
+      val newOptions: Seq[(String, String)] = optionsListExpressions.options.map {
+        case (key: String, null) =>
+          (key, null)
+        case (key: String, value: Expression) =>
+          val newValue: String = try {
+            val dt = value.dataType
+            value match {
+              case Literal(null, _) =>
+                null
+              case _
+                if dt.isInstanceOf[ArrayType] ||
+                  dt.isInstanceOf[StructType] ||
+                  dt.isInstanceOf[MapType] =>
+                throw QueryCompilationErrors.optionMustBeConstant(key)
+              case _ =>
+                val result = value.eval()
+                Literal(result, dt).toString
+            }
+          } catch {
+            case _: SparkThrowable | _: java.lang.RuntimeException =>
+              throw QueryCompilationErrors.optionMustBeConstant(key)
+          }
+          (key, newValue)
+      }
+      val newTableSpec = ResolvedTableSpec(
+        properties = u.properties,
+        provider = u.provider,
+        options = newOptions.toMap,
+        location = u.location,
+        comment = u.comment,
+        serde = u.serde,
+        external = u.external)
+      withNewSpec(newTableSpec)
+    case _ =>
+      input
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index f4170860c24..2156ae4d51f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, con
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces, TableCatalog}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
-import org.apache.spark.sql.errors.QueryParsingErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryParsingErrors}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -3343,6 +3343,20 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     }
   }
 
+  /**
+   * Parse a key-value map from an [[ExpressionPropertyListContext]], assuming all values are
+   * specified.
+   */
+  override def visitExpressionPropertyList(
+      ctx: ExpressionPropertyListContext): OptionsListExpressions = {
+    val options = ctx.expressionProperty.asScala.map { property =>
+      val key: String = visitPropertyKey(property.key)
+      val value: Expression = Option(property.value).map(expression).getOrElse(null)
+      key -> value
+    }.toSeq
+    OptionsListExpressions(options)
+  }
+
   override def visitStringLit(ctx: StringLitContext): Token = {
     if (ctx != null) {
       if (ctx.STRING_LITERAL != null) {
@@ -3377,7 +3391,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
    */
   type TableClauses = (
       Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
-      Map[String, String], Option[String], Option[String], Option[SerdeInfo])
+      OptionsListExpressions, Option[String], Option[String], Option[SerdeInfo])
 
   /**
    * Validate a create table statement and return the [[TableIdentifier]].
@@ -3637,8 +3651,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
         ctx.EXTENDED != null)
     }
 
-  def cleanTableProperties(
-      ctx: ParserRuleContext, properties: Map[String, String]): Map[String, String] = {
+  def cleanTableProperties[ValueType](
+      ctx: ParserRuleContext, properties: Map[String, ValueType]): Map[String, ValueType] = {
     import TableCatalog._
     val legacyOn = conf.getConf(SQLConf.LEGACY_PROPERTY_NON_RESERVED)
     properties.filter {
@@ -3672,18 +3686,26 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
 
   def cleanTableOptions(
       ctx: ParserRuleContext,
-      options: Map[String, String],
-      location: Option[String]): (Map[String, String], Option[String]) = {
+      options: OptionsListExpressions,
+      location: Option[String]): (OptionsListExpressions, Option[String]) = {
     var path = location
-    val filtered = cleanTableProperties(ctx, options).filter {
-      case (k, v) if k.equalsIgnoreCase("path") && path.nonEmpty =>
-        throw QueryParsingErrors.duplicatedTablePathsFoundError(path.get, v, ctx)
-      case (k, v) if k.equalsIgnoreCase("path") =>
-        path = Some(v)
+    val filtered = cleanTableProperties(ctx, options.options.toMap).filter {
+      case (key, value) if key.equalsIgnoreCase("path") =>
+        val newValue: String =
+          if (value == null) {
+            ""
+          } else value match {
+            case Literal(_, _: StringType) => value.toString
+            case _ => throw QueryCompilationErrors.optionMustBeLiteralString(key)
+          }
+        if (path.nonEmpty) {
+          throw QueryParsingErrors.duplicatedTablePathsFoundError(path.get, newValue, ctx)
+        }
+        path = Some(newValue)
         false
       case _ => true
     }
-    (filtered, path)
+    (OptionsListExpressions(filtered.toSeq), path)
   }
 
   /**
@@ -3841,7 +3863,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
     val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val cleanedProperties = cleanTableProperties(ctx, properties)
-    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    val options = Option(ctx.options).map(visitExpressionPropertyList)
+      .getOrElse(OptionsListExpressions(Seq.empty))
     val location = visitLocationSpecList(ctx.locationSpec())
     val (cleanedOptions, newLocation) = cleanTableOptions(ctx, options, location)
     val comment = visitCommentSpecList(ctx.commentSpec())
@@ -3921,8 +3944,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     val columns = Option(ctx.createOrReplaceTableColTypeList())
       .map(visitCreateOrReplaceTableColTypeList).getOrElse(Nil)
     val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
-    val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
-      visitCreateTableClauses(ctx.createTableClauses())
+    val (partTransforms, partCols, bucketSpec, properties, options, location,
+      comment, serdeInfo) = visitCreateTableClauses(ctx.createTableClauses())
 
     if (provider.isDefined && serdeInfo.isDefined) {
       operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
@@ -3936,7 +3959,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
 
     val partitioning =
       partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform)
-    val tableSpec = TableSpec(properties, provider, options, location, comment,
+    val tableSpec = UnresolvedTableSpec(properties, provider, location, comment,
       serdeInfo, external)
 
     Option(ctx.query).map(plan) match {
@@ -3953,14 +3976,15 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
 
       case Some(query) =>
         CreateTableAsSelect(withIdentClause(identifierContext, UnresolvedIdentifier(_)),
-          partitioning, query, tableSpec, Map.empty, ifNotExists)
+          partitioning, query, tableSpec, Map.empty, ifNotExists, optionsListExpressions = options)
 
       case _ =>
         // Note: table schema includes both the table columns list and the partition columns
         // with data type.
         val schema = StructType(columns ++ partCols)
         CreateTable(withIdentClause(identifierContext, UnresolvedIdentifier(_)),
-          schema, partitioning, tableSpec, ignoreIfExists = ifNotExists)
+          schema, partitioning, tableSpec, ignoreIfExists = ifNotExists,
+          optionsListExpressions = options)
     }
   }
 
@@ -4005,8 +4029,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
 
     val partitioning =
       partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform)
-    val tableSpec = TableSpec(properties, provider, options, location, comment,
-      serdeInfo, false)
+    val tableSpec = UnresolvedTableSpec(properties, provider, location, comment,
+      serdeInfo, external = false)
 
     Option(ctx.query).map(plan) match {
       case Some(_) if columns.nonEmpty =>
@@ -4023,7 +4047,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
       case Some(query) =>
         ReplaceTableAsSelect(
           withIdentClause(ctx.replaceTableHeader.identifierReference(), UnresolvedIdentifier(_)),
-          partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate)
+          partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate,
+          optionsListExpressions = options)
 
       case _ =>
         // Note: table schema includes both the table columns list and the partition columns
@@ -4031,7 +4056,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
         val schema = StructType(columns ++ partCols)
         ReplaceTable(
           withIdentClause(ctx.replaceTableHeader.identifierReference(), UnresolvedIdentifier(_)),
-          schema, partitioning, tableSpec, orCreate = orCreate)
+          schema, partitioning, tableSpec, orCreate = orCreate, optionsListExpressions = options)
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 91b69a3c089..a781bf56b9b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.write.{DeltaWrite, RowLevelOperation, RowLevelOperationTable, SupportsDelta, Write}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MetadataBuilder, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType}
 
 // For v2 DML commands, it may end up with the v1 fallback code path and need to build a DataFrame
 // which is required by the DS v1 API. We need to keep the analyzed input query plan to build
@@ -445,7 +445,9 @@ case class CreateTable(
     tableSchema: StructType,
     partitioning: Seq[Transform],
     tableSpec: TableSpec,
-    ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan {
+    ignoreIfExists: Boolean,
+    optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
+  extends UnaryCommand with V2CreateTablePlan {
 
   override def child: LogicalPlan = name
 
@@ -467,7 +469,8 @@ case class CreateTableAsSelect(
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
     ignoreIfExists: Boolean,
-    isAnalyzed: Boolean = false)
+    isAnalyzed: Boolean = false,
+    optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
   extends V2CreateTableAsSelectPlan {
 
   override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
@@ -496,7 +499,9 @@ case class ReplaceTable(
     tableSchema: StructType,
     partitioning: Seq[Transform],
     tableSpec: TableSpec,
-    orCreate: Boolean) extends UnaryCommand with V2CreateTablePlan {
+    orCreate: Boolean,
+    optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
+  extends UnaryCommand with V2CreateTablePlan {
 
   override def child: LogicalPlan = name
 
@@ -521,7 +526,8 @@ case class ReplaceTableAsSelect(
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
     orCreate: Boolean,
-    isAnalyzed: Boolean = false)
+    isAnalyzed: Boolean = false,
+    optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
   extends V2CreateTableAsSelectPlan {
 
   override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
@@ -1382,11 +1388,61 @@ case class DropIndex(
     copy(table = newChild)
 }
 
-case class TableSpec(
+trait TableSpec {
+  def properties: Map[String, String]
+  def provider: Option[String]
+  def location: Option[String]
+  def comment: Option[String]
+  def serde: Option[SerdeInfo]
+  def external: Boolean
+  def withNewLocation(newLocation: Option[String]): TableSpec
+}
+
+case class UnresolvedTableSpec(
+    properties: Map[String, String],
+    provider: Option[String],
+    location: Option[String],
+    comment: Option[String],
+    serde: Option[SerdeInfo],
+    external: Boolean) extends TableSpec {
+  override def withNewLocation(loc: Option[String]): TableSpec = {
+    UnresolvedTableSpec(properties, provider, loc, comment, serde, external)
+  }
+}
+
+/**
+ * This contains the expressions in an OPTIONS list. We store it alongside anywhere the above
+ * UnresolvedTableSpec lives. We use a separate object so that tree traversals in analyzer rules can
+ * descend into the child expressions naturally without extra treatment.
+ */
+case class OptionsListExpressions(options: Seq[(String, Expression)])
+  extends Expression with Unevaluable {
+  override def nullable: Boolean = true
+  override def dataType: DataType = MapType(StringType, StringType)
+  override def children: Seq[Expression] = options.map(_._2)
+
+  override protected def withNewChildrenInternal(
+    newChildren: IndexedSeq[Expression]): Expression = {
+    assert(options.length == newChildren.length)
+    val newOptions = options.zip(newChildren).map {
+      case ((key: String, _), newChild: Expression) =>
+        (key, newChild)
+    }
+    OptionsListExpressions(newOptions)
+  }
+
+  lazy val allOptionsResolved: Boolean = options.map(_._2).forall(_.resolved)
+}
+
+case class ResolvedTableSpec(
     properties: Map[String, String],
     provider: Option[String],
     options: Map[String, String],
     location: Option[String],
     comment: Option[String],
     serde: Option[SerdeInfo],
-    external: Boolean)
+    external: Boolean) extends TableSpec {
+  override def withNewLocation(newLocation: Option[String]): TableSpec = {
+    ResolvedTableSpec(properties, provider, options, newLocation, comment, serde, external)
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index 7fa048c5dc3..caf679f3e7a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -92,6 +92,7 @@ object RuleIdCollection {
       "org.apache.spark.sql.catalyst.analysis.ResolveLateralColumnAliasReference" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveOrderByAll" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveRowLevelCommandAssignments" ::
+      "org.apache.spark.sql.catalyst.analysis.ResolveTableSpec" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveUnion" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveWindowTime" ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 75802de1a66..17c89d7e6ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical.TableSpec
+import org.apache.spark.sql.catalyst.plans.logical.{ResolvedTableSpec, UnresolvedTableSpec}
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
 import org.apache.spark.sql.catalyst.rules.RuleId
 import org.apache.spark.sql.catalyst.rules.RuleIdCollection
@@ -927,9 +927,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Tre
       redactMapString(map.asCaseSensitiveMap().asScala, maxFields)
     case map: Map[_, _] =>
       redactMapString(map, maxFields)
-    case t: TableSpec =>
+    case t: ResolvedTableSpec =>
       t.copy(properties = Utils.redact(t.properties).toMap,
         options = Utils.redact(t.options).toMap) :: Nil
+    case t: UnresolvedTableSpec =>
+      t.copy(properties = Utils.redact(t.properties).toMap) :: Nil
     case table: CatalogTable =>
       stringArgsForCatalogTable(table)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index e5d9720bb02..e92c1ee75a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{ResolvedTableSpec, SerdeInfo, TableSpec}
 import org.apache.spark.sql.catalyst.util.GeneratedColumn
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.connector.catalog.TableChange._
@@ -376,7 +376,8 @@ private[sql] object CatalogV2Util {
 
   def convertTableProperties(t: TableSpec): Map[String, String] = {
     val props = convertTableProperties(
-      t.properties, t.options, t.serde, t.location, t.comment, t.provider, t.external)
+      t.properties, t.asInstanceOf[ResolvedTableSpec].options, t.serde, t.location, t.comment,
+      t.provider, t.external)
     withDefaultOwnership(props)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 94b8ee25dd2..4c87b9da1c7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3567,4 +3567,21 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       )
     )
   }
+
+  def optionMustBeLiteralString(key: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+      messageParameters = Map(
+        "key" -> key,
+        "supported" -> "literal strings"))
+  }
+
+  def optionMustBeConstant(key: String, cause: Option[Throwable] = None): Throwable = {
+    new AnalysisException(
+      errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+      messageParameters = Map(
+        "key" -> key,
+        "supported" -> "constant expressions"),
+      cause = cause)
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
index ba312ddbc49..d1651e536dd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
@@ -20,17 +20,15 @@ package org.apache.spark.sql.catalyst.analysis
 import java.util
 
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, UnresolvedTableSpec}
 import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, Table, TableCapability, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Expressions
 import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class CreateTablePartitioningValidationSuite extends AnalysisTest {
-
+  val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
   test("CreateTableAsSelect: fail missing top-level column") {
-    val tableSpec = TableSpec(Map.empty, None, Map.empty,
-      None, None, None, false)
     val plan = CreateTableAsSelect(
       UnresolvedIdentifier(Array("table_name")),
       Expressions.bucket(4, "does_not_exist") :: Nil,
@@ -46,8 +44,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: fail missing top-level column nested reference") {
-    val tableSpec = TableSpec(Map.empty, None, Map.empty,
-      None, None, None, false)
     val plan = CreateTableAsSelect(
       UnresolvedIdentifier(Array("table_name")),
       Expressions.bucket(4, "does_not_exist.z") :: Nil,
@@ -63,8 +59,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: fail missing nested column") {
-    val tableSpec = TableSpec(Map.empty, None, Map.empty,
-      None, None, None, false)
     val plan = CreateTableAsSelect(
       UnresolvedIdentifier(Array("table_name")),
       Expressions.bucket(4, "point.z") :: Nil,
@@ -80,8 +74,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: fail with multiple errors") {
-    val tableSpec = TableSpec(Map.empty, None, Map.empty,
-      None, None, None, false)
     val plan = CreateTableAsSelect(
       UnresolvedIdentifier(Array("table_name")),
       Expressions.bucket(4, "does_not_exist", "point.z") :: Nil,
@@ -97,8 +89,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: success with top-level column") {
-    val tableSpec = TableSpec(Map.empty, None, Map.empty,
-      None, None, None, false)
     val plan = CreateTableAsSelect(
       UnresolvedIdentifier(Array("table_name")),
       Expressions.bucket(4, "id") :: Nil,
@@ -111,8 +101,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: success using nested column") {
-    val tableSpec = TableSpec(Map.empty, None, Map.empty,
-      None, None, None, false)
     val plan = CreateTableAsSelect(
       UnresolvedIdentifier(Array("table_name")),
       Expressions.bucket(4, "point.x") :: Nil,
@@ -125,8 +113,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: success using complex column") {
-    val tableSpec = TableSpec(Map.empty, None, Map.empty,
-      None, None, None, false)
     val plan = CreateTableAsSelect(
       UnresolvedIdentifier(Array("table_name")),
       Expressions.bucket(4, "point") :: Nil,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 53635acf0b3..8cfdf411ae9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -22,13 +22,13 @@ import java.util.Locale
 import org.apache.spark.SparkThrowable
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{TableSpec => LogicalTableSpec, _}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{GeneratedColumn, ResolveDefaultColumns}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
 import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder, StringType, StructType, TimestampType}
+import org.apache.spark.sql.types.{Decimal, IntegerType, LongType, MetadataBuilder, StringType, StructType, TimestampType}
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 class DDLParserSuite extends AnalysisTest {
@@ -57,7 +57,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq.empty[Transform],
       Map.empty[String, String],
       Some("parquet"),
-      Map.empty[String, String],
+      OptionsListExpressions(Seq.empty),
       None,
       None,
       None)
@@ -83,7 +83,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq.empty[Transform],
         Map.empty[String, String],
         Some("parquet"),
-        Map.empty[String, String],
+        OptionsListExpressions(Seq.empty),
         None,
         None,
         None),
@@ -103,7 +103,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("a"))),
       Map.empty[String, String],
       Some("parquet"),
-      Map.empty[String, String],
+      OptionsListExpressions(Seq.empty),
       None,
       None,
       None)
@@ -157,7 +157,7 @@ class DDLParserSuite extends AnalysisTest {
           LiteralValue(34, IntegerType)))),
       Map.empty[String, String],
       Some("parquet"),
-      Map.empty[String, String],
+      OptionsListExpressions(Seq.empty),
       None,
       None,
       None)
@@ -179,7 +179,7 @@ class DDLParserSuite extends AnalysisTest {
       List(bucket(5, Array(FieldReference.column("a")), Array(FieldReference.column("b")))),
       Map.empty[String, String],
       Some("parquet"),
-      Map.empty[String, String],
+      OptionsListExpressions(Seq.empty),
       None,
       None,
       None)
@@ -198,7 +198,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq.empty[Transform],
       Map.empty[String, String],
       Some("parquet"),
-      Map.empty[String, String],
+      OptionsListExpressions(Seq.empty),
       None,
       Some("abc"),
       None)
@@ -218,7 +218,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq.empty[Transform],
       Map("test" -> "test"),
       Some("parquet"),
-      Map.empty[String, String],
+      OptionsListExpressions(Seq.empty),
       None,
       None,
       None)
@@ -236,7 +236,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq.empty[Transform],
         Map.empty[String, String],
         Some("parquet"),
-        Map.empty[String, String],
+        OptionsListExpressions(Seq.empty),
         Some("/tmp/file"),
         None,
         None)
@@ -254,7 +254,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq.empty[Transform],
       Map.empty[String, String],
       Some("parquet"),
-      Map.empty[String, String],
+      OptionsListExpressions(Seq.empty),
       None,
       None,
       None)
@@ -272,7 +272,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("part"))),
       Map.empty[String, String],
       None,
-      Map.empty[String, String],
+      OptionsListExpressions(Seq.empty),
       None,
       None,
       None)
@@ -290,7 +290,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("part"))),
       Map.empty[String, String],
       None,
-      Map.empty[String, String],
+      OptionsListExpressions(Seq.empty),
       None,
       None,
       None)
@@ -308,7 +308,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("part"))),
       Map.empty[String, String],
       Some("parquet"),
-      Map.empty[String, String],
+      OptionsListExpressions(Seq.empty),
       None,
       None,
       None)
@@ -381,7 +381,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("part"))),
       Map.empty[String, String],
       None,
-      Map.empty[String, String],
+      OptionsListExpressions(Seq.empty),
       None,
       None,
       Some(SerdeInfo(storedAs = Some("parquet"))))
@@ -406,7 +406,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq(IdentityTransform(FieldReference("part"))),
         Map.empty[String, String],
         None,
-        Map.empty[String, String],
+        OptionsListExpressions(Seq.empty),
         None,
         None,
         Some(SerdeInfo(storedAs = Some(format), serde = Some("customSerde"), serdeProperties = Map(
@@ -463,7 +463,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("part"))),
       Map.empty[String, String],
       None,
-      Map.empty[String, String],
+      OptionsListExpressions(Seq.empty),
       None,
       None,
       Some(SerdeInfo(storedAs = Some("textfile"), serdeProperties = Map(
@@ -514,7 +514,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("part"))),
       Map.empty[String, String],
       None,
-      Map.empty[String, String],
+      OptionsListExpressions(Seq.empty),
       None,
       None,
       Some(SerdeInfo(formatClasses = Some(FormatClasses("inFormat", "outFormat")))))
@@ -537,7 +537,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("part"))),
       Map.empty[String, String],
       None,
-      Map.empty[String, String],
+      OptionsListExpressions(Seq.empty),
       None,
       None,
       Some(SerdeInfo(
@@ -878,9 +878,13 @@ class DDLParserSuite extends AnalysisTest {
           Seq("table_name"),
           Some(new StructType),
           Seq.empty[Transform],
-          Map.empty[String, String],
+          Map.empty,
           Some("json"),
-          Map("a" -> "1", "b" -> "0.1", "c" -> "true"),
+          OptionsListExpressions(
+            Seq(
+              ("a", Literal(1)),
+              ("b", Literal(Decimal(0.1))),
+              ("c" -> Literal(true)))),
           None,
           None,
           None),
@@ -935,7 +939,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq.empty[Transform],
         Map("p1" -> "v1", "p2" -> "v2"),
         Some("parquet"),
-        Map.empty[String, String],
+        OptionsListExpressions(Seq.empty),
         Some("/user/external/page_view"),
         Some("This is the staging page view table"),
         None)
@@ -2472,7 +2476,7 @@ class DDLParserSuite extends AnalysisTest {
       partitioning: Seq[Transform],
       properties: Map[String, String],
       provider: Option[String],
-      options: Map[String, String],
+      options: OptionsListExpressions,
       location: Option[String],
       comment: Option[String],
       serdeInfo: Option[SerdeInfo],
@@ -2488,7 +2492,7 @@ class DDLParserSuite extends AnalysisTest {
             create.partitioning,
             create.tableSpec.properties,
             create.tableSpec.provider,
-            create.tableSpec.options,
+            create.optionsListExpressions,
             create.tableSpec.location,
             create.tableSpec.comment,
             create.tableSpec.serde,
@@ -2500,7 +2504,7 @@ class DDLParserSuite extends AnalysisTest {
             replace.partitioning,
             replace.tableSpec.properties,
             replace.tableSpec.provider,
-            replace.tableSpec.options,
+            replace.optionsListExpressions,
             replace.tableSpec.location,
             replace.tableSpec.comment,
             replace.tableSpec.serde)
@@ -2511,7 +2515,7 @@ class DDLParserSuite extends AnalysisTest {
             ctas.partitioning,
             ctas.tableSpec.properties,
             ctas.tableSpec.provider,
-            ctas.tableSpec.options,
+            ctas.optionsListExpressions,
             ctas.tableSpec.location,
             ctas.tableSpec.comment,
             ctas.tableSpec.serde,
@@ -2523,7 +2527,7 @@ class DDLParserSuite extends AnalysisTest {
             rtas.partitioning,
             rtas.tableSpec.properties,
             rtas.tableSpec.provider,
-            rtas.tableSpec.options,
+            rtas.optionsListExpressions,
             rtas.tableSpec.location,
             rtas.tableSpec.comment,
             rtas.tableSpec.serde)
@@ -2560,7 +2564,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq.empty[Transform],
       Map.empty[String, String],
       None,
-      Map.empty[String, String],
+      OptionsListExpressions(Seq.empty),
       None,
       None,
       None)
@@ -2610,8 +2614,8 @@ class DDLParserSuite extends AnalysisTest {
           .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "\"abc\"").build())
     val createTableResult =
       CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn,
-        Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
-          Map.empty[String, String], None, None, None, false), false)
+        Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
+          None, None, None, false), false)
     // Parse the CREATE TABLE statement twice, swapping the order of the NOT NULL and DEFAULT
     // options, to make sure that the parser accepts any ordering of these options.
     comparePlans(parsePlan(
@@ -2623,8 +2627,8 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(parsePlan("REPLACE TABLE my_tab(a INT, " +
       "b STRING NOT NULL DEFAULT \"abc\") USING parquet"),
       ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn,
-        Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
-          Map.empty[String, String], None, None, None, false), false))
+        Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
+          None, None, None, false), false))
     // These ALTER TABLE statements should parse successfully.
     comparePlans(
       parsePlan("ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42"),
@@ -2779,13 +2783,13 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(parsePlan(
       "CREATE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"),
       CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn,
-        Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
-          Map.empty[String, String], None, None, None, false), false))
+        Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
+          None, None, None, false), false))
     comparePlans(parsePlan(
       "REPLACE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"),
       ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn,
-        Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
-          Map.empty[String, String], None, None, None, false), false))
+        Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
+          None, None, None, false), false))
     // Two generation expressions
     checkError(
       exception = parseException("CREATE TABLE my_tab(a INT, " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index da93fdf58e9..a3cb12307fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedIdentifier, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table}
 import org.apache.spark.sql.connector.catalog.TableCapability._
@@ -326,10 +326,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
               val catalog = CatalogV2Util.getTableProviderCatalog(
                 supportsExtract, catalogManager, dsOptions)
 
-              val tableSpec = TableSpec(
+              val tableSpec = UnresolvedTableSpec(
                 properties = Map.empty,
                 provider = Some(source),
-                options = Map.empty,
                 location = extraOptions.get("path"),
                 comment = extraOptions.get(TableCatalog.PROP_COMMENT),
                 serde = None,
@@ -591,10 +590,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         AppendData.byName(v2Relation, df.logicalPlan, extraOptions.toMap)
 
       case (SaveMode.Overwrite, _) =>
-        val tableSpec = TableSpec(
+        val tableSpec = UnresolvedTableSpec(
           properties = Map.empty,
           provider = Some(source),
-          options = Map.empty,
           location = extraOptions.get("path"),
           comment = extraOptions.get(TableCatalog.PROP_COMMENT),
           serde = None,
@@ -611,10 +609,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         // We have a potential race condition here in AppendMode, if the table suddenly gets
         // created between our existence check and physical execution, but this can't be helped
         // in any case.
-        val tableSpec = TableSpec(
+        val tableSpec = UnresolvedTableSpec(
           properties = Map.empty,
           provider = Some(source),
-          options = Map.empty,
           location = extraOptions.get("path"),
           comment = extraOptions.get(TableCatalog.PROP_COMMENT),
           serde = None,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index 270bf0a948e..101dd7ec299 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedIdentifier, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec}
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.IntegerType
@@ -107,10 +107,9 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
   }
 
   override def create(): Unit = {
-    val tableSpec = TableSpec(
+    val tableSpec = UnresolvedTableSpec(
       properties = properties.toMap,
       provider = provider,
-      options = Map.empty,
       location = None,
       comment = None,
       serde = None,
@@ -196,10 +195,9 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
   }
 
   private def internalReplace(orCreate: Boolean): Unit = {
-    val tableSpec = TableSpec(
+    val tableSpec = UnresolvedTableSpec(
       properties = properties.toMap,
       provider = provider,
-      options = Map.empty,
       location = None,
       comment = None,
       serde = None,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index b2b35b40492..515b0bb90bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -158,9 +158,9 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
     // session catalog and the table provider is not v2.
-    case c @ CreateTable(ResolvedV1Identifier(ident), _, _, _, _) =>
+    case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec: ResolvedTableSpec, _, _) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
-        c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
+        c.tableSpec.provider, tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
         ctas = false)
       if (!isV2Provider(provider)) {
         constructV1TableCmd(None, c.tableSpec, ident, c.tableSchema, c.partitioning,
@@ -169,10 +169,11 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         c
       }
 
-    case c @ CreateTableAsSelect(ResolvedV1Identifier(ident), _, _, _, writeOptions, _, _) =>
+    case c @ CreateTableAsSelect(
+        ResolvedV1Identifier(ident), _, _, tableSpec: ResolvedTableSpec, writeOptions, _, _, _) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
         c.tableSpec.provider,
-        c.tableSpec.options ++ writeOptions,
+        tableSpec.options ++ writeOptions,
         c.tableSpec.location,
         c.tableSpec.serde,
         ctas = true)
@@ -192,7 +193,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
     // session catalog and the table provider is not v2.
-    case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) =>
+    case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _, _) =>
       val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         throw QueryCompilationErrors.unsupportedTableOperationError(
@@ -201,7 +202,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         c
       }
 
-    case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) =>
+    case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _, _) =>
       val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         throw QueryCompilationErrors.unsupportedTableOperationError(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index e3ae1b83a16..dfe3c67e18b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -29,17 +29,18 @@ import org.antlr.v4.runtime.tree.TerminalNode
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, UnresolvedFunctionName, UnresolvedIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
 import org.apache.spark.sql.catalyst.util.DateTimeConstants
-import org.apache.spark.sql.errors.QueryParsingErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryParsingErrors}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.types.StringType
 
 /**
  * Concrete parser for Spark SQL statements.
@@ -332,7 +333,19 @@ class SparkSqlAstBuilder extends AstBuilder {
 
       withIdentClause(identCtx, ident => {
         val table = tableIdentifier(ident, "CREATE TEMPORARY VIEW", ctx)
-        val optionsWithLocation = location.map(l => options + ("path" -> l)).getOrElse(options)
+        val optionsList: Map[String, String] =
+          options.options.map { case (key, value) =>
+            val newValue: String =
+              if (value == null) {
+                null
+              } else value match {
+                case Literal(_, _: StringType) => value.toString
+                case _ => throw QueryCompilationErrors.optionMustBeLiteralString(key)
+              }
+            (key, newValue)
+          }.toMap
+        val optionsWithLocation =
+          location.map(l => optionsList + ("path" -> l)).getOrElse(optionsList)
         CreateTempViewUsing(table, schema, replace = false, global = false, provider,
           optionsWithLocation)
       })
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 471a9393b7d..07cce4b931b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -103,8 +103,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
   }
 
   private def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
-    tableSpec.copy(
-      location = tableSpec.location.map(makeQualifiedDBObjectPath(_)))
+    tableSpec.withNewLocation(tableSpec.location.map(makeQualifiedDBObjectPath(_)))
   }
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -173,7 +172,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil
 
     case CreateTable(ResolvedIdentifier(catalog, ident), schema, partitioning,
-        tableSpec, ifNotExists) =>
+        tableSpec, ifNotExists, unresolvedOptionsList) =>
       ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
@@ -185,7 +184,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
 
     case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, tableSpec,
-        options, ifNotExists, true) =>
+        options, ifNotExists, true, unresolvedOptionsList) =>
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicCreateTableAsSelectExec(staging, ident, parts, query,
@@ -198,7 +197,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case RefreshTable(r: ResolvedTable) =>
       RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil
 
-    case ReplaceTable(ResolvedIdentifier(catalog, ident), schema, parts, tableSpec, orCreate) =>
+    case ReplaceTable(
+        ResolvedIdentifier(catalog, ident), schema, parts, tableSpec, orCreate,
+        unresolvedOptionsList) =>
       ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
@@ -217,7 +218,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }
 
     case ReplaceTableAsSelect(ResolvedIdentifier(catalog, ident),
-        parts, query, tableSpec, options, orCreate, true) =>
+        parts, query, tableSpec, options, orCreate, true, unresolvedOptionsList) =>
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicReplaceTableAsSelectExec(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 55136442b1a..2aac82a990e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, TableSpec, View}
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionsListExpressions, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View}
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, NamespaceHelper, TransformHelper}
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -661,10 +662,12 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       None
     }
 
-    val tableSpec = TableSpec(
+    val newOptions = OptionsListExpressions(options.map { case (key, value) =>
+      (key, Literal(value).asInstanceOf[Expression])
+    }.toSeq)
+    val tableSpec = UnresolvedTableSpec(
       properties = Map(),
       provider = Some(source),
-      options = options,
       location = location,
       comment = { if (description.isEmpty) None else Some(description) },
       serde = None,
@@ -675,7 +678,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       tableSchema = schema,
       partitioning = Seq(),
       tableSpec = tableSpec,
-      ignoreIfExists = false)
+      ignoreIfExists = false,
+      optionsListExpressions = newOptions)
 
     sparkSession.sessionState.executePlan(plan).toRdd
     sparkSession.table(tableName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 2e8d671ad70..f913faa030d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, UnresolvedTableSpec}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog, TableProvider, V1Table, V2TableWithV1Fallback}
@@ -290,10 +290,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
        * Note, currently the new table creation by this API doesn't fully cover the V2 table.
        * TODO (SPARK-33638): Full support of v2 table creation
        */
-      val tableSpec = TableSpec(
+      val tableSpec = UnresolvedTableSpec(
         Map.empty[String, String],
         Some(source),
-        Map.empty[String, String],
         extraOptions.get("path"),
         None,
         None,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TableOptionsConstantFoldingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TableOptionsConstantFoldingSuite.scala
new file mode 100644
index 00000000000..2fab2d0f1af
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TableOptionsConstantFoldingSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.test.SharedSparkSession
+
+/** These tests exercise passing constant but non-literal OPTIONS lists, and folding them. */
+class TableOptionsConstantFoldingSuite extends QueryTest with SharedSparkSession {
+  val prefix = "create table t (col int) using json options "
+
+  /** Helper method to create a table with a OPTIONS list and then check the resulting value. */
+  def checkOption(createOption: String, expectedValue: String): Unit = {
+    withTable("t") {
+      sql(s"$prefix ('k' = $createOption)")
+      sql("insert into t values (42)")
+      checkAnswer(spark.table("t"), Seq(Row(42)))
+      val actual = spark.table("t")
+        .queryExecution.sparkPlan.asInstanceOf[FileSourceScanExec].relation.options
+      assert(actual.get("k").get == expectedValue)
+    }
+  }
+
+  test("SPARK-43529: Support constant expressions in CREATE/REPLACE TABLE OPTIONS") {
+    checkOption("1 + 2", "3")
+    checkOption("'a' || 'b'", "ab")
+    checkOption("true or false", "true")
+    checkOption("null", null)
+    checkOption("cast('11 23:4:0' as interval day to second)",
+      "INTERVAL '11 23:04:00' DAY TO SECOND")
+    checkOption("date_diff(current_date(), current_date())", "0")
+    checkOption("date_sub(date'2022-02-02', 1)", "2022-02-01")
+    checkOption("timestampadd(microsecond, 5, timestamp'2022-02-28 00:00:00')",
+      "2022-02-28 00:00:00.000005")
+    checkOption("round(cast(2.25 as decimal(5, 3)), 1)", "2.3")
+    // The result of invoking this "ROUND" function call is NULL, since the target decimal type is
+    // too narrow to contain the result of the cast.
+    checkOption("round(cast(2.25 as decimal(3, 3)), 1)", "null")
+
+    // Test some cases where the provided option value is a non-constant or invalid expression.
+    checkError(
+      exception = intercept[AnalysisException](
+        sql(s"$prefix ('k' = 1 + 2 + unresolvedAttribute)")),
+      errorClass = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+      parameters = Map(
+        "objectName" -> "`unresolvedAttribute`"),
+      queryContext = Array(ExpectedContext("", "", 60, 78, "unresolvedAttribute")))
+    checkError(
+      exception = intercept[AnalysisException](
+        sql(s"$prefix ('k' = true or false or unresolvedAttribute)")),
+      errorClass = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+      parameters = Map(
+        "objectName" -> "`unresolvedAttribute`"),
+      queryContext = Array(ExpectedContext("", "", 69, 87, "unresolvedAttribute")))
+    checkError(
+      exception = intercept[AnalysisException](
+        sql(s"$prefix ('k' = cast(array('9', '9') as array<byte>))")),
+      errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+      parameters = Map(
+        "key" -> "k",
+        "supported" -> "constant expressions"))
+    checkError(
+      exception = intercept[AnalysisException](
+        sql(s"$prefix ('k' = cast(map('9', '9') as map<string, string>))")),
+      errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+      parameters = Map(
+        "key" -> "k",
+        "supported" -> "constant expressions"))
+    checkError(
+      exception = intercept[AnalysisException](
+        sql(s"$prefix ('k' = raise_error('failure'))")),
+      errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+      parameters = Map(
+        "key" -> "k",
+        "supported" -> "constant expressions"))
+    checkError(
+      exception = intercept[AnalysisException](
+        sql(s"$prefix ('k' = raise_error('failure'))")),
+      errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+      parameters = Map(
+        "key" -> "k",
+        "supported" -> "constant expressions"))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index a51ac78fdb2..4fe7809162f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedFieldName, UnresolvedFieldPosition, UnresolvedIdentifier}
-import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, UnresolvedTableSpec}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -51,8 +51,7 @@ class V2CommandsCaseSensitivitySuite
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("ID", "iD").foreach { ref =>
-          val tableSpec = TableSpec(Map.empty, None, Map.empty,
-            None, None, None, false)
+          val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
           val plan = CreateTableAsSelect(
             UnresolvedIdentifier(Array("table_name")),
             Expressions.identity(ref) :: Nil,
@@ -75,8 +74,7 @@ class V2CommandsCaseSensitivitySuite
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref =>
-          val tableSpec = TableSpec(Map.empty, None, Map.empty,
-            None, None, None, false)
+          val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
           val plan = CreateTableAsSelect(
             UnresolvedIdentifier(Array("table_name")),
             Expressions.bucket(4, ref) :: Nil,
@@ -100,8 +98,7 @@ class V2CommandsCaseSensitivitySuite
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("ID", "iD").foreach { ref =>
-          val tableSpec = TableSpec(Map.empty, None, Map.empty,
-            None, None, None, false)
+          val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
           val plan = ReplaceTableAsSelect(
             UnresolvedIdentifier(Array("table_name")),
             Expressions.identity(ref) :: Nil,
@@ -124,8 +121,7 @@ class V2CommandsCaseSensitivitySuite
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref =>
-          val tableSpec = TableSpec(Map.empty, None, Map.empty,
-            None, None, None, false)
+          val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
           val plan = ReplaceTableAsSelect(
             UnresolvedIdentifier(Array("table_name")),
             Expressions.bucket(4, ref) :: Nil,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org