You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2023/06/13 17:34:56 UTC
[spark] branch master updated: [SPARK-43529][SQL][FOLLOWUP] Code cleanup in UnresolvedTableSpec and related plans
This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7e94f2a5433 [SPARK-43529][SQL][FOLLOWUP] Code cleanup in UnresolvedTableSpec and related plans
7e94f2a5433 is described below
commit 7e94f2a543371b7d507e7bffab1ce7e44328dda5
Author: Gengliang Wang <ge...@apache.org>
AuthorDate: Tue Jun 13 10:34:19 2023 -0700
[SPARK-43529][SQL][FOLLOWUP] Code cleanup in UnresolvedTableSpec and related plans
### What changes were proposed in this pull request?
Follow-up of https://github.com/apache/spark/pull/41191 to clean up the code in UnresolvedTableSpec and related plans:
* Rename `OptionsListExpressions` as `OptionList`
* Rename `trait TableSpec` as `TableSpecBase`
* Rename `ResolvedTableSpec` as `TableSpec`, and make sure all the physical plans use `TableSpec` instead of `TableSpecBase`.
* Move option list expressions to UnresolvedTableSpec, so that all the specs are in one class.
* Make `UnresolvedTableSpec` a `UnaryExpression`, so that transforming with `mapExpressions` will transform it and the option list expressions in its child
* Restore the signatures of class `CreateTable`, `CreateTableAsSelect`, `ReplaceTable` and `ReplaceTableAsSelect`
### Why are the changes needed?
Make the code implementation simpler
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests
Closes #41549 from gengliangwang/optionsFollowUp.
Authored-by: Gengliang Wang <ge...@apache.org>
Signed-off-by: Gengliang Wang <ge...@apache.org>
---
.../sql/catalyst/analysis/ResolveTableSpec.scala | 19 ++--
.../spark/sql/catalyst/parser/AstBuilder.scala | 28 +++---
.../sql/catalyst/plans/logical/v2Commands.scala | 57 ++++++-----
.../apache/spark/sql/catalyst/trees/TreeNode.scala | 6 +-
.../sql/connector/catalog/CatalogV2Util.scala | 4 +-
.../CreateTablePartitioningValidationSuite.scala | 5 +-
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 106 +++++++++++----------
.../org/apache/spark/sql/DataFrameWriter.scala | 5 +-
.../org/apache/spark/sql/DataFrameWriterV2.scala | 4 +-
.../catalyst/analysis/ResolveSessionCatalog.scala | 10 +-
.../datasources/v2/DataSourceV2Strategy.scala | 11 +--
.../apache/spark/sql/internal/CatalogImpl.scala | 8 +-
.../spark/sql/streaming/DataStreamWriter.scala | 3 +-
.../connector/V2CommandsCaseSensitivitySuite.scala | 14 ++-
14 files changed, 148 insertions(+), 132 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
index d4b0a8d25e0..69a5b13124a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
@@ -36,22 +36,23 @@ object ResolveTableSpec extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperatorsWithPruning(_.containsAnyPattern(COMMAND), ruleId) {
case t: CreateTable =>
- resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+ resolveTableSpec(t, t.tableSpec, s => t.copy(tableSpec = s))
case t: CreateTableAsSelect =>
- resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+ resolveTableSpec(t, t.tableSpec, s => t.copy(tableSpec = s))
case t: ReplaceTable =>
- resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+ resolveTableSpec(t, t.tableSpec, s => t.copy(tableSpec = s))
case t: ReplaceTableAsSelect =>
- resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+ resolveTableSpec(t, t.tableSpec, s => t.copy(tableSpec = s))
}
}
/** Helper method to resolve the table specification within a logical plan. */
private def resolveTableSpec(
- input: LogicalPlan, tableSpec: TableSpec, optionsListExpressions: OptionsListExpressions,
- withNewSpec: TableSpec => LogicalPlan): LogicalPlan = tableSpec match {
- case u: UnresolvedTableSpec if optionsListExpressions.allOptionsResolved =>
- val newOptions: Seq[(String, String)] = optionsListExpressions.options.map {
+ input: LogicalPlan,
+ tableSpec: TableSpecBase,
+ withNewSpec: TableSpecBase => LogicalPlan): LogicalPlan = tableSpec match {
+ case u: UnresolvedTableSpec if u.optionExpression.resolved =>
+ val newOptions: Seq[(String, String)] = u.optionExpression.options.map {
case (key: String, null) =>
(key, null)
case (key: String, value: Expression) =>
@@ -75,7 +76,7 @@ object ResolveTableSpec extends Rule[LogicalPlan] {
}
(key, newValue)
}
- val newTableSpec = ResolvedTableSpec(
+ val newTableSpec = TableSpec(
properties = u.properties,
provider = u.provider,
options = newOptions.toMap,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 2156ae4d51f..a076385573e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3348,13 +3348,13 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
* specified.
*/
override def visitExpressionPropertyList(
- ctx: ExpressionPropertyListContext): OptionsListExpressions = {
+ ctx: ExpressionPropertyListContext): OptionList = {
val options = ctx.expressionProperty.asScala.map { property =>
val key: String = visitPropertyKey(property.key)
val value: Expression = Option(property.value).map(expression).getOrElse(null)
key -> value
}.toSeq
- OptionsListExpressions(options)
+ OptionList(options)
}
override def visitStringLit(ctx: StringLitContext): Token = {
@@ -3391,7 +3391,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
*/
type TableClauses = (
Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
- OptionsListExpressions, Option[String], Option[String], Option[SerdeInfo])
+ OptionList, Option[String], Option[String], Option[SerdeInfo])
/**
* Validate a create table statement and return the [[TableIdentifier]].
@@ -3686,8 +3686,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
def cleanTableOptions(
ctx: ParserRuleContext,
- options: OptionsListExpressions,
- location: Option[String]): (OptionsListExpressions, Option[String]) = {
+ options: OptionList,
+ location: Option[String]): (OptionList, Option[String]) = {
var path = location
val filtered = cleanTableProperties(ctx, options.options.toMap).filter {
case (key, value) if key.equalsIgnoreCase("path") =>
@@ -3705,7 +3705,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
false
case _ => true
}
- (OptionsListExpressions(filtered.toSeq), path)
+ (OptionList(filtered.toSeq), path)
}
/**
@@ -3864,7 +3864,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
val cleanedProperties = cleanTableProperties(ctx, properties)
val options = Option(ctx.options).map(visitExpressionPropertyList)
- .getOrElse(OptionsListExpressions(Seq.empty))
+ .getOrElse(OptionList(Seq.empty))
val location = visitLocationSpecList(ctx.locationSpec())
val (cleanedOptions, newLocation) = cleanTableOptions(ctx, options, location)
val comment = visitCommentSpecList(ctx.commentSpec())
@@ -3959,7 +3959,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
val partitioning =
partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform)
- val tableSpec = UnresolvedTableSpec(properties, provider, location, comment,
+ val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
serdeInfo, external)
Option(ctx.query).map(plan) match {
@@ -3976,15 +3976,14 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
case Some(query) =>
CreateTableAsSelect(withIdentClause(identifierContext, UnresolvedIdentifier(_)),
- partitioning, query, tableSpec, Map.empty, ifNotExists, optionsListExpressions = options)
+ partitioning, query, tableSpec, Map.empty, ifNotExists)
case _ =>
// Note: table schema includes both the table columns list and the partition columns
// with data type.
val schema = StructType(columns ++ partCols)
CreateTable(withIdentClause(identifierContext, UnresolvedIdentifier(_)),
- schema, partitioning, tableSpec, ignoreIfExists = ifNotExists,
- optionsListExpressions = options)
+ schema, partitioning, tableSpec, ignoreIfExists = ifNotExists)
}
}
@@ -4029,7 +4028,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
val partitioning =
partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform)
- val tableSpec = UnresolvedTableSpec(properties, provider, location, comment,
+ val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
serdeInfo, external = false)
Option(ctx.query).map(plan) match {
@@ -4047,8 +4046,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
case Some(query) =>
ReplaceTableAsSelect(
withIdentClause(ctx.replaceTableHeader.identifierReference(), UnresolvedIdentifier(_)),
- partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate,
- optionsListExpressions = options)
+ partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate)
case _ =>
// Note: table schema includes both the table columns list and the partition columns
@@ -4056,7 +4054,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
val schema = StructType(columns ++ partCols)
ReplaceTable(
withIdentClause(ctx.replaceTableHeader.identifierReference(), UnresolvedIdentifier(_)),
- schema, partitioning, tableSpec, orCreate = orCreate, optionsListExpressions = options)
+ schema, partitioning, tableSpec, orCreate = orCreate)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index a781bf56b9b..bd646b7f692 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AssignmentUtils, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedIdentifier, UnresolvedException}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.FunctionResource
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, MetadataAttribute, NamedExpression, Unevaluable, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, MetadataAttribute, NamedExpression, UnaryExpression, Unevaluable, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
import org.apache.spark.sql.catalyst.trees.BinaryLike
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, RowDeltaUtils, WriteDeltaProjections}
@@ -31,6 +31,7 @@ import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.write.{DeltaWrite, RowLevelOperation, RowLevelOperationTable, SupportsDelta, Write}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType}
+import org.apache.spark.util.Utils
// For v2 DML commands, it may end up with the v1 fallback code path and need to build a DataFrame
// which is required by the DS v1 API. We need to keep the analyzed input query plan to build
@@ -444,9 +445,8 @@ case class CreateTable(
name: LogicalPlan,
tableSchema: StructType,
partitioning: Seq[Transform],
- tableSpec: TableSpec,
- ignoreIfExists: Boolean,
- optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
+ tableSpec: TableSpecBase,
+ ignoreIfExists: Boolean)
extends UnaryCommand with V2CreateTablePlan {
override def child: LogicalPlan = name
@@ -466,11 +466,10 @@ case class CreateTableAsSelect(
name: LogicalPlan,
partitioning: Seq[Transform],
query: LogicalPlan,
- tableSpec: TableSpec,
+ tableSpec: TableSpecBase,
writeOptions: Map[String, String],
ignoreIfExists: Boolean,
- isAnalyzed: Boolean = false,
- optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
+ isAnalyzed: Boolean = false)
extends V2CreateTableAsSelectPlan {
override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
@@ -498,9 +497,8 @@ case class ReplaceTable(
name: LogicalPlan,
tableSchema: StructType,
partitioning: Seq[Transform],
- tableSpec: TableSpec,
- orCreate: Boolean,
- optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
+ tableSpec: TableSpecBase,
+ orCreate: Boolean)
extends UnaryCommand with V2CreateTablePlan {
override def child: LogicalPlan = name
@@ -523,11 +521,10 @@ case class ReplaceTableAsSelect(
name: LogicalPlan,
partitioning: Seq[Transform],
query: LogicalPlan,
- tableSpec: TableSpec,
+ tableSpec: TableSpecBase,
writeOptions: Map[String, String],
orCreate: Boolean,
- isAnalyzed: Boolean = false,
- optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
+ isAnalyzed: Boolean = false)
extends V2CreateTableAsSelectPlan {
override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
@@ -1388,25 +1385,34 @@ case class DropIndex(
copy(table = newChild)
}
-trait TableSpec {
+trait TableSpecBase {
def properties: Map[String, String]
def provider: Option[String]
def location: Option[String]
def comment: Option[String]
def serde: Option[SerdeInfo]
def external: Boolean
- def withNewLocation(newLocation: Option[String]): TableSpec
}
case class UnresolvedTableSpec(
properties: Map[String, String],
provider: Option[String],
+ optionExpression: OptionList,
location: Option[String],
comment: Option[String],
serde: Option[SerdeInfo],
- external: Boolean) extends TableSpec {
- override def withNewLocation(loc: Option[String]): TableSpec = {
- UnresolvedTableSpec(properties, provider, loc, comment, serde, external)
+ external: Boolean) extends UnaryExpression with Unevaluable with TableSpecBase {
+
+ override def dataType: DataType =
+ throw new UnsupportedOperationException("UnresolvedTableSpec doesn't have a data type")
+
+ override def child: Expression = optionExpression
+
+ override protected def withNewChildInternal(newChild: Expression): Expression =
+ this.copy(optionExpression = newChild.asInstanceOf[OptionList])
+
+ override def simpleString(maxFields: Int): String = {
+ this.copy(properties = Utils.redact(properties).toMap).toString
}
}
@@ -1415,11 +1421,12 @@ case class UnresolvedTableSpec(
* UnresolvedTableSpec lives. We use a separate object so that tree traversals in analyzer rules can
* descend into the child expressions naturally without extra treatment.
*/
-case class OptionsListExpressions(options: Seq[(String, Expression)])
+case class OptionList(options: Seq[(String, Expression)])
extends Expression with Unevaluable {
override def nullable: Boolean = true
override def dataType: DataType = MapType(StringType, StringType)
override def children: Seq[Expression] = options.map(_._2)
+ override lazy val resolved: Boolean = options.map(_._2).forall(_.resolved)
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): Expression = {
@@ -1428,21 +1435,19 @@ case class OptionsListExpressions(options: Seq[(String, Expression)])
case ((key: String, _), newChild: Expression) =>
(key, newChild)
}
- OptionsListExpressions(newOptions)
+ OptionList(newOptions)
}
-
- lazy val allOptionsResolved: Boolean = options.map(_._2).forall(_.resolved)
}
-case class ResolvedTableSpec(
+case class TableSpec(
properties: Map[String, String],
provider: Option[String],
options: Map[String, String],
location: Option[String],
comment: Option[String],
serde: Option[SerdeInfo],
- external: Boolean) extends TableSpec {
- override def withNewLocation(newLocation: Option[String]): TableSpec = {
- ResolvedTableSpec(properties, provider, options, newLocation, comment, serde, external)
+ external: Boolean) extends TableSpecBase {
+ def withNewLocation(newLocation: Option[String]): TableSpec = {
+ TableSpec(properties, provider, options, newLocation, comment, serde, external)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 17c89d7e6ae..75802de1a66 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical.{ResolvedTableSpec, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.TableSpec
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.rules.RuleId
import org.apache.spark.sql.catalyst.rules.RuleIdCollection
@@ -927,11 +927,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Tre
redactMapString(map.asCaseSensitiveMap().asScala, maxFields)
case map: Map[_, _] =>
redactMapString(map, maxFields)
- case t: ResolvedTableSpec =>
+ case t: TableSpec =>
t.copy(properties = Utils.redact(t.properties).toMap,
options = Utils.redact(t.options).toMap) :: Nil
- case t: UnresolvedTableSpec =>
- t.copy(properties = Utils.redact(t.properties).toMap) :: Nil
case table: CatalogTable =>
stringArgsForCatalogTable(table)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index e92c1ee75a6..be569b1de9d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{ResolvedTableSpec, SerdeInfo, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
import org.apache.spark.sql.catalyst.util.GeneratedColumn
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.connector.catalog.TableChange._
@@ -376,7 +376,7 @@ private[sql] object CatalogV2Util {
def convertTableProperties(t: TableSpec): Map[String, String] = {
val props = convertTableProperties(
- t.properties, t.asInstanceOf[ResolvedTableSpec].options, t.serde, t.location, t.comment,
+ t.properties, t.options, t.serde, t.location, t.comment,
t.provider, t.external)
withDefaultOwnership(props)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
index d1651e536dd..4158dc9e273 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
@@ -20,14 +20,15 @@ package org.apache.spark.sql.catalyst.analysis
import java.util
import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, OptionList, UnresolvedTableSpec}
import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, Table, TableCapability, TableCatalog}
import org.apache.spark.sql.connector.expressions.Expressions
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
class CreateTablePartitioningValidationSuite extends AnalysisTest {
- val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
+ val tableSpec =
+ UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false)
test("CreateTableAsSelect: fail missing top-level column") {
val plan = CreateTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 8cfdf411ae9..f07de11727e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -57,7 +57,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map.empty[String, String],
Some("parquet"),
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
None)
@@ -83,7 +83,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map.empty[String, String],
Some("parquet"),
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
None),
@@ -103,7 +103,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("a"))),
Map.empty[String, String],
Some("parquet"),
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
None)
@@ -157,7 +157,7 @@ class DDLParserSuite extends AnalysisTest {
LiteralValue(34, IntegerType)))),
Map.empty[String, String],
Some("parquet"),
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
None)
@@ -179,7 +179,7 @@ class DDLParserSuite extends AnalysisTest {
List(bucket(5, Array(FieldReference.column("a")), Array(FieldReference.column("b")))),
Map.empty[String, String],
Some("parquet"),
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
None)
@@ -198,7 +198,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map.empty[String, String],
Some("parquet"),
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
Some("abc"),
None)
@@ -218,7 +218,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map("test" -> "test"),
Some("parquet"),
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
None)
@@ -236,7 +236,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map.empty[String, String],
Some("parquet"),
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
Some("/tmp/file"),
None,
None)
@@ -254,7 +254,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map.empty[String, String],
Some("parquet"),
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
None)
@@ -272,7 +272,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
None,
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
None)
@@ -290,7 +290,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
None,
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
None)
@@ -308,7 +308,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
Some("parquet"),
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
None)
@@ -381,7 +381,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
None,
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
Some(SerdeInfo(storedAs = Some("parquet"))))
@@ -406,7 +406,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
None,
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
Some(SerdeInfo(storedAs = Some(format), serde = Some("customSerde"), serdeProperties = Map(
@@ -463,7 +463,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
None,
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
Some(SerdeInfo(storedAs = Some("textfile"), serdeProperties = Map(
@@ -514,7 +514,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
None,
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
Some(SerdeInfo(formatClasses = Some(FormatClasses("inFormat", "outFormat")))))
@@ -537,7 +537,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
None,
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
Some(SerdeInfo(
@@ -880,7 +880,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map.empty,
Some("json"),
- OptionsListExpressions(
+ OptionList(
Seq(
("a", Literal(1)),
("b", Literal(Decimal(0.1))),
@@ -939,7 +939,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map("p1" -> "v1", "p2" -> "v2"),
Some("parquet"),
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
Some("/user/external/page_view"),
Some("This is the staging page view table"),
None)
@@ -2476,7 +2476,7 @@ class DDLParserSuite extends AnalysisTest {
partitioning: Seq[Transform],
properties: Map[String, String],
provider: Option[String],
- options: OptionsListExpressions,
+ options: OptionList,
location: Option[String],
comment: Option[String],
serdeInfo: Option[SerdeInfo],
@@ -2486,51 +2486,55 @@ class DDLParserSuite extends AnalysisTest {
def apply(plan: LogicalPlan): TableSpec = {
plan match {
case create: CreateTable =>
+ val tableSpec = create.tableSpec.asInstanceOf[UnresolvedTableSpec]
TableSpec(
create.name.asInstanceOf[UnresolvedIdentifier].nameParts,
Some(create.tableSchema),
create.partitioning,
- create.tableSpec.properties,
- create.tableSpec.provider,
- create.optionsListExpressions,
- create.tableSpec.location,
- create.tableSpec.comment,
- create.tableSpec.serde,
- create.tableSpec.external)
+ tableSpec.properties,
+ tableSpec.provider,
+ tableSpec.optionExpression,
+ tableSpec.location,
+ tableSpec.comment,
+ tableSpec.serde,
+ tableSpec.external)
case replace: ReplaceTable =>
+ val tableSpec = replace.tableSpec.asInstanceOf[UnresolvedTableSpec]
TableSpec(
replace.name.asInstanceOf[UnresolvedIdentifier].nameParts,
Some(replace.tableSchema),
replace.partitioning,
- replace.tableSpec.properties,
- replace.tableSpec.provider,
- replace.optionsListExpressions,
- replace.tableSpec.location,
- replace.tableSpec.comment,
- replace.tableSpec.serde)
+ tableSpec.properties,
+ tableSpec.provider,
+ tableSpec.optionExpression,
+ tableSpec.location,
+ tableSpec.comment,
+ tableSpec.serde)
case ctas: CreateTableAsSelect =>
+ val tableSpec = ctas.tableSpec.asInstanceOf[UnresolvedTableSpec]
TableSpec(
ctas.name.asInstanceOf[UnresolvedIdentifier].nameParts,
Some(ctas.query).filter(_.resolved).map(_.schema),
ctas.partitioning,
- ctas.tableSpec.properties,
- ctas.tableSpec.provider,
- ctas.optionsListExpressions,
- ctas.tableSpec.location,
- ctas.tableSpec.comment,
- ctas.tableSpec.serde,
- ctas.tableSpec.external)
+ tableSpec.properties,
+ tableSpec.provider,
+ tableSpec.optionExpression,
+ tableSpec.location,
+ tableSpec.comment,
+ tableSpec.serde,
+ tableSpec.external)
case rtas: ReplaceTableAsSelect =>
+ val tableSpec = rtas.tableSpec.asInstanceOf[UnresolvedTableSpec]
TableSpec(
rtas.name.asInstanceOf[UnresolvedIdentifier].nameParts,
Some(rtas.query).filter(_.resolved).map(_.schema),
rtas.partitioning,
- rtas.tableSpec.properties,
- rtas.tableSpec.provider,
- rtas.optionsListExpressions,
- rtas.tableSpec.location,
- rtas.tableSpec.comment,
- rtas.tableSpec.serde)
+ tableSpec.properties,
+ tableSpec.provider,
+ tableSpec.optionExpression,
+ tableSpec.location,
+ tableSpec.comment,
+ tableSpec.serde)
case other =>
fail(s"Expected to parse Create, CTAS, Replace, or RTAS plan" +
s" from query, got ${other.getClass.getName}.")
@@ -2564,7 +2568,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map.empty[String, String],
None,
- OptionsListExpressions(Seq.empty),
+ OptionList(Seq.empty),
None,
None,
None)
@@ -2615,7 +2619,7 @@ class DDLParserSuite extends AnalysisTest {
val createTableResult =
CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn,
Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
- None, None, None, false), false)
+ OptionList(Seq.empty), None, None, None, false), false)
// Parse the CREATE TABLE statement twice, swapping the order of the NOT NULL and DEFAULT
// options, to make sure that the parser accepts any ordering of these options.
comparePlans(parsePlan(
@@ -2628,7 +2632,7 @@ class DDLParserSuite extends AnalysisTest {
"b STRING NOT NULL DEFAULT \"abc\") USING parquet"),
ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn,
Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
- None, None, None, false), false))
+ OptionList(Seq.empty), None, None, None, false), false))
// These ALTER TABLE statements should parse successfully.
comparePlans(
parsePlan("ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42"),
@@ -2784,12 +2788,12 @@ class DDLParserSuite extends AnalysisTest {
"CREATE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"),
CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn,
Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
- None, None, None, false), false))
+ OptionList(Seq.empty), None, None, None, false), false))
comparePlans(parsePlan(
"REPLACE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"),
ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn,
Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
- None, None, None, false), false))
+ OptionList(Seq.empty), None, None, None, false), false))
// Two generation expressions
checkError(
exception = parseException("CREATE TABLE my_tab(a INT, " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index a3cb12307fb..34b7c21db1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedIdentifier, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OptionList, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table}
import org.apache.spark.sql.connector.catalog.TableCapability._
@@ -329,6 +329,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val tableSpec = UnresolvedTableSpec(
properties = Map.empty,
provider = Some(source),
+ optionExpression = OptionList(Seq.empty),
location = extraOptions.get("path"),
comment = extraOptions.get(TableCatalog.PROP_COMMENT),
serde = None,
@@ -593,6 +594,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val tableSpec = UnresolvedTableSpec(
properties = Map.empty,
provider = Some(source),
+ optionExpression = OptionList(Seq.empty),
location = extraOptions.get("path"),
comment = extraOptions.get(TableCatalog.PROP_COMMENT),
serde = None,
@@ -612,6 +614,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val tableSpec = UnresolvedTableSpec(
properties = Map.empty,
provider = Some(source),
+ optionExpression = OptionList(Seq.empty),
location = extraOptions.get("path"),
comment = extraOptions.get(TableCatalog.PROP_COMMENT),
serde = None,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index 101dd7ec299..6202fede568 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedIdentifier, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OptionList, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec}
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.IntegerType
@@ -110,6 +110,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
val tableSpec = UnresolvedTableSpec(
properties = properties.toMap,
provider = provider,
+ optionExpression = OptionList(Seq.empty),
location = None,
comment = None,
serde = None,
@@ -198,6 +199,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
val tableSpec = UnresolvedTableSpec(
properties = properties.toMap,
provider = provider,
+ optionExpression = OptionList(Seq.empty),
location = None,
comment = None,
serde = None,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 515b0bb90bd..fb1e9bcc591 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -158,7 +158,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
// session catalog and the table provider is not v2.
- case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec: ResolvedTableSpec, _, _) =>
+ case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec: TableSpec, _) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
c.tableSpec.provider, tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
ctas = false)
@@ -170,7 +170,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
case c @ CreateTableAsSelect(
- ResolvedV1Identifier(ident), _, _, tableSpec: ResolvedTableSpec, writeOptions, _, _, _) =>
+ ResolvedV1Identifier(ident), _, _, tableSpec: TableSpec, writeOptions, _, _) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
c.tableSpec.provider,
tableSpec.options ++ writeOptions,
@@ -193,7 +193,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
// session catalog and the table provider is not v2.
- case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _, _) =>
+ case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) =>
val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw QueryCompilationErrors.unsupportedTableOperationError(
@@ -202,7 +202,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
c
}
- case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _, _) =>
+ case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) =>
val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw QueryCompilationErrors.unsupportedTableOperationError(
@@ -444,7 +444,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
private def constructV1TableCmd(
query: Option[LogicalPlan],
- tableSpec: TableSpec,
+ tableSpec: TableSpecBase,
ident: TableIdentifier,
tableSchema: StructType,
partitioning: Seq[Transform],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 359a6f43f0b..f7b18e6a7a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -172,7 +172,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil
case CreateTable(ResolvedIdentifier(catalog, ident), schema, partitioning,
- tableSpec, ifNotExists, unresolvedOptionsList) =>
+ tableSpec: TableSpec, ifNotExists) =>
ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
val newSchema: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
@@ -183,8 +183,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
CreateTableExec(catalog.asTableCatalog, ident, structTypeToV2Columns(newSchema),
partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
- case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, tableSpec,
- options, ifNotExists, true, unresolvedOptionsList) =>
+ case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, tableSpec: TableSpec,
+ options, ifNotExists, true) =>
catalog match {
case staging: StagingTableCatalog =>
AtomicCreateTableAsSelectExec(staging, ident, parts, query,
@@ -198,8 +198,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil
case ReplaceTable(
- ResolvedIdentifier(catalog, ident), schema, parts, tableSpec, orCreate,
- unresolvedOptionsList) =>
+ ResolvedIdentifier(catalog, ident), schema, parts, tableSpec: TableSpec, orCreate) =>
ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
val newSchema: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
@@ -218,7 +217,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}
case ReplaceTableAsSelect(ResolvedIdentifier(catalog, ident),
- parts, query, tableSpec, options, orCreate, true, unresolvedOptionsList) =>
+ parts, query, tableSpec: TableSpec, options, orCreate, true) =>
catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableAsSelectExec(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 2aac82a990e..76c89bfa4a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionsListExpressions, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionList, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, NamespaceHelper, TransformHelper}
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -662,12 +662,13 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
None
}
- val newOptions = OptionsListExpressions(options.map { case (key, value) =>
+ val newOptions = OptionList(options.map { case (key, value) =>
(key, Literal(value).asInstanceOf[Expression])
}.toSeq)
val tableSpec = UnresolvedTableSpec(
properties = Map(),
provider = Some(source),
+ optionExpression = newOptions,
location = location,
comment = { if (description.isEmpty) None else Some(description) },
serde = None,
@@ -678,8 +679,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
tableSchema = schema,
partitioning = Seq(),
tableSpec = tableSpec,
- ignoreIfExists = false,
- optionsListExpressions = newOptions)
+ ignoreIfExists = false)
sparkSession.sessionState.executePlan(plan).toRdd
sparkSession.table(tableName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index f913faa030d..12977987f08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, OptionList, UnresolvedTableSpec}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog, TableProvider, V1Table, V2TableWithV1Fallback}
@@ -293,6 +293,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
val tableSpec = UnresolvedTableSpec(
Map.empty[String, String],
Some(source),
+ OptionList(Seq.empty),
extraOptions.get("path"),
None,
None,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index 4fe7809162f..51d15a666db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.connector
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedFieldName, UnresolvedFieldPosition, UnresolvedIdentifier}
-import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, OptionList, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, UnresolvedTableSpec}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -51,7 +51,8 @@ class V2CommandsCaseSensitivitySuite
Seq(true, false).foreach { caseSensitive =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
Seq("ID", "iD").foreach { ref =>
- val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
+ val tableSpec =
+ UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false)
val plan = CreateTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
Expressions.identity(ref) :: Nil,
@@ -74,7 +75,8 @@ class V2CommandsCaseSensitivitySuite
Seq(true, false).foreach { caseSensitive =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref =>
- val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
+ val tableSpec =
+ UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false)
val plan = CreateTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, ref) :: Nil,
@@ -98,7 +100,8 @@ class V2CommandsCaseSensitivitySuite
Seq(true, false).foreach { caseSensitive =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
Seq("ID", "iD").foreach { ref =>
- val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
+ val tableSpec =
+ UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false)
val plan = ReplaceTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
Expressions.identity(ref) :: Nil,
@@ -121,7 +124,8 @@ class V2CommandsCaseSensitivitySuite
Seq(true, false).foreach { caseSensitive =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref =>
- val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
+ val tableSpec =
+ UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false)
val plan = ReplaceTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, ref) :: Nil,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org