You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2023/06/12 04:31:29 UTC
[spark] branch master updated: [SPARK-43529][SQL] Support general constant expressions as CREATE/REPLACE TABLE OPTIONS values
This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7250941ab5b [SPARK-43529][SQL] Support general constant expressions as CREATE/REPLACE TABLE OPTIONS values
7250941ab5b is described below
commit 7250941ab5b8ea1c1dc720f2b6407404ac7020bb
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Sun Jun 11 21:31:03 2023 -0700
[SPARK-43529][SQL] Support general constant expressions as CREATE/REPLACE TABLE OPTIONS values
### What changes were proposed in this pull request?
This PR updates the SQL compiler to support general constant expressions in the syntax for CREATE/REPLACE TABLE OPTIONS values, rather than restricting them to only a few types of literals.
* The analyzer now checks that the provided expressions are in fact `foldable`, and throws an error otherwise.
* The error message that users encounter in these cases improves from a generic "syntax error at or near <location>" to one indicating that the syntax is valid, but that only constant expressions are supported in these contexts.
### Why are the changes needed?
This makes it easier to provide OPTIONS lists in SQL, supporting use cases like concatenating strings with `||`.
### Does this PR introduce _any_ user-facing change?
Yes, the SQL syntax changes.
### How was this patch tested?
This PR adds new unit test coverage.
Closes #41191 from dtenedor/expression-properties.
Authored-by: Daniel Tenedorio <da...@databricks.com>
Signed-off-by: Gengliang Wang <ge...@apache.org>
---
core/src/main/resources/error/error-classes.json | 5 ++
.../spark/sql/catalyst/parser/SqlBaseParser.g4 | 10 ++-
.../spark/sql/catalyst/analysis/Analyzer.scala | 1 +
.../sql/catalyst/analysis/ResolveTableSpec.scala | 90 ++++++++++++++++++++
.../spark/sql/catalyst/parser/AstBuilder.scala | 69 ++++++++++-----
.../sql/catalyst/plans/logical/v2Commands.scala | 70 +++++++++++++--
.../sql/catalyst/rules/RuleIdCollection.scala | 1 +
.../apache/spark/sql/catalyst/trees/TreeNode.scala | 6 +-
.../sql/connector/catalog/CatalogV2Util.scala | 5 +-
.../spark/sql/errors/QueryCompilationErrors.scala | 17 ++++
.../CreateTablePartitioningValidationSuite.scala | 18 +---
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 76 +++++++++--------
.../org/apache/spark/sql/DataFrameWriter.scala | 11 +--
.../org/apache/spark/sql/DataFrameWriterV2.scala | 8 +-
.../catalyst/analysis/ResolveSessionCatalog.scala | 13 +--
.../spark/sql/execution/SparkSqlParser.scala | 19 ++++-
.../datasources/v2/DataSourceV2Strategy.scala | 13 +--
.../apache/spark/sql/internal/CatalogImpl.scala | 12 ++-
.../spark/sql/streaming/DataStreamWriter.scala | 5 +-
.../sql/TableOptionsConstantFoldingSuite.scala | 99 ++++++++++++++++++++++
.../connector/V2CommandsCaseSensitivitySuite.scala | 14 ++-
21 files changed, 433 insertions(+), 129 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 7b39ab7266c..a12a8000870 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1389,6 +1389,11 @@
"<statement> with multiple part function name(<funcName>) is not allowed."
]
},
+ "OPTION_IS_INVALID" : {
+ "message" : [
+ "option or property key <key> is invalid; only <supported> are supported"
+ ]
+ },
"REPETITIVE_WINDOW_DEFINITION" : {
"message" : [
"The definition of window <windowName> is repetitive."
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 89100f2aeec..c7b238bfd2c 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -374,7 +374,7 @@ tableProvider
;
createTableClauses
- :((OPTIONS options=propertyList) |
+ :((OPTIONS options=expressionPropertyList) |
(PARTITIONED BY partitioning=partitionFieldList) |
skewSpec |
bucketSpec |
@@ -405,6 +405,14 @@ propertyValue
| stringLit
;
+expressionPropertyList
+ : LEFT_PAREN expressionProperty (COMMA expressionProperty)* RIGHT_PAREN
+ ;
+
+expressionProperty
+ : key=propertyKey (EQ? value=expression)?
+ ;
+
constantList
: LEFT_PAREN constant (COMMA constant)* RIGHT_PAREN
;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index aa1b9d0e8fd..901ae243225 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -298,6 +298,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
ExtractGenerator ::
ResolveGenerate ::
ResolveFunctions ::
+ ResolveTableSpec ::
ResolveAliases ::
ResolveSubquery ::
ResolveSubqueryColumnAliases ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
new file mode 100644
index 00000000000..d4b0a8d25e0
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkThrowable
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.{ArrayType, MapType, StructType}
+
+/**
+ * This object is responsible for processing unresolved table specifications in commands with
+ * OPTIONS lists. The parser produces such lists as maps from strings to unresolved expressions.
+ * After otherwise resolving such expressions in the analyzer, here we convert them to resolved
+ * table specifications wherein these OPTIONS list values are represented as strings instead, for
+ * convenience.
+ */
+object ResolveTableSpec extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.resolveOperatorsWithPruning(_.containsAnyPattern(COMMAND), ruleId) {
+ case t: CreateTable =>
+ resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+ case t: CreateTableAsSelect =>
+ resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+ case t: ReplaceTable =>
+ resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+ case t: ReplaceTableAsSelect =>
+ resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s))
+ }
+ }
+
+ /** Helper method to resolve the table specification within a logical plan. */
+ private def resolveTableSpec(
+ input: LogicalPlan, tableSpec: TableSpec, optionsListExpressions: OptionsListExpressions,
+ withNewSpec: TableSpec => LogicalPlan): LogicalPlan = tableSpec match {
+ case u: UnresolvedTableSpec if optionsListExpressions.allOptionsResolved =>
+ val newOptions: Seq[(String, String)] = optionsListExpressions.options.map {
+ case (key: String, null) =>
+ (key, null)
+ case (key: String, value: Expression) =>
+ val newValue: String = try {
+ val dt = value.dataType
+ value match {
+ case Literal(null, _) =>
+ null
+ case _
+ if dt.isInstanceOf[ArrayType] ||
+ dt.isInstanceOf[StructType] ||
+ dt.isInstanceOf[MapType] =>
+ throw QueryCompilationErrors.optionMustBeConstant(key)
+ case _ =>
+ val result = value.eval()
+ Literal(result, dt).toString
+ }
+ } catch {
+ case _: SparkThrowable | _: java.lang.RuntimeException =>
+ throw QueryCompilationErrors.optionMustBeConstant(key)
+ }
+ (key, newValue)
+ }
+ val newTableSpec = ResolvedTableSpec(
+ properties = u.properties,
+ provider = u.provider,
+ options = newOptions.toMap,
+ location = u.location,
+ comment = u.comment,
+ serde = u.serde,
+ external = u.external)
+ withNewSpec(newTableSpec)
+ case _ =>
+ input
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index f4170860c24..2156ae4d51f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, con
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces, TableCatalog}
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
-import org.apache.spark.sql.errors.QueryParsingErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryParsingErrors}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -3343,6 +3343,20 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
}
}
+ /**
+ * Parse a key-value map from an [[ExpressionPropertyListContext]], assuming all values are
+ * specified.
+ */
+ override def visitExpressionPropertyList(
+ ctx: ExpressionPropertyListContext): OptionsListExpressions = {
+ val options = ctx.expressionProperty.asScala.map { property =>
+ val key: String = visitPropertyKey(property.key)
+ val value: Expression = Option(property.value).map(expression).getOrElse(null)
+ key -> value
+ }.toSeq
+ OptionsListExpressions(options)
+ }
+
override def visitStringLit(ctx: StringLitContext): Token = {
if (ctx != null) {
if (ctx.STRING_LITERAL != null) {
@@ -3377,7 +3391,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
*/
type TableClauses = (
Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
- Map[String, String], Option[String], Option[String], Option[SerdeInfo])
+ OptionsListExpressions, Option[String], Option[String], Option[SerdeInfo])
/**
* Validate a create table statement and return the [[TableIdentifier]].
@@ -3637,8 +3651,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
ctx.EXTENDED != null)
}
- def cleanTableProperties(
- ctx: ParserRuleContext, properties: Map[String, String]): Map[String, String] = {
+ def cleanTableProperties[ValueType](
+ ctx: ParserRuleContext, properties: Map[String, ValueType]): Map[String, ValueType] = {
import TableCatalog._
val legacyOn = conf.getConf(SQLConf.LEGACY_PROPERTY_NON_RESERVED)
properties.filter {
@@ -3672,18 +3686,26 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
def cleanTableOptions(
ctx: ParserRuleContext,
- options: Map[String, String],
- location: Option[String]): (Map[String, String], Option[String]) = {
+ options: OptionsListExpressions,
+ location: Option[String]): (OptionsListExpressions, Option[String]) = {
var path = location
- val filtered = cleanTableProperties(ctx, options).filter {
- case (k, v) if k.equalsIgnoreCase("path") && path.nonEmpty =>
- throw QueryParsingErrors.duplicatedTablePathsFoundError(path.get, v, ctx)
- case (k, v) if k.equalsIgnoreCase("path") =>
- path = Some(v)
+ val filtered = cleanTableProperties(ctx, options.options.toMap).filter {
+ case (key, value) if key.equalsIgnoreCase("path") =>
+ val newValue: String =
+ if (value == null) {
+ ""
+ } else value match {
+ case Literal(_, _: StringType) => value.toString
+ case _ => throw QueryCompilationErrors.optionMustBeLiteralString(key)
+ }
+ if (path.nonEmpty) {
+ throw QueryParsingErrors.duplicatedTablePathsFoundError(path.get, newValue, ctx)
+ }
+ path = Some(newValue)
false
case _ => true
}
- (filtered, path)
+ (OptionsListExpressions(filtered.toSeq), path)
}
/**
@@ -3841,7 +3863,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
val cleanedProperties = cleanTableProperties(ctx, properties)
- val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+ val options = Option(ctx.options).map(visitExpressionPropertyList)
+ .getOrElse(OptionsListExpressions(Seq.empty))
val location = visitLocationSpecList(ctx.locationSpec())
val (cleanedOptions, newLocation) = cleanTableOptions(ctx, options, location)
val comment = visitCommentSpecList(ctx.commentSpec())
@@ -3921,8 +3944,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
val columns = Option(ctx.createOrReplaceTableColTypeList())
.map(visitCreateOrReplaceTableColTypeList).getOrElse(Nil)
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
- val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
- visitCreateTableClauses(ctx.createTableClauses())
+ val (partTransforms, partCols, bucketSpec, properties, options, location,
+ comment, serdeInfo) = visitCreateTableClauses(ctx.createTableClauses())
if (provider.isDefined && serdeInfo.isDefined) {
operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
@@ -3936,7 +3959,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
val partitioning =
partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform)
- val tableSpec = TableSpec(properties, provider, options, location, comment,
+ val tableSpec = UnresolvedTableSpec(properties, provider, location, comment,
serdeInfo, external)
Option(ctx.query).map(plan) match {
@@ -3953,14 +3976,15 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
case Some(query) =>
CreateTableAsSelect(withIdentClause(identifierContext, UnresolvedIdentifier(_)),
- partitioning, query, tableSpec, Map.empty, ifNotExists)
+ partitioning, query, tableSpec, Map.empty, ifNotExists, optionsListExpressions = options)
case _ =>
// Note: table schema includes both the table columns list and the partition columns
// with data type.
val schema = StructType(columns ++ partCols)
CreateTable(withIdentClause(identifierContext, UnresolvedIdentifier(_)),
- schema, partitioning, tableSpec, ignoreIfExists = ifNotExists)
+ schema, partitioning, tableSpec, ignoreIfExists = ifNotExists,
+ optionsListExpressions = options)
}
}
@@ -4005,8 +4029,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
val partitioning =
partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform)
- val tableSpec = TableSpec(properties, provider, options, location, comment,
- serdeInfo, false)
+ val tableSpec = UnresolvedTableSpec(properties, provider, location, comment,
+ serdeInfo, external = false)
Option(ctx.query).map(plan) match {
case Some(_) if columns.nonEmpty =>
@@ -4023,7 +4047,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
case Some(query) =>
ReplaceTableAsSelect(
withIdentClause(ctx.replaceTableHeader.identifierReference(), UnresolvedIdentifier(_)),
- partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate)
+ partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate,
+ optionsListExpressions = options)
case _ =>
// Note: table schema includes both the table columns list and the partition columns
@@ -4031,7 +4056,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
val schema = StructType(columns ++ partCols)
ReplaceTable(
withIdentClause(ctx.replaceTableHeader.identifierReference(), UnresolvedIdentifier(_)),
- schema, partitioning, tableSpec, orCreate = orCreate)
+ schema, partitioning, tableSpec, orCreate = orCreate, optionsListExpressions = options)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 91b69a3c089..a781bf56b9b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.write.{DeltaWrite, RowLevelOperation, RowLevelOperationTable, SupportsDelta, Write}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MetadataBuilder, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType}
// For v2 DML commands, it may end up with the v1 fallback code path and need to build a DataFrame
// which is required by the DS v1 API. We need to keep the analyzed input query plan to build
@@ -445,7 +445,9 @@ case class CreateTable(
tableSchema: StructType,
partitioning: Seq[Transform],
tableSpec: TableSpec,
- ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan {
+ ignoreIfExists: Boolean,
+ optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
+ extends UnaryCommand with V2CreateTablePlan {
override def child: LogicalPlan = name
@@ -467,7 +469,8 @@ case class CreateTableAsSelect(
tableSpec: TableSpec,
writeOptions: Map[String, String],
ignoreIfExists: Boolean,
- isAnalyzed: Boolean = false)
+ isAnalyzed: Boolean = false,
+ optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
extends V2CreateTableAsSelectPlan {
override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
@@ -496,7 +499,9 @@ case class ReplaceTable(
tableSchema: StructType,
partitioning: Seq[Transform],
tableSpec: TableSpec,
- orCreate: Boolean) extends UnaryCommand with V2CreateTablePlan {
+ orCreate: Boolean,
+ optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
+ extends UnaryCommand with V2CreateTablePlan {
override def child: LogicalPlan = name
@@ -521,7 +526,8 @@ case class ReplaceTableAsSelect(
tableSpec: TableSpec,
writeOptions: Map[String, String],
orCreate: Boolean,
- isAnalyzed: Boolean = false)
+ isAnalyzed: Boolean = false,
+ optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty))
extends V2CreateTableAsSelectPlan {
override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
@@ -1382,11 +1388,61 @@ case class DropIndex(
copy(table = newChild)
}
-case class TableSpec(
+trait TableSpec {
+ def properties: Map[String, String]
+ def provider: Option[String]
+ def location: Option[String]
+ def comment: Option[String]
+ def serde: Option[SerdeInfo]
+ def external: Boolean
+ def withNewLocation(newLocation: Option[String]): TableSpec
+}
+
+case class UnresolvedTableSpec(
+ properties: Map[String, String],
+ provider: Option[String],
+ location: Option[String],
+ comment: Option[String],
+ serde: Option[SerdeInfo],
+ external: Boolean) extends TableSpec {
+ override def withNewLocation(loc: Option[String]): TableSpec = {
+ UnresolvedTableSpec(properties, provider, loc, comment, serde, external)
+ }
+}
+
+/**
+ * This contains the expressions in an OPTIONS list. We store it alongside anywhere the above
+ * UnresolvedTableSpec lives. We use a separate object so that tree traversals in analyzer rules can
+ * descend into the child expressions naturally without extra treatment.
+ */
+case class OptionsListExpressions(options: Seq[(String, Expression)])
+ extends Expression with Unevaluable {
+ override def nullable: Boolean = true
+ override def dataType: DataType = MapType(StringType, StringType)
+ override def children: Seq[Expression] = options.map(_._2)
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): Expression = {
+ assert(options.length == newChildren.length)
+ val newOptions = options.zip(newChildren).map {
+ case ((key: String, _), newChild: Expression) =>
+ (key, newChild)
+ }
+ OptionsListExpressions(newOptions)
+ }
+
+ lazy val allOptionsResolved: Boolean = options.map(_._2).forall(_.resolved)
+}
+
+case class ResolvedTableSpec(
properties: Map[String, String],
provider: Option[String],
options: Map[String, String],
location: Option[String],
comment: Option[String],
serde: Option[SerdeInfo],
- external: Boolean)
+ external: Boolean) extends TableSpec {
+ override def withNewLocation(newLocation: Option[String]): TableSpec = {
+ ResolvedTableSpec(properties, provider, options, newLocation, comment, serde, external)
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index 7fa048c5dc3..caf679f3e7a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -92,6 +92,7 @@ object RuleIdCollection {
"org.apache.spark.sql.catalyst.analysis.ResolveLateralColumnAliasReference" ::
"org.apache.spark.sql.catalyst.analysis.ResolveOrderByAll" ::
"org.apache.spark.sql.catalyst.analysis.ResolveRowLevelCommandAssignments" ::
+ "org.apache.spark.sql.catalyst.analysis.ResolveTableSpec" ::
"org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" ::
"org.apache.spark.sql.catalyst.analysis.ResolveUnion" ::
"org.apache.spark.sql.catalyst.analysis.ResolveWindowTime" ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 75802de1a66..17c89d7e6ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical.TableSpec
+import org.apache.spark.sql.catalyst.plans.logical.{ResolvedTableSpec, UnresolvedTableSpec}
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.rules.RuleId
import org.apache.spark.sql.catalyst.rules.RuleIdCollection
@@ -927,9 +927,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Tre
redactMapString(map.asCaseSensitiveMap().asScala, maxFields)
case map: Map[_, _] =>
redactMapString(map, maxFields)
- case t: TableSpec =>
+ case t: ResolvedTableSpec =>
t.copy(properties = Utils.redact(t.properties).toMap,
options = Utils.redact(t.options).toMap) :: Nil
+ case t: UnresolvedTableSpec =>
+ t.copy(properties = Utils.redact(t.properties).toMap) :: Nil
case table: CatalogTable =>
stringArgsForCatalogTable(table)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index e5d9720bb02..e92c1ee75a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{ResolvedTableSpec, SerdeInfo, TableSpec}
import org.apache.spark.sql.catalyst.util.GeneratedColumn
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.connector.catalog.TableChange._
@@ -376,7 +376,8 @@ private[sql] object CatalogV2Util {
def convertTableProperties(t: TableSpec): Map[String, String] = {
val props = convertTableProperties(
- t.properties, t.options, t.serde, t.location, t.comment, t.provider, t.external)
+ t.properties, t.asInstanceOf[ResolvedTableSpec].options, t.serde, t.location, t.comment,
+ t.provider, t.external)
withDefaultOwnership(props)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 94b8ee25dd2..4c87b9da1c7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3567,4 +3567,21 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
)
)
}
+
+ def optionMustBeLiteralString(key: String): Throwable = {
+ new AnalysisException(
+ errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+ messageParameters = Map(
+ "key" -> key,
+ "supported" -> "literal strings"))
+ }
+
+ def optionMustBeConstant(key: String, cause: Option[Throwable] = None): Throwable = {
+ new AnalysisException(
+ errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+ messageParameters = Map(
+ "key" -> key,
+ "supported" -> "constant expressions"),
+ cause = cause)
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
index ba312ddbc49..d1651e536dd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
@@ -20,17 +20,15 @@ package org.apache.spark.sql.catalyst.analysis
import java.util
import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, UnresolvedTableSpec}
import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, Table, TableCapability, TableCatalog}
import org.apache.spark.sql.connector.expressions.Expressions
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
class CreateTablePartitioningValidationSuite extends AnalysisTest {
-
+ val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
test("CreateTableAsSelect: fail missing top-level column") {
- val tableSpec = TableSpec(Map.empty, None, Map.empty,
- None, None, None, false)
val plan = CreateTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, "does_not_exist") :: Nil,
@@ -46,8 +44,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
}
test("CreateTableAsSelect: fail missing top-level column nested reference") {
- val tableSpec = TableSpec(Map.empty, None, Map.empty,
- None, None, None, false)
val plan = CreateTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, "does_not_exist.z") :: Nil,
@@ -63,8 +59,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
}
test("CreateTableAsSelect: fail missing nested column") {
- val tableSpec = TableSpec(Map.empty, None, Map.empty,
- None, None, None, false)
val plan = CreateTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, "point.z") :: Nil,
@@ -80,8 +74,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
}
test("CreateTableAsSelect: fail with multiple errors") {
- val tableSpec = TableSpec(Map.empty, None, Map.empty,
- None, None, None, false)
val plan = CreateTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, "does_not_exist", "point.z") :: Nil,
@@ -97,8 +89,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
}
test("CreateTableAsSelect: success with top-level column") {
- val tableSpec = TableSpec(Map.empty, None, Map.empty,
- None, None, None, false)
val plan = CreateTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, "id") :: Nil,
@@ -111,8 +101,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
}
test("CreateTableAsSelect: success using nested column") {
- val tableSpec = TableSpec(Map.empty, None, Map.empty,
- None, None, None, false)
val plan = CreateTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, "point.x") :: Nil,
@@ -125,8 +113,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
}
test("CreateTableAsSelect: success using complex column") {
- val tableSpec = TableSpec(Map.empty, None, Map.empty,
- None, None, None, false)
val plan = CreateTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, "point") :: Nil,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 53635acf0b3..8cfdf411ae9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -22,13 +22,13 @@ import java.util.Locale
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{TableSpec => LogicalTableSpec, _}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{GeneratedColumn, ResolveDefaultColumns}
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder, StringType, StructType, TimestampType}
+import org.apache.spark.sql.types.{Decimal, IntegerType, LongType, MetadataBuilder, StringType, StructType, TimestampType}
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
class DDLParserSuite extends AnalysisTest {
@@ -57,7 +57,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map.empty[String, String],
Some("parquet"),
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
None)
@@ -83,7 +83,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map.empty[String, String],
Some("parquet"),
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
None),
@@ -103,7 +103,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("a"))),
Map.empty[String, String],
Some("parquet"),
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
None)
@@ -157,7 +157,7 @@ class DDLParserSuite extends AnalysisTest {
LiteralValue(34, IntegerType)))),
Map.empty[String, String],
Some("parquet"),
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
None)
@@ -179,7 +179,7 @@ class DDLParserSuite extends AnalysisTest {
List(bucket(5, Array(FieldReference.column("a")), Array(FieldReference.column("b")))),
Map.empty[String, String],
Some("parquet"),
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
None)
@@ -198,7 +198,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map.empty[String, String],
Some("parquet"),
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
Some("abc"),
None)
@@ -218,7 +218,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map("test" -> "test"),
Some("parquet"),
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
None)
@@ -236,7 +236,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map.empty[String, String],
Some("parquet"),
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
Some("/tmp/file"),
None,
None)
@@ -254,7 +254,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map.empty[String, String],
Some("parquet"),
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
None)
@@ -272,7 +272,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
None,
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
None)
@@ -290,7 +290,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
None,
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
None)
@@ -308,7 +308,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
Some("parquet"),
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
None)
@@ -381,7 +381,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
None,
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
Some(SerdeInfo(storedAs = Some("parquet"))))
@@ -406,7 +406,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
None,
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
Some(SerdeInfo(storedAs = Some(format), serde = Some("customSerde"), serdeProperties = Map(
@@ -463,7 +463,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
None,
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
Some(SerdeInfo(storedAs = Some("textfile"), serdeProperties = Map(
@@ -514,7 +514,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
None,
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
Some(SerdeInfo(formatClasses = Some(FormatClasses("inFormat", "outFormat")))))
@@ -537,7 +537,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(IdentityTransform(FieldReference("part"))),
Map.empty[String, String],
None,
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
Some(SerdeInfo(
@@ -878,9 +878,13 @@ class DDLParserSuite extends AnalysisTest {
Seq("table_name"),
Some(new StructType),
Seq.empty[Transform],
- Map.empty[String, String],
+ Map.empty,
Some("json"),
- Map("a" -> "1", "b" -> "0.1", "c" -> "true"),
+ OptionsListExpressions(
+ Seq(
+ ("a", Literal(1)),
+ ("b", Literal(Decimal(0.1))),
+ ("c" -> Literal(true)))),
None,
None,
None),
@@ -935,7 +939,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map("p1" -> "v1", "p2" -> "v2"),
Some("parquet"),
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
Some("/user/external/page_view"),
Some("This is the staging page view table"),
None)
@@ -2472,7 +2476,7 @@ class DDLParserSuite extends AnalysisTest {
partitioning: Seq[Transform],
properties: Map[String, String],
provider: Option[String],
- options: Map[String, String],
+ options: OptionsListExpressions,
location: Option[String],
comment: Option[String],
serdeInfo: Option[SerdeInfo],
@@ -2488,7 +2492,7 @@ class DDLParserSuite extends AnalysisTest {
create.partitioning,
create.tableSpec.properties,
create.tableSpec.provider,
- create.tableSpec.options,
+ create.optionsListExpressions,
create.tableSpec.location,
create.tableSpec.comment,
create.tableSpec.serde,
@@ -2500,7 +2504,7 @@ class DDLParserSuite extends AnalysisTest {
replace.partitioning,
replace.tableSpec.properties,
replace.tableSpec.provider,
- replace.tableSpec.options,
+ replace.optionsListExpressions,
replace.tableSpec.location,
replace.tableSpec.comment,
replace.tableSpec.serde)
@@ -2511,7 +2515,7 @@ class DDLParserSuite extends AnalysisTest {
ctas.partitioning,
ctas.tableSpec.properties,
ctas.tableSpec.provider,
- ctas.tableSpec.options,
+ ctas.optionsListExpressions,
ctas.tableSpec.location,
ctas.tableSpec.comment,
ctas.tableSpec.serde,
@@ -2523,7 +2527,7 @@ class DDLParserSuite extends AnalysisTest {
rtas.partitioning,
rtas.tableSpec.properties,
rtas.tableSpec.provider,
- rtas.tableSpec.options,
+ rtas.optionsListExpressions,
rtas.tableSpec.location,
rtas.tableSpec.comment,
rtas.tableSpec.serde)
@@ -2560,7 +2564,7 @@ class DDLParserSuite extends AnalysisTest {
Seq.empty[Transform],
Map.empty[String, String],
None,
- Map.empty[String, String],
+ OptionsListExpressions(Seq.empty),
None,
None,
None)
@@ -2610,8 +2614,8 @@ class DDLParserSuite extends AnalysisTest {
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "\"abc\"").build())
val createTableResult =
CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn,
- Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
- Map.empty[String, String], None, None, None, false), false)
+ Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
+ None, None, None, false), false)
// Parse the CREATE TABLE statement twice, swapping the order of the NOT NULL and DEFAULT
// options, to make sure that the parser accepts any ordering of these options.
comparePlans(parsePlan(
@@ -2623,8 +2627,8 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(parsePlan("REPLACE TABLE my_tab(a INT, " +
"b STRING NOT NULL DEFAULT \"abc\") USING parquet"),
ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn,
- Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
- Map.empty[String, String], None, None, None, false), false))
+ Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
+ None, None, None, false), false))
// These ALTER TABLE statements should parse successfully.
comparePlans(
parsePlan("ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42"),
@@ -2779,13 +2783,13 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(parsePlan(
"CREATE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"),
CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn,
- Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
- Map.empty[String, String], None, None, None, false), false))
+ Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
+ None, None, None, false), false))
comparePlans(parsePlan(
"REPLACE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"),
ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn,
- Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
- Map.empty[String, String], None, None, None, false), false))
+ Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
+ None, None, None, false), false))
// Two generation expressions
checkError(
exception = parseException("CREATE TABLE my_tab(a INT, " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index da93fdf58e9..a3cb12307fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedIdentifier, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table}
import org.apache.spark.sql.connector.catalog.TableCapability._
@@ -326,10 +326,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val catalog = CatalogV2Util.getTableProviderCatalog(
supportsExtract, catalogManager, dsOptions)
- val tableSpec = TableSpec(
+ val tableSpec = UnresolvedTableSpec(
properties = Map.empty,
provider = Some(source),
- options = Map.empty,
location = extraOptions.get("path"),
comment = extraOptions.get(TableCatalog.PROP_COMMENT),
serde = None,
@@ -591,10 +590,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
AppendData.byName(v2Relation, df.logicalPlan, extraOptions.toMap)
case (SaveMode.Overwrite, _) =>
- val tableSpec = TableSpec(
+ val tableSpec = UnresolvedTableSpec(
properties = Map.empty,
provider = Some(source),
- options = Map.empty,
location = extraOptions.get("path"),
comment = extraOptions.get(TableCatalog.PROP_COMMENT),
serde = None,
@@ -611,10 +609,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
// We have a potential race condition here in AppendMode, if the table suddenly gets
// created between our existence check and physical execution, but this can't be helped
// in any case.
- val tableSpec = TableSpec(
+ val tableSpec = UnresolvedTableSpec(
properties = Map.empty,
provider = Some(source),
- options = Map.empty,
location = extraOptions.get("path"),
comment = extraOptions.get(TableCatalog.PROP_COMMENT),
serde = None,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index 270bf0a948e..101dd7ec299 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedIdentifier, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec}
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.IntegerType
@@ -107,10 +107,9 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
}
override def create(): Unit = {
- val tableSpec = TableSpec(
+ val tableSpec = UnresolvedTableSpec(
properties = properties.toMap,
provider = provider,
- options = Map.empty,
location = None,
comment = None,
serde = None,
@@ -196,10 +195,9 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
}
private def internalReplace(orCreate: Boolean): Unit = {
- val tableSpec = TableSpec(
+ val tableSpec = UnresolvedTableSpec(
properties = properties.toMap,
provider = provider,
- options = Map.empty,
location = None,
comment = None,
serde = None,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index b2b35b40492..515b0bb90bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -158,9 +158,9 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
// session catalog and the table provider is not v2.
- case c @ CreateTable(ResolvedV1Identifier(ident), _, _, _, _) =>
+ case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec: ResolvedTableSpec, _, _) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
- c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
+ c.tableSpec.provider, tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
ctas = false)
if (!isV2Provider(provider)) {
constructV1TableCmd(None, c.tableSpec, ident, c.tableSchema, c.partitioning,
@@ -169,10 +169,11 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
c
}
- case c @ CreateTableAsSelect(ResolvedV1Identifier(ident), _, _, _, writeOptions, _, _) =>
+ case c @ CreateTableAsSelect(
+ ResolvedV1Identifier(ident), _, _, tableSpec: ResolvedTableSpec, writeOptions, _, _, _) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
c.tableSpec.provider,
- c.tableSpec.options ++ writeOptions,
+ tableSpec.options ++ writeOptions,
c.tableSpec.location,
c.tableSpec.serde,
ctas = true)
@@ -192,7 +193,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
// session catalog and the table provider is not v2.
- case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) =>
+ case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _, _) =>
val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw QueryCompilationErrors.unsupportedTableOperationError(
@@ -201,7 +202,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
c
}
- case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) =>
+ case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _, _) =>
val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw QueryCompilationErrors.unsupportedTableOperationError(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index e3ae1b83a16..dfe3c67e18b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -29,17 +29,18 @@ import org.antlr.v4.runtime.tree.TerminalNode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, UnresolvedFunctionName, UnresolvedIdentifier}
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
import org.apache.spark.sql.catalyst.util.DateTimeConstants
-import org.apache.spark.sql.errors.QueryParsingErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryParsingErrors}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.types.StringType
/**
* Concrete parser for Spark SQL statements.
@@ -332,7 +333,19 @@ class SparkSqlAstBuilder extends AstBuilder {
withIdentClause(identCtx, ident => {
val table = tableIdentifier(ident, "CREATE TEMPORARY VIEW", ctx)
- val optionsWithLocation = location.map(l => options + ("path" -> l)).getOrElse(options)
+ val optionsList: Map[String, String] =
+ options.options.map { case (key, value) =>
+ val newValue: String =
+ if (value == null) {
+ null
+ } else value match {
+ case Literal(_, _: StringType) => value.toString
+ case _ => throw QueryCompilationErrors.optionMustBeLiteralString(key)
+ }
+ (key, newValue)
+ }.toMap
+ val optionsWithLocation =
+ location.map(l => optionsList + ("path" -> l)).getOrElse(optionsList)
CreateTempViewUsing(table, schema, replace = false, global = false, provider,
optionsWithLocation)
})
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 471a9393b7d..07cce4b931b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -103,8 +103,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}
private def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
- tableSpec.copy(
- location = tableSpec.location.map(makeQualifiedDBObjectPath(_)))
+ tableSpec.withNewLocation(tableSpec.location.map(makeQualifiedDBObjectPath(_)))
}
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -173,7 +172,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil
case CreateTable(ResolvedIdentifier(catalog, ident), schema, partitioning,
- tableSpec, ifNotExists) =>
+ tableSpec, ifNotExists, unresolvedOptionsList) =>
ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
val newSchema: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
@@ -185,7 +184,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, tableSpec,
- options, ifNotExists, true) =>
+ options, ifNotExists, true, unresolvedOptionsList) =>
catalog match {
case staging: StagingTableCatalog =>
AtomicCreateTableAsSelectExec(staging, ident, parts, query,
@@ -198,7 +197,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case RefreshTable(r: ResolvedTable) =>
RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil
- case ReplaceTable(ResolvedIdentifier(catalog, ident), schema, parts, tableSpec, orCreate) =>
+ case ReplaceTable(
+ ResolvedIdentifier(catalog, ident), schema, parts, tableSpec, orCreate,
+ unresolvedOptionsList) =>
ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
val newSchema: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
@@ -217,7 +218,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}
case ReplaceTableAsSelect(ResolvedIdentifier(catalog, ident),
- parts, query, tableSpec, options, orCreate, true) =>
+ parts, query, tableSpec, options, orCreate, true, unresolvedOptionsList) =>
catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableAsSelectExec(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 55136442b1a..2aac82a990e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, TableSpec, View}
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionsListExpressions, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, NamespaceHelper, TransformHelper}
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -661,10 +662,12 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
None
}
- val tableSpec = TableSpec(
+ val newOptions = OptionsListExpressions(options.map { case (key, value) =>
+ (key, Literal(value).asInstanceOf[Expression])
+ }.toSeq)
+ val tableSpec = UnresolvedTableSpec(
properties = Map(),
provider = Some(source),
- options = options,
location = location,
comment = { if (description.isEmpty) None else Some(description) },
serde = None,
@@ -675,7 +678,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
tableSchema = schema,
partitioning = Seq(),
tableSpec = tableSpec,
- ignoreIfExists = false)
+ ignoreIfExists = false,
+ optionsListExpressions = newOptions)
sparkSession.sessionState.executePlan(plan).toRdd
sparkSession.table(tableName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 2e8d671ad70..f913faa030d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, UnresolvedTableSpec}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog, TableProvider, V1Table, V2TableWithV1Fallback}
@@ -290,10 +290,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
* Note, currently the new table creation by this API doesn't fully cover the V2 table.
* TODO (SPARK-33638): Full support of v2 table creation
*/
- val tableSpec = TableSpec(
+ val tableSpec = UnresolvedTableSpec(
Map.empty[String, String],
Some(source),
- Map.empty[String, String],
extraOptions.get("path"),
None,
None,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TableOptionsConstantFoldingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TableOptionsConstantFoldingSuite.scala
new file mode 100644
index 00000000000..2fab2d0f1af
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TableOptionsConstantFoldingSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.test.SharedSparkSession
+
+/** These tests exercise passing constant but non-literal OPTIONS lists, and folding them. */
+class TableOptionsConstantFoldingSuite extends QueryTest with SharedSparkSession {
+ val prefix = "create table t (col int) using json options "
+
+ /** Helper method to create a table with an OPTIONS list and then check the resulting value. */
+ def checkOption(createOption: String, expectedValue: String): Unit = {
+ withTable("t") {
+ sql(s"$prefix ('k' = $createOption)")
+ sql("insert into t values (42)")
+ checkAnswer(spark.table("t"), Seq(Row(42)))
+ val actual = spark.table("t")
+ .queryExecution.sparkPlan.asInstanceOf[FileSourceScanExec].relation.options
+ assert(actual.get("k").get == expectedValue)
+ }
+ }
+
+ test("SPARK-43529: Support constant expressions in CREATE/REPLACE TABLE OPTIONS") {
+ checkOption("1 + 2", "3")
+ checkOption("'a' || 'b'", "ab")
+ checkOption("true or false", "true")
+ checkOption("null", null)
+ checkOption("cast('11 23:4:0' as interval day to second)",
+ "INTERVAL '11 23:04:00' DAY TO SECOND")
+ checkOption("date_diff(current_date(), current_date())", "0")
+ checkOption("date_sub(date'2022-02-02', 1)", "2022-02-01")
+ checkOption("timestampadd(microsecond, 5, timestamp'2022-02-28 00:00:00')",
+ "2022-02-28 00:00:00.000005")
+ checkOption("round(cast(2.25 as decimal(5, 3)), 1)", "2.3")
+ // The result of invoking this "ROUND" function call is NULL, since the target decimal type is
+ // too narrow to contain the result of the cast.
+ checkOption("round(cast(2.25 as decimal(3, 3)), 1)", "null")
+
+ // Test some cases where the provided option value is a non-constant or invalid expression.
+ checkError(
+ exception = intercept[AnalysisException](
+ sql(s"$prefix ('k' = 1 + 2 + unresolvedAttribute)")),
+ errorClass = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+ parameters = Map(
+ "objectName" -> "`unresolvedAttribute`"),
+ queryContext = Array(ExpectedContext("", "", 60, 78, "unresolvedAttribute")))
+ checkError(
+ exception = intercept[AnalysisException](
+ sql(s"$prefix ('k' = true or false or unresolvedAttribute)")),
+ errorClass = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+ parameters = Map(
+ "objectName" -> "`unresolvedAttribute`"),
+ queryContext = Array(ExpectedContext("", "", 69, 87, "unresolvedAttribute")))
+ checkError(
+ exception = intercept[AnalysisException](
+ sql(s"$prefix ('k' = cast(array('9', '9') as array<byte>))")),
+ errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+ parameters = Map(
+ "key" -> "k",
+ "supported" -> "constant expressions"))
+ checkError(
+ exception = intercept[AnalysisException](
+ sql(s"$prefix ('k' = cast(map('9', '9') as map<string, string>))")),
+ errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+ parameters = Map(
+ "key" -> "k",
+ "supported" -> "constant expressions"))
+ checkError(
+ exception = intercept[AnalysisException](
+ sql(s"$prefix ('k' = raise_error('failure'))")),
+ errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+ parameters = Map(
+ "key" -> "k",
+ "supported" -> "constant expressions"))
+ checkError(
+ exception = intercept[AnalysisException](
+ sql(s"$prefix ('k' = raise_error('failure'))")),
+ errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+ parameters = Map(
+ "key" -> "k",
+ "supported" -> "constant expressions"))
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index a51ac78fdb2..4fe7809162f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.connector
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedFieldName, UnresolvedFieldPosition, UnresolvedIdentifier}
-import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, UnresolvedTableSpec}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -51,8 +51,7 @@ class V2CommandsCaseSensitivitySuite
Seq(true, false).foreach { caseSensitive =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
Seq("ID", "iD").foreach { ref =>
- val tableSpec = TableSpec(Map.empty, None, Map.empty,
- None, None, None, false)
+ val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
val plan = CreateTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
Expressions.identity(ref) :: Nil,
@@ -75,8 +74,7 @@ class V2CommandsCaseSensitivitySuite
Seq(true, false).foreach { caseSensitive =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref =>
- val tableSpec = TableSpec(Map.empty, None, Map.empty,
- None, None, None, false)
+ val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
val plan = CreateTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, ref) :: Nil,
@@ -100,8 +98,7 @@ class V2CommandsCaseSensitivitySuite
Seq(true, false).foreach { caseSensitive =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
Seq("ID", "iD").foreach { ref =>
- val tableSpec = TableSpec(Map.empty, None, Map.empty,
- None, None, None, false)
+ val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
val plan = ReplaceTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
Expressions.identity(ref) :: Nil,
@@ -124,8 +121,7 @@ class V2CommandsCaseSensitivitySuite
Seq(true, false).foreach { caseSensitive =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref =>
- val tableSpec = TableSpec(Map.empty, None, Map.empty,
- None, None, None, false)
+ val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
val plan = ReplaceTableAsSelect(
UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, ref) :: Nil,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org