Posted to commits@spark.apache.org by ge...@apache.org on 2023/06/13 17:34:56 UTC

[spark] branch master updated: [SPARK-43529][SQL][FOLLOWUP] Code cleanup in UnresolvedTableSpec and related plans

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e94f2a5433 [SPARK-43529][SQL][FOLLOWUP] Code cleanup in UnresolvedTableSpec and related plans
7e94f2a5433 is described below

commit 7e94f2a543371b7d507e7bffab1ce7e44328dda5
Author: Gengliang Wang <ge...@apache.org>
AuthorDate: Tue Jun 13 10:34:19 2023 -0700

    [SPARK-43529][SQL][FOLLOWUP] Code cleanup in UnresolvedTableSpec and related plans
    
    ### What changes were proposed in this pull request?
    
    Follow-up of https://github.com/apache/spark/pull/41191 to clean up the code in UnresolvedTableSpec and related plans:
    * Rename `OptionsListExpressions` to `OptionList`
    * Rename `trait TableSpec` to `TableSpecBase`
    * Rename `ResolvedTableSpec` to `TableSpec`, and make sure all the physical plans use `TableSpec` instead of `TableSpecBase`
    * Move the option list expressions into `UnresolvedTableSpec`, so that all the specs live in one class
    * Make `UnresolvedTableSpec` a `UnaryExpression`, so that transforming with `mapExpressions` transforms both it and the option list expressions in its child (see the sketch after this list)
    * Restore the signatures of the classes `CreateTable`, `CreateTableAsSelect`, `ReplaceTable` and `ReplaceTableAsSelect`
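
    To illustrate the last two points, here is a minimal standalone sketch using a toy
    expression tree with an illustrative `transformUp`. It is not Spark's actual
    `TreeNode`/`mapExpressions` machinery; `Expr`, `Literal` and `Demo` are stand-ins
    invented for this sketch, and the real shapes of `UnresolvedTableSpec` and
    `OptionList` are in the diff below.

    ```scala
    // Toy tree: UnresolvedTableSpec is a unary node whose only child is the
    // OptionList, so one generic bottom-up rewrite reaches the option value
    // expressions without any special-casing in the traversal.
    sealed trait Expr {
      def children: Seq[Expr]
      def transformUp(f: PartialFunction[Expr, Expr]): Expr
    }

    case class Literal(value: Any) extends Expr {
      def children: Seq[Expr] = Nil
      def transformUp(f: PartialFunction[Expr, Expr]): Expr =
        f.applyOrElse(this, identity[Expr])
    }

    case class OptionList(options: Seq[(String, Expr)]) extends Expr {
      def children: Seq[Expr] = options.map(_._2)
      def transformUp(f: PartialFunction[Expr, Expr]): Expr = {
        val rewritten = OptionList(options.map { case (k, v) => k -> v.transformUp(f) })
        f.applyOrElse(rewritten, identity[Expr])
      }
    }

    case class UnresolvedTableSpec(
        properties: Map[String, String],
        optionExpression: OptionList) extends Expr {
      def children: Seq[Expr] = Seq(optionExpression)
      def transformUp(f: PartialFunction[Expr, Expr]): Expr = {
        val rewritten =
          copy(optionExpression = optionExpression.transformUp(f).asInstanceOf[OptionList])
        f.applyOrElse(rewritten, identity[Expr])
      }
    }

    object Demo extends App {
      val spec = UnresolvedTableSpec(
        properties = Map.empty,
        optionExpression = OptionList(Seq("path" -> Literal("/tmp/t"))))
      // A single generic traversal now rewrites the option values inside the spec.
      println(spec.transformUp { case Literal(s: String) => Literal(s.toUpperCase) })
    }
    ```

    With `OptionList` as the spec's child, a generic transform such as the one in
    `ResolveTableSpec` can fold the option expressions without the extra
    `optionsListExpressions` field the previous plan signatures carried.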
    
    ### Why are the changes needed?
    
    Make the code implementation simpler.

    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests
    
    Closes #41549 from gengliangwang/optionsFollowUp.
    
    Authored-by: Gengliang Wang <ge...@apache.org>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../sql/catalyst/analysis/ResolveTableSpec.scala   |  19 ++--
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  28 +++---
 .../sql/catalyst/plans/logical/v2Commands.scala    |  57 ++++++-----
 .../apache/spark/sql/catalyst/trees/TreeNode.scala |   6 +-
 .../sql/connector/catalog/CatalogV2Util.scala      |   4 +-
 .../CreateTablePartitioningValidationSuite.scala   |   5 +-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 106 +++++++++++----------
 .../org/apache/spark/sql/DataFrameWriter.scala     |   5 +-
 .../org/apache/spark/sql/DataFrameWriterV2.scala   |   4 +-
 .../catalyst/analysis/ResolveSessionCatalog.scala  |  10 +-
 .../datasources/v2/DataSourceV2Strategy.scala      |  11 +--
 .../apache/spark/sql/internal/CatalogImpl.scala    |   8 +-
 .../spark/sql/streaming/DataStreamWriter.scala     |   3 +-
 .../connector/V2CommandsCaseSensitivitySuite.scala |  14 ++-
 14 files changed, 148 insertions(+), 132 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
index d4b0a8d25e0..69a5b13124a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
@@ -36,22 +36,23 @@ object ResolveTableSpec extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.resolveOperatorsWithPruning(_.containsAnyPattern(COMMAND), ruleId) {
       case t: CreateTable =>
-        resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+        resolveTableSpec(t, t.tableSpec, s => t.copy(tableSpec = s))
       case t: CreateTableAsSelect =>
-        resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+        resolveTableSpec(t, t.tableSpec, s => t.copy(tableSpec = s))
       case t: ReplaceTable =>
-        resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+        resolveTableSpec(t, t.tableSpec, s => t.copy(tableSpec = s))
       case t: ReplaceTableAsSelect =>
-        resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+        resolveTableSpec(t, t.tableSpec, s => t.copy(tableSpec = s))
     }
   }
 
   /** Helper method to resolve the table specification within a logical plan. */
   private def resolveTableSpec(
-      input: LogicalPlan, tableSpec: TableSpec, optionsListExpressions: OptionsListExpressions,
-      withNewSpec: TableSpec => LogicalPlan): LogicalPlan = tableSpec match {
-    case u: UnresolvedTableSpec if optionsListExpressions.allOptionsResolved =>
-      val newOptions: Seq[(String, String)] = optionsListExpressions.options.map {
+      input: LogicalPlan,
+      tableSpec: TableSpecBase,
+      withNewSpec: TableSpecBase => LogicalPlan): LogicalPlan = tableSpec match {
+    case u: UnresolvedTableSpec if u.optionExpression.resolved =>
+      val newOptions: Seq[(String, String)] = u.optionExpression.options.map {
         case (key: String, null) =>
           (key, null)
         case (key: String, value: Expression) =>
@@ -75,7 +76,7 @@ object ResolveTableSpec extends Rule[LogicalPlan] {
           }
           (key, newValue)
       }
-      val newTableSpec = ResolvedTableSpec(
+      val newTableSpec = TableSpec(
         properties = u.properties,
         provider = u.provider,
         options = newOptions.toMap,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 2156ae4d51f..a076385573e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3348,13 +3348,13 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
    * specified.
    */
   override def visitExpressionPropertyList(
-      ctx: ExpressionPropertyListContext): OptionsListExpressions = {
+      ctx: ExpressionPropertyListContext): OptionList = {
     val options = ctx.expressionProperty.asScala.map { property =>
       val key: String = visitPropertyKey(property.key)
       val value: Expression = Option(property.value).map(expression).getOrElse(null)
       key -> value
     }.toSeq
-    OptionsListExpressions(options)
+    OptionList(options)
   }
 
   override def visitStringLit(ctx: StringLitContext): Token = {
@@ -3391,7 +3391,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
    */
   type TableClauses = (
       Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
-      OptionsListExpressions, Option[String], Option[String], Option[SerdeInfo])
+      OptionList, Option[String], Option[String], Option[SerdeInfo])
 
   /**
    * Validate a create table statement and return the [[TableIdentifier]].
@@ -3686,8 +3686,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
 
   def cleanTableOptions(
       ctx: ParserRuleContext,
-      options: OptionsListExpressions,
-      location: Option[String]): (OptionsListExpressions, Option[String]) = {
+      options: OptionList,
+      location: Option[String]): (OptionList, Option[String]) = {
     var path = location
     val filtered = cleanTableProperties(ctx, options.options.toMap).filter {
       case (key, value) if key.equalsIgnoreCase("path") =>
@@ -3705,7 +3705,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
         false
       case _ => true
     }
-    (OptionsListExpressions(filtered.toSeq), path)
+    (OptionList(filtered.toSeq), path)
   }
 
   /**
@@ -3864,7 +3864,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val cleanedProperties = cleanTableProperties(ctx, properties)
     val options = Option(ctx.options).map(visitExpressionPropertyList)
-      .getOrElse(OptionsListExpressions(Seq.empty))
+      .getOrElse(OptionList(Seq.empty))
     val location = visitLocationSpecList(ctx.locationSpec())
     val (cleanedOptions, newLocation) = cleanTableOptions(ctx, options, location)
     val comment = visitCommentSpecList(ctx.commentSpec())
@@ -3959,7 +3959,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
 
     val partitioning =
       partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform)
-    val tableSpec = UnresolvedTableSpec(properties, provider, location, comment,
+    val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
       serdeInfo, external)
 
     Option(ctx.query).map(plan) match {
@@ -3976,15 +3976,14 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
 
       case Some(query) =>
         CreateTableAsSelect(withIdentClause(identifierContext, UnresolvedIdentifier(_)),
-          partitioning, query, tableSpec, Map.empty, ifNotExists, optionsListExpressions = options)
+          partitioning, query, tableSpec, Map.empty, ifNotExists)
 
       case _ =>
         // Note: table schema includes both the table columns list and the partition columns
         // with data type.
         val schema = StructType(columns ++ partCols)
         CreateTable(withIdentClause(identifierContext, UnresolvedIdentifier(_)),
-          schema, partitioning, tableSpec, ignoreIfExists = ifNotExists,
-          optionsListExpressions = options)
+          schema, partitioning, tableSpec, ignoreIfExists = ifNotExists)
     }
   }
 
@@ -4029,7 +4028,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
 
     val partitioning =
       partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform)
-    val tableSpec = UnresolvedTableSpec(properties, provider, location, comment,
+    val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
       serdeInfo, external = false)
 
     Option(ctx.query).map(plan) match {
@@ -4047,8 +4046,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
       case Some(query) =>
         ReplaceTableAsSelect(
           withIdentClause(ctx.replaceTableHeader.identifierReference(), UnresolvedIdentifier(_)),
-          partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate,
-          optionsListExpressions = options)
+          partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate)
 
       case _ =>
         // Note: table schema includes both the table columns list and the partition columns
@@ -4056,7 +4054,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
         val schema = StructType(columns ++ partCols)
         ReplaceTable(
           withIdentClause(ctx.replaceTableHeader.identifierReference(), UnresolvedIdentifier(_)),
-          schema, partitioning, tableSpec, orCreate = orCreate, optionsListExpressions = options)
+          schema, partitioning, tableSpec, orCreate = orCreate)
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index a781bf56b9b..bd646b7f692 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AssignmentUtils, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedIdentifier, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.FunctionResource
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, MetadataAttribute, NamedExpression, Unevaluable, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, MetadataAttribute, NamedExpression, UnaryExpression, Unevaluable, V2ExpressionUtils}
 import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
 import org.apache.spark.sql.catalyst.trees.BinaryLike
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, RowDeltaUtils, WriteDeltaProjections}
@@ -31,6 +31,7 @@ import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.write.{DeltaWrite, RowLevelOperation, RowLevelOperationTable, SupportsDelta, Write}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType}
+import org.apache.spark.util.Utils
 
 // For v2 DML commands, it may end up with the v1 fallback code path and need to build a DataFrame
 // which is required by the DS v1 API. We need to keep the analyzed input query plan to build
@@ -444,9 +445,8 @@ case class CreateTable(
     name: LogicalPlan,
     tableSchema: StructType,
     partitioning: Seq[Transform],
-    tableSpec: TableSpec,
-    ignoreIfExists: Boolean,
-    optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
+    tableSpec: TableSpecBase,
+    ignoreIfExists: Boolean)
   extends UnaryCommand with V2CreateTablePlan {
 
   override def child: LogicalPlan = name
@@ -466,11 +466,10 @@ case class CreateTableAsSelect(
     name: LogicalPlan,
     partitioning: Seq[Transform],
     query: LogicalPlan,
-    tableSpec: TableSpec,
+    tableSpec: TableSpecBase,
     writeOptions: Map[String, String],
     ignoreIfExists: Boolean,
-    isAnalyzed: Boolean = false,
-    optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
+    isAnalyzed: Boolean = false)
   extends V2CreateTableAsSelectPlan {
 
   override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
@@ -498,9 +497,8 @@ case class ReplaceTable(
     name: LogicalPlan,
     tableSchema: StructType,
     partitioning: Seq[Transform],
-    tableSpec: TableSpec,
-    orCreate: Boolean,
-    optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
+    tableSpec: TableSpecBase,
+    orCreate: Boolean)
   extends UnaryCommand with V2CreateTablePlan {
 
   override def child: LogicalPlan = name
@@ -523,11 +521,10 @@ case class ReplaceTableAsSelect(
     name: LogicalPlan,
     partitioning: Seq[Transform],
     query: LogicalPlan,
-    tableSpec: TableSpec,
+    tableSpec: TableSpecBase,
     writeOptions: Map[String, String],
     orCreate: Boolean,
-    isAnalyzed: Boolean = false,
-    optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
+    isAnalyzed: Boolean = false)
   extends V2CreateTableAsSelectPlan {
 
   override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
@@ -1388,25 +1385,34 @@ case class DropIndex(
     copy(table = newChild)
 }
 
-trait TableSpec {
+trait TableSpecBase {
   def properties: Map[String, String]
   def provider: Option[String]
   def location: Option[String]
   def comment: Option[String]
   def serde: Option[SerdeInfo]
   def external: Boolean
-  def withNewLocation(newLocation: Option[String]): TableSpec
 }
 
 case class UnresolvedTableSpec(
     properties: Map[String, String],
     provider: Option[String],
+    optionExpression: OptionList,
     location: Option[String],
     comment: Option[String],
     serde: Option[SerdeInfo],
-    external: Boolean) extends TableSpec {
-  override def withNewLocation(loc: Option[String]): TableSpec = {
-    UnresolvedTableSpec(properties, provider, loc, comment, serde, external)
+    external: Boolean) extends UnaryExpression with Unevaluable with TableSpecBase {
+
+  override def dataType: DataType =
+    throw new UnsupportedOperationException("UnresolvedTableSpec doesn't have a data type")
+
+  override def child: Expression = optionExpression
+
+  override protected def withNewChildInternal(newChild: Expression): Expression =
+    this.copy(optionExpression = newChild.asInstanceOf[OptionList])
+
+  override def simpleString(maxFields: Int): String = {
+    this.copy(properties = Utils.redact(properties).toMap).toString
   }
 }
 
@@ -1415,11 +1421,12 @@ case class UnresolvedTableSpec(
  * UnresolvedTableSpec lives. We use a separate object so that tree traversals in analyzer rules can
  * descend into the child expressions naturally without extra treatment.
  */
-case class OptionsListExpressions(options: Seq[(String, Expression)])
+case class OptionList(options: Seq[(String, Expression)])
   extends Expression with Unevaluable {
   override def nullable: Boolean = true
   override def dataType: DataType = MapType(StringType, StringType)
   override def children: Seq[Expression] = options.map(_._2)
+  override lazy val resolved: Boolean = options.map(_._2).forall(_.resolved)
 
   override protected def withNewChildrenInternal(
     newChildren: IndexedSeq[Expression]): Expression = {
@@ -1428,21 +1435,19 @@ case class OptionsListExpressions(options: Seq[(String, Expression)])
       case ((key: String, _), newChild: Expression) =>
         (key, newChild)
     }
-    OptionsListExpressions(newOptions)
+    OptionList(newOptions)
   }
-
-  lazy val allOptionsResolved: Boolean = options.map(_._2).forall(_.resolved)
 }
 
-case class ResolvedTableSpec(
+case class TableSpec(
     properties: Map[String, String],
     provider: Option[String],
     options: Map[String, String],
     location: Option[String],
     comment: Option[String],
     serde: Option[SerdeInfo],
-    external: Boolean) extends TableSpec {
-  override def withNewLocation(newLocation: Option[String]): TableSpec = {
-    ResolvedTableSpec(properties, provider, options, newLocation, comment, serde, external)
+    external: Boolean) extends TableSpecBase {
+  def withNewLocation(newLocation: Option[String]): TableSpec = {
+    TableSpec(properties, provider, options, newLocation, comment, serde, external)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 17c89d7e6ae..75802de1a66 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical.{ResolvedTableSpec, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.TableSpec
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
 import org.apache.spark.sql.catalyst.rules.RuleId
 import org.apache.spark.sql.catalyst.rules.RuleIdCollection
@@ -927,11 +927,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Tre
       redactMapString(map.asCaseSensitiveMap().asScala, maxFields)
     case map: Map[_, _] =>
       redactMapString(map, maxFields)
-    case t: ResolvedTableSpec =>
+    case t: TableSpec =>
       t.copy(properties = Utils.redact(t.properties).toMap,
         options = Utils.redact(t.options).toMap) :: Nil
-    case t: UnresolvedTableSpec =>
-      t.copy(properties = Utils.redact(t.properties).toMap) :: Nil
     case table: CatalogTable =>
       stringArgsForCatalogTable(table)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index e92c1ee75a6..be569b1de9d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{ResolvedTableSpec, SerdeInfo, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
 import org.apache.spark.sql.catalyst.util.GeneratedColumn
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.connector.catalog.TableChange._
@@ -376,7 +376,7 @@ private[sql] object CatalogV2Util {
 
   def convertTableProperties(t: TableSpec): Map[String, String] = {
     val props = convertTableProperties(
-      t.properties, t.asInstanceOf[ResolvedTableSpec].options, t.serde, t.location, t.comment,
+      t.properties, t.options, t.serde, t.location, t.comment,
       t.provider, t.external)
     withDefaultOwnership(props)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
index d1651e536dd..4158dc9e273 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
@@ -20,14 +20,15 @@ package org.apache.spark.sql.catalyst.analysis
 import java.util
 
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, OptionList, UnresolvedTableSpec}
 import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, Table, TableCapability, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Expressions
 import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class CreateTablePartitioningValidationSuite extends AnalysisTest {
-  val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
+  val tableSpec =
+    UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false)
   test("CreateTableAsSelect: fail missing top-level column") {
     val plan = CreateTableAsSelect(
       UnresolvedIdentifier(Array("table_name")),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 8cfdf411ae9..f07de11727e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -57,7 +57,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq.empty[Transform],
       Map.empty[String, String],
       Some("parquet"),
-      OptionsListExpressions(Seq.empty),
+      OptionList(Seq.empty),
       None,
       None,
       None)
@@ -83,7 +83,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq.empty[Transform],
         Map.empty[String, String],
         Some("parquet"),
-        OptionsListExpressions(Seq.empty),
+        OptionList(Seq.empty),
         None,
         None,
         None),
@@ -103,7 +103,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("a"))),
       Map.empty[String, String],
       Some("parquet"),
-      OptionsListExpressions(Seq.empty),
+      OptionList(Seq.empty),
       None,
       None,
       None)
@@ -157,7 +157,7 @@ class DDLParserSuite extends AnalysisTest {
           LiteralValue(34, IntegerType)))),
       Map.empty[String, String],
       Some("parquet"),
-      OptionsListExpressions(Seq.empty),
+      OptionList(Seq.empty),
       None,
       None,
       None)
@@ -179,7 +179,7 @@ class DDLParserSuite extends AnalysisTest {
       List(bucket(5, Array(FieldReference.column("a")), Array(FieldReference.column("b")))),
       Map.empty[String, String],
       Some("parquet"),
-      OptionsListExpressions(Seq.empty),
+      OptionList(Seq.empty),
       None,
       None,
       None)
@@ -198,7 +198,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq.empty[Transform],
       Map.empty[String, String],
       Some("parquet"),
-      OptionsListExpressions(Seq.empty),
+      OptionList(Seq.empty),
       None,
       Some("abc"),
       None)
@@ -218,7 +218,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq.empty[Transform],
       Map("test" -> "test"),
       Some("parquet"),
-      OptionsListExpressions(Seq.empty),
+      OptionList(Seq.empty),
       None,
       None,
       None)
@@ -236,7 +236,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq.empty[Transform],
         Map.empty[String, String],
         Some("parquet"),
-        OptionsListExpressions(Seq.empty),
+        OptionList(Seq.empty),
         Some("/tmp/file"),
         None,
         None)
@@ -254,7 +254,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq.empty[Transform],
       Map.empty[String, String],
       Some("parquet"),
-      OptionsListExpressions(Seq.empty),
+      OptionList(Seq.empty),
       None,
       None,
       None)
@@ -272,7 +272,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("part"))),
       Map.empty[String, String],
       None,
-      OptionsListExpressions(Seq.empty),
+      OptionList(Seq.empty),
       None,
       None,
       None)
@@ -290,7 +290,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("part"))),
       Map.empty[String, String],
       None,
-      OptionsListExpressions(Seq.empty),
+      OptionList(Seq.empty),
       None,
       None,
       None)
@@ -308,7 +308,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("part"))),
       Map.empty[String, String],
       Some("parquet"),
-      OptionsListExpressions(Seq.empty),
+      OptionList(Seq.empty),
       None,
       None,
       None)
@@ -381,7 +381,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("part"))),
       Map.empty[String, String],
       None,
-      OptionsListExpressions(Seq.empty),
+      OptionList(Seq.empty),
       None,
       None,
       Some(SerdeInfo(storedAs = Some("parquet"))))
@@ -406,7 +406,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq(IdentityTransform(FieldReference("part"))),
         Map.empty[String, String],
         None,
-        OptionsListExpressions(Seq.empty),
+        OptionList(Seq.empty),
         None,
         None,
         Some(SerdeInfo(storedAs = Some(format), serde = Some("customSerde"), serdeProperties = Map(
@@ -463,7 +463,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("part"))),
       Map.empty[String, String],
       None,
-      OptionsListExpressions(Seq.empty),
+      OptionList(Seq.empty),
       None,
       None,
       Some(SerdeInfo(storedAs = Some("textfile"), serdeProperties = Map(
@@ -514,7 +514,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("part"))),
       Map.empty[String, String],
       None,
-      OptionsListExpressions(Seq.empty),
+      OptionList(Seq.empty),
       None,
       None,
       Some(SerdeInfo(formatClasses = Some(FormatClasses("inFormat", "outFormat")))))
@@ -537,7 +537,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq(IdentityTransform(FieldReference("part"))),
       Map.empty[String, String],
       None,
-      OptionsListExpressions(Seq.empty),
+      OptionList(Seq.empty),
       None,
       None,
       Some(SerdeInfo(
@@ -880,7 +880,7 @@ class DDLParserSuite extends AnalysisTest {
           Seq.empty[Transform],
           Map.empty,
           Some("json"),
-          OptionsListExpressions(
+          OptionList(
             Seq(
               ("a", Literal(1)),
               ("b", Literal(Decimal(0.1))),
@@ -939,7 +939,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq.empty[Transform],
         Map("p1" -> "v1", "p2" -> "v2"),
         Some("parquet"),
-        OptionsListExpressions(Seq.empty),
+        OptionList(Seq.empty),
         Some("/user/external/page_view"),
         Some("This is the staging page view table"),
         None)
@@ -2476,7 +2476,7 @@ class DDLParserSuite extends AnalysisTest {
       partitioning: Seq[Transform],
       properties: Map[String, String],
       provider: Option[String],
-      options: OptionsListExpressions,
+      options: OptionList,
       location: Option[String],
       comment: Option[String],
       serdeInfo: Option[SerdeInfo],
@@ -2486,51 +2486,55 @@ class DDLParserSuite extends AnalysisTest {
     def apply(plan: LogicalPlan): TableSpec = {
       plan match {
         case create: CreateTable =>
+          val tableSpec = create.tableSpec.asInstanceOf[UnresolvedTableSpec]
           TableSpec(
             create.name.asInstanceOf[UnresolvedIdentifier].nameParts,
             Some(create.tableSchema),
             create.partitioning,
-            create.tableSpec.properties,
-            create.tableSpec.provider,
-            create.optionsListExpressions,
-            create.tableSpec.location,
-            create.tableSpec.comment,
-            create.tableSpec.serde,
-            create.tableSpec.external)
+            tableSpec.properties,
+            tableSpec.provider,
+            tableSpec.optionExpression,
+            tableSpec.location,
+            tableSpec.comment,
+            tableSpec.serde,
+            tableSpec.external)
         case replace: ReplaceTable =>
+          val tableSpec = replace.tableSpec.asInstanceOf[UnresolvedTableSpec]
           TableSpec(
             replace.name.asInstanceOf[UnresolvedIdentifier].nameParts,
             Some(replace.tableSchema),
             replace.partitioning,
-            replace.tableSpec.properties,
-            replace.tableSpec.provider,
-            replace.optionsListExpressions,
-            replace.tableSpec.location,
-            replace.tableSpec.comment,
-            replace.tableSpec.serde)
+            tableSpec.properties,
+            tableSpec.provider,
+            tableSpec.optionExpression,
+            tableSpec.location,
+            tableSpec.comment,
+            tableSpec.serde)
         case ctas: CreateTableAsSelect =>
+          val tableSpec = ctas.tableSpec.asInstanceOf[UnresolvedTableSpec]
           TableSpec(
             ctas.name.asInstanceOf[UnresolvedIdentifier].nameParts,
             Some(ctas.query).filter(_.resolved).map(_.schema),
             ctas.partitioning,
-            ctas.tableSpec.properties,
-            ctas.tableSpec.provider,
-            ctas.optionsListExpressions,
-            ctas.tableSpec.location,
-            ctas.tableSpec.comment,
-            ctas.tableSpec.serde,
-            ctas.tableSpec.external)
+            tableSpec.properties,
+            tableSpec.provider,
+            tableSpec.optionExpression,
+            tableSpec.location,
+            tableSpec.comment,
+            tableSpec.serde,
+            tableSpec.external)
         case rtas: ReplaceTableAsSelect =>
+          val tableSpec = rtas.tableSpec.asInstanceOf[UnresolvedTableSpec]
           TableSpec(
             rtas.name.asInstanceOf[UnresolvedIdentifier].nameParts,
             Some(rtas.query).filter(_.resolved).map(_.schema),
             rtas.partitioning,
-            rtas.tableSpec.properties,
-            rtas.tableSpec.provider,
-            rtas.optionsListExpressions,
-            rtas.tableSpec.location,
-            rtas.tableSpec.comment,
-            rtas.tableSpec.serde)
+            tableSpec.properties,
+            tableSpec.provider,
+            tableSpec.optionExpression,
+            tableSpec.location,
+            tableSpec.comment,
+            tableSpec.serde)
         case other =>
           fail(s"Expected to parse Create, CTAS, Replace, or RTAS plan" +
             s" from query, got ${other.getClass.getName}.")
@@ -2564,7 +2568,7 @@ class DDLParserSuite extends AnalysisTest {
       Seq.empty[Transform],
       Map.empty[String, String],
       None,
-      OptionsListExpressions(Seq.empty),
+      OptionList(Seq.empty),
       None,
       None,
       None)
@@ -2615,7 +2619,7 @@ class DDLParserSuite extends AnalysisTest {
     val createTableResult =
       CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn,
         Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
-          None, None, None, false), false)
+         OptionList(Seq.empty), None, None, None, false), false)
     // Parse the CREATE TABLE statement twice, swapping the order of the NOT NULL and DEFAULT
     // options, to make sure that the parser accepts any ordering of these options.
     comparePlans(parsePlan(
@@ -2628,7 +2632,7 @@ class DDLParserSuite extends AnalysisTest {
       "b STRING NOT NULL DEFAULT \"abc\") USING parquet"),
       ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn,
         Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
-          None, None, None, false), false))
+          OptionList(Seq.empty), None, None, None, false), false))
     // These ALTER TABLE statements should parse successfully.
     comparePlans(
       parsePlan("ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42"),
@@ -2784,12 +2788,12 @@ class DDLParserSuite extends AnalysisTest {
       "CREATE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"),
       CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn,
         Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
-          None, None, None, false), false))
+          OptionList(Seq.empty), None, None, None, false), false))
     comparePlans(parsePlan(
       "REPLACE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"),
       ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn,
         Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
-          None, None, None, false), false))
+          OptionList(Seq.empty), None, None, None, false), false))
     // Two generation expressions
     checkError(
       exception = parseException("CREATE TABLE my_tab(a INT, " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index a3cb12307fb..34b7c21db1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedIdentifier, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OptionList, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table}
 import org.apache.spark.sql.connector.catalog.TableCapability._
@@ -329,6 +329,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
               val tableSpec = UnresolvedTableSpec(
                 properties = Map.empty,
                 provider = Some(source),
+                optionExpression = OptionList(Seq.empty),
                 location = extraOptions.get("path"),
                 comment = extraOptions.get(TableCatalog.PROP_COMMENT),
                 serde = None,
@@ -593,6 +594,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         val tableSpec = UnresolvedTableSpec(
           properties = Map.empty,
           provider = Some(source),
+          optionExpression = OptionList(Seq.empty),
           location = extraOptions.get("path"),
           comment = extraOptions.get(TableCatalog.PROP_COMMENT),
           serde = None,
@@ -612,6 +614,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         val tableSpec = UnresolvedTableSpec(
           properties = Map.empty,
           provider = Some(source),
+          optionExpression = OptionList(Seq.empty),
           location = extraOptions.get("path"),
           comment = extraOptions.get(TableCatalog.PROP_COMMENT),
           serde = None,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index 101dd7ec299..6202fede568 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedIdentifier, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OptionList, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec}
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.IntegerType
@@ -110,6 +110,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
     val tableSpec = UnresolvedTableSpec(
       properties = properties.toMap,
       provider = provider,
+      optionExpression = OptionList(Seq.empty),
       location = None,
       comment = None,
       serde = None,
@@ -198,6 +199,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
     val tableSpec = UnresolvedTableSpec(
       properties = properties.toMap,
       provider = provider,
+      optionExpression = OptionList(Seq.empty),
       location = None,
       comment = None,
       serde = None,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 515b0bb90bd..fb1e9bcc591 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -158,7 +158,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
     // session catalog and the table provider is not v2.
-    case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec: ResolvedTableSpec, _, _) =>
+    case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec: TableSpec, _) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
         c.tableSpec.provider, tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
         ctas = false)
@@ -170,7 +170,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       }
 
     case c @ CreateTableAsSelect(
-        ResolvedV1Identifier(ident), _, _, tableSpec: ResolvedTableSpec, writeOptions, _, _, _) =>
+        ResolvedV1Identifier(ident), _, _, tableSpec: TableSpec, writeOptions, _, _) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
         c.tableSpec.provider,
         tableSpec.options ++ writeOptions,
@@ -193,7 +193,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
     // session catalog and the table provider is not v2.
-    case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _, _) =>
+    case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) =>
       val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         throw QueryCompilationErrors.unsupportedTableOperationError(
@@ -202,7 +202,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         c
       }
 
-    case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _, _) =>
+    case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) =>
       val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         throw QueryCompilationErrors.unsupportedTableOperationError(
@@ -444,7 +444,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
   private def constructV1TableCmd(
       query: Option[LogicalPlan],
-      tableSpec: TableSpec,
+      tableSpec: TableSpecBase,
       ident: TableIdentifier,
       tableSchema: StructType,
       partitioning: Seq[Transform],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 359a6f43f0b..f7b18e6a7a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -172,7 +172,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil
 
     case CreateTable(ResolvedIdentifier(catalog, ident), schema, partitioning,
-        tableSpec, ifNotExists, unresolvedOptionsList) =>
+        tableSpec: TableSpec, ifNotExists) =>
       ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
@@ -183,8 +183,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       CreateTableExec(catalog.asTableCatalog, ident, structTypeToV2Columns(newSchema),
         partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
 
-    case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, tableSpec,
-        options, ifNotExists, true, unresolvedOptionsList) =>
+    case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, tableSpec: TableSpec,
+        options, ifNotExists, true) =>
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicCreateTableAsSelectExec(staging, ident, parts, query,
@@ -198,8 +198,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil
 
     case ReplaceTable(
-        ResolvedIdentifier(catalog, ident), schema, parts, tableSpec, orCreate,
-        unresolvedOptionsList) =>
+        ResolvedIdentifier(catalog, ident), schema, parts, tableSpec: TableSpec, orCreate) =>
       ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
@@ -218,7 +217,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }
 
     case ReplaceTableAsSelect(ResolvedIdentifier(catalog, ident),
-        parts, query, tableSpec, options, orCreate, true, unresolvedOptionsList) =>
+        parts, query, tableSpec: TableSpec, options, orCreate, true) =>
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicReplaceTableAsSelectExec(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 2aac82a990e..76c89bfa4a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionsListExpressions, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionList, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View}
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, NamespaceHelper, TransformHelper}
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -662,12 +662,13 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       None
     }
 
-    val newOptions = OptionsListExpressions(options.map { case (key, value) =>
+    val newOptions = OptionList(options.map { case (key, value) =>
       (key, Literal(value).asInstanceOf[Expression])
     }.toSeq)
     val tableSpec = UnresolvedTableSpec(
       properties = Map(),
       provider = Some(source),
+      optionExpression = newOptions,
       location = location,
       comment = { if (description.isEmpty) None else Some(description) },
       serde = None,
@@ -678,8 +679,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       tableSchema = schema,
       partitioning = Seq(),
       tableSpec = tableSpec,
-      ignoreIfExists = false,
-      optionsListExpressions = newOptions)
+      ignoreIfExists = false)
 
     sparkSession.sessionState.executePlan(plan).toRdd
     sparkSession.table(tableName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index f913faa030d..12977987f08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, OptionList, UnresolvedTableSpec}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog, TableProvider, V1Table, V2TableWithV1Fallback}
@@ -293,6 +293,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       val tableSpec = UnresolvedTableSpec(
         Map.empty[String, String],
         Some(source),
+        OptionList(Seq.empty),
         extraOptions.get("path"),
         None,
         None,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index 4fe7809162f..51d15a666db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedFieldName, UnresolvedFieldPosition, UnresolvedIdentifier}
-import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, OptionList, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, UnresolvedTableSpec}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -51,7 +51,8 @@ class V2CommandsCaseSensitivitySuite
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("ID", "iD").foreach { ref =>
-          val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
+          val tableSpec =
+            UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false)
           val plan = CreateTableAsSelect(
             UnresolvedIdentifier(Array("table_name")),
             Expressions.identity(ref) :: Nil,
@@ -74,7 +75,8 @@ class V2CommandsCaseSensitivitySuite
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref =>
-          val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
+          val tableSpec =
+            UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false)
           val plan = CreateTableAsSelect(
             UnresolvedIdentifier(Array("table_name")),
             Expressions.bucket(4, ref) :: Nil,
@@ -98,7 +100,8 @@ class V2CommandsCaseSensitivitySuite
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("ID", "iD").foreach { ref =>
-          val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
+          val tableSpec =
+            UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false)
           val plan = ReplaceTableAsSelect(
             UnresolvedIdentifier(Array("table_name")),
             Expressions.identity(ref) :: Nil,
@@ -121,7 +124,8 @@ class V2CommandsCaseSensitivitySuite
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref =>
-          val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
+          val tableSpec =
+            UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false)
           val plan = ReplaceTableAsSelect(
             UnresolvedIdentifier(Array("table_name")),
             Expressions.bucket(4, ref) :: Nil,

