You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2021/08/03 07:44:23 UTC
[spark] branch branch-3.2 updated: [SPARK-36380][SQL] Simplify the
logical plan names for ALTER TABLE ... COLUMN
This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 7c58684 [SPARK-36380][SQL] Simplify the logical plan names for ALTER TABLE ... COLUMN
7c58684 is described below
commit 7c586842d71064169aa77baf666a8566d9ed785e
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Aug 3 10:43:00 2021 +0300
[SPARK-36380][SQL] Simplify the logical plan names for ALTER TABLE ... COLUMN
### What changes were proposed in this pull request?
This is a followup of recent work such as https://github.com/apache/spark/pull/33200
For `ALTER TABLE` commands, the logical plans do not have the common `AlterTable` prefix in the name and just use names like `SetTableLocation`. This PR proposes to follow the same naming rule in `ALTER TABLE ... COLUMN` commands.
This PR also moves these AlterTable commands to an individual file and gives them a base trait.
### Why are the changes needed?
name simplification
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing test
Closes #33609 from cloud-fan/dsv2.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Max Gekk <ma...@gmail.com>
(cherry picked from commit 7cb9c1c2415a0984515e4d4733f816673e4ae3c8)
Signed-off-by: Max Gekk <ma...@gmail.com>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 12 +-
.../sql/catalyst/analysis/CheckAnalysis.scala | 12 +-
.../spark/sql/catalyst/parser/AstBuilder.scala | 12 +-
.../plans/logical/v2AlterTableCommands.scala | 230 +++++++++++++++++++++
.../sql/catalyst/plans/logical/v2Commands.scala | 193 +----------------
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 54 ++---
.../catalyst/analysis/ResolveSessionCatalog.scala | 10 +-
.../datasources/v2/DataSourceV2Strategy.scala | 26 +--
.../connector/V2CommandsCaseSensitivitySuite.scala | 26 +--
.../execution/command/PlanResolutionSuite.scala | 12 +-
10 files changed, 303 insertions(+), 284 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a9c085a..75fad11a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -269,7 +269,7 @@ class Analyzer(override val catalogManager: CatalogManager)
ResolveRelations ::
ResolveTables ::
ResolvePartitionSpec ::
- ResolveAlterTableColumnCommands ::
+ ResolveAlterTableCommands ::
AddMetadataColumns ::
DeduplicateRelations ::
ResolveReferences ::
@@ -3607,15 +3607,15 @@ class Analyzer(override val catalogManager: CatalogManager)
* Rule to mostly resolve, normalize and rewrite column names based on case sensitivity
* for alter table column commands.
*/
- object ResolveAlterTableColumnCommands extends Rule[LogicalPlan] {
+ object ResolveAlterTableCommands extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
- case a: AlterTableColumnCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
+ case a: AlterTableCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
val table = a.table.asInstanceOf[ResolvedTable]
a.transformExpressions {
case u: UnresolvedFieldName => resolveFieldNames(table, u.name, u)
}
- case a @ AlterTableAddColumns(r: ResolvedTable, cols) if !a.resolved =>
+ case a @ AddColumns(r: ResolvedTable, cols) if !a.resolved =>
// 'colsToAdd' keeps track of new columns being added. It stores a mapping from a
// normalized parent name of fields to field names that belong to the parent.
// For example, if we add columns "a.b.c", "a.b.d", and "a.c", 'colsToAdd' will become
@@ -3668,7 +3668,7 @@ class Analyzer(override val catalogManager: CatalogManager)
resolved.copyTagsFrom(a)
resolved
- case a @ AlterTableAlterColumn(
+ case a @ AlterColumn(
table: ResolvedTable, ResolvedFieldName(path, field), dataType, _, _, position) =>
val newDataType = dataType.flatMap { dt =>
// Hive style syntax provides the column type, even if it may not have changed.
@@ -3705,7 +3705,7 @@ class Analyzer(override val catalogManager: CatalogManager)
}.getOrElse(throw QueryCompilationErrors.missingFieldError(fieldName, table, context.origin))
}
- private def hasUnresolvedFieldName(a: AlterTableColumnCommand): Boolean = {
+ private def hasUnresolvedFieldName(a: AlterTableCommand): Boolean = {
a.expressions.exists(_.find(_.isInstanceOf[UnresolvedFieldName]).isDefined)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 77f721c..09cd71b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -442,8 +442,8 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case write: V2WriteCommand if write.resolved =>
write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
- case alter: AlterTableColumnCommand if alter.table.resolved =>
- checkAlterTableColumnCommand(alter)
+ case alter: AlterTableCommand =>
+ checkAlterTableCommand(alter)
case _ => // Falls back to the following checks
}
@@ -939,7 +939,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
/**
* Validates the options used for alter table commands after table and columns are resolved.
*/
- private def checkAlterTableColumnCommand(alter: AlterTableColumnCommand): Unit = {
+ private def checkAlterTableCommand(alter: AlterTableCommand): Unit = {
def checkColumnNotExists(op: String, fieldNames: Seq[String], struct: StructType): Unit = {
if (struct.findNestedField(fieldNames, includeCollections = true).isDefined) {
alter.failAnalysis(s"Cannot $op column, because ${fieldNames.quoted} " +
@@ -948,7 +948,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
}
alter match {
- case AlterTableAddColumns(table: ResolvedTable, colsToAdd) =>
+ case AddColumns(table: ResolvedTable, colsToAdd) =>
colsToAdd.foreach { colToAdd =>
checkColumnNotExists("add", colToAdd.name, table.schema)
}
@@ -957,10 +957,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
"in the user specified columns",
alter.conf.resolver)
- case AlterTableRenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
+ case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
checkColumnNotExists("rename", col.path :+ newName, table.schema)
- case a @ AlterTableAlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) =>
+ case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) =>
val fieldName = col.name.quoted
if (a.dataType.isDefined) {
val field = CharVarcharUtils.getRawType(col.field.metadata)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index de5a84c..8f5c2f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3614,7 +3614,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
*/
override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
val colToken = if (ctx.COLUMN() != null) "COLUMN" else "COLUMNS"
- AlterTableAddColumns(
+ AddColumns(
createUnresolvedTable(ctx.multipartIdentifier, s"ALTER TABLE ... ADD $colToken"),
ctx.columns.qualifiedColTypeWithPosition.asScala.map(typedVisit[QualifiedColType]).toSeq
)
@@ -3630,7 +3630,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
*/
override def visitRenameTableColumn(
ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) {
- AlterTableRenameColumn(
+ RenameColumn(
createUnresolvedTable(ctx.table, "ALTER TABLE ... RENAME COLUMN"),
UnresolvedFieldName(typedVisit[Seq[String]](ctx.from)),
ctx.to.getText)
@@ -3684,7 +3684,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
assert(Seq(dataType, nullable, comment, position).count(_.nonEmpty) == 1)
- AlterTableAlterColumn(
+ AlterColumn(
createUnresolvedTable(ctx.table, s"ALTER TABLE ... $verb COLUMN"),
UnresolvedFieldName(typedVisit[Seq[String]](ctx.column)),
dataType = dataType,
@@ -3718,7 +3718,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
Some("please run ALTER COLUMN ... SET/DROP NOT NULL instead"))
}
- AlterTableAlterColumn(
+ AlterColumn(
createUnresolvedTable(ctx.table, s"ALTER TABLE ... CHANGE COLUMN"),
UnresolvedFieldName(columnNameParts),
dataType = Option(ctx.colType().dataType()).map(typedVisit[DataType]),
@@ -3733,7 +3733,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
if (ctx.partitionSpec != null) {
operationNotAllowed("ALTER TABLE table PARTITION partition_spec REPLACE COLUMNS", ctx)
}
- AlterTableReplaceColumns(
+ ReplaceColumns(
createUnresolvedTable(ctx.multipartIdentifier, "ALTER TABLE ... REPLACE COLUMNS"),
ctx.columns.qualifiedColTypeWithPosition.asScala.map { colType =>
if (colType.NULL != null) {
@@ -3766,7 +3766,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
override def visitDropTableColumns(
ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) {
val columnsToDrop = ctx.columns.multipartIdentifier.asScala.map(typedVisit[Seq[String]])
- AlterTableDropColumns(
+ DropColumns(
createUnresolvedTable(
ctx.multipartIdentifier,
"ALTER TABLE ... DROP COLUMNS"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
new file mode 100644
index 0000000..302a810
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.util.TypeUtils
+import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.DataType
+
+/**
+ * The base trait for commands that need to alter a v2 table with [[TableChange]]s.
+ */
+trait AlterTableCommand extends UnaryCommand {
+ def changes: Seq[TableChange]
+ def table: LogicalPlan
+ final override def child: LogicalPlan = table
+}
+
+/**
+ * The logical plan that defines or changes the comment of a TABLE for v2 catalogs.
+ *
+ * {{{
+ * COMMENT ON TABLE tableIdentifier IS ('text' | NULL)
+ * }}}
+ *
+ * where the `text` is the new comment written as a string literal; or `NULL` to drop the comment.
+ */
+case class CommentOnTable(table: LogicalPlan, comment: String) extends AlterTableCommand {
+ override def changes: Seq[TableChange] = {
+ Seq(TableChange.setProperty(TableCatalog.PROP_COMMENT, comment))
+ }
+ override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+ copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... SET LOCATION command.
+ */
+case class SetTableLocation(
+ table: LogicalPlan,
+ partitionSpec: Option[TablePartitionSpec],
+ location: String) extends AlterTableCommand {
+ override def changes: Seq[TableChange] = {
+ if (partitionSpec.nonEmpty) {
+ throw QueryCompilationErrors.alterV2TableSetLocationWithPartitionNotSupportedError()
+ }
+ Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, location))
+ }
+ override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+ copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... SET TBLPROPERTIES command.
+ */
+case class SetTableProperties(
+ table: LogicalPlan,
+ properties: Map[String, String]) extends AlterTableCommand {
+ override def changes: Seq[TableChange] = {
+ properties.map { case (key, value) =>
+ TableChange.setProperty(key, value)
+ }.toSeq
+ }
+ override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+ copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... UNSET TBLPROPERTIES command.
+ */
+case class UnsetTableProperties(
+ table: LogicalPlan,
+ propertyKeys: Seq[String],
+ ifExists: Boolean) extends AlterTableCommand {
+ override def changes: Seq[TableChange] = {
+ propertyKeys.map(key => TableChange.removeProperty(key))
+ }
+ override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+ copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... ADD COLUMNS command.
+ */
+case class AddColumns(
+ table: LogicalPlan,
+ columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
+ columnsToAdd.foreach { c =>
+ TypeUtils.failWithIntervalType(c.dataType)
+ }
+
+ override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
+
+ override def changes: Seq[TableChange] = {
+ columnsToAdd.map { col =>
+ require(col.path.forall(_.resolved),
+ "FieldName should be resolved before it's converted to TableChange.")
+ require(col.position.forall(_.resolved),
+ "FieldPosition should be resolved before it's converted to TableChange.")
+ TableChange.addColumn(
+ col.name.toArray,
+ col.dataType,
+ col.nullable,
+ col.comment.orNull,
+ col.position.map(_.position).orNull)
+ }
+ }
+
+ override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+ copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... REPLACE COLUMNS command.
+ */
+case class ReplaceColumns(
+ table: LogicalPlan,
+ columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
+ columnsToAdd.foreach { c =>
+ TypeUtils.failWithIntervalType(c.dataType)
+ }
+
+ override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
+
+ override def changes: Seq[TableChange] = {
+ // REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
+ require(table.resolved)
+ val deleteChanges = table.schema.fieldNames.map { name =>
+ TableChange.deleteColumn(Array(name))
+ }
+ val addChanges = columnsToAdd.map { col =>
+ assert(col.path.isEmpty)
+ assert(col.position.isEmpty)
+ TableChange.addColumn(
+ col.name.toArray,
+ col.dataType,
+ col.nullable,
+ col.comment.orNull,
+ null)
+ }
+ deleteChanges ++ addChanges
+ }
+
+ override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+ copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... DROP COLUMNS command.
+ */
+case class DropColumns(
+ table: LogicalPlan,
+ columnsToDrop: Seq[FieldName]) extends AlterTableCommand {
+ override def changes: Seq[TableChange] = {
+ columnsToDrop.map { col =>
+ require(col.resolved, "FieldName should be resolved before it's converted to TableChange.")
+ TableChange.deleteColumn(col.name.toArray)
+ }
+ }
+
+ override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+ copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... RENAME COLUMN command.
+ */
+case class RenameColumn(
+ table: LogicalPlan,
+ column: FieldName,
+ newName: String) extends AlterTableCommand {
+ override def changes: Seq[TableChange] = {
+ require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
+ Seq(TableChange.renameColumn(column.name.toArray, newName))
+ }
+
+ override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+ copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... ALTER COLUMN command.
+ */
+case class AlterColumn(
+ table: LogicalPlan,
+ column: FieldName,
+ dataType: Option[DataType],
+ nullable: Option[Boolean],
+ comment: Option[String],
+ position: Option[FieldPosition]) extends AlterTableCommand {
+ override def changes: Seq[TableChange] = {
+ require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
+ val colName = column.name.toArray
+ val typeChange = dataType.map { newDataType =>
+ TableChange.updateColumnType(colName, newDataType)
+ }
+ val nullabilityChange = nullable.map { nullable =>
+ TableChange.updateColumnNullability(colName, nullable)
+ }
+ val commentChange = comment.map { newComment =>
+ TableChange.updateColumnComment(colName, newComment)
+ }
+ val positionChange = position.map { newPosition =>
+ require(newPosition.resolved,
+ "FieldPosition should be resolved before it's converted to TableChange.")
+ TableChange.updateColumnPosition(colName, newPosition.position)
+ }
+ typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange
+ }
+
+ override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+ copy(table = newChild)
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index d2b5909..195bb8c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -17,12 +17,12 @@
package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, NamedRelation, PartitionSpec, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, UnresolvedException}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
import org.apache.spark.sql.catalyst.trees.BinaryLike
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.Write
@@ -676,21 +676,6 @@ case class CommentOnNamespace(child: LogicalPlan, comment: String) extends Unary
}
/**
- * The logical plan that defines or changes the comment of an TABLE for v2 catalogs.
- *
- * {{{
- * COMMENT ON TABLE tableIdentifier IS ('text' | NULL)
- * }}}
- *
- * where the `text` is the new comment written as a string literal; or `NULL` to drop the comment.
- *
- */
-case class CommentOnTable(child: LogicalPlan, comment: String) extends UnaryCommand {
- override protected def withNewChildInternal(newChild: LogicalPlan): CommentOnTable =
- copy(child = newChild)
-}
-
-/**
* The logical plan of the REFRESH FUNCTION command.
*/
case class RefreshFunction(child: LogicalPlan) extends UnaryCommand {
@@ -1043,177 +1028,3 @@ case class UncacheTable(
override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
}
-
-/**
- * The logical plan of the ALTER TABLE ... SET LOCATION command.
- */
-case class SetTableLocation(
- table: LogicalPlan,
- partitionSpec: Option[TablePartitionSpec],
- location: String) extends UnaryCommand {
- override def child: LogicalPlan = table
- override protected def withNewChildInternal(newChild: LogicalPlan): SetTableLocation =
- copy(table = newChild)
-}
-
-/**
- * The logical plan of the ALTER TABLE ... SET TBLPROPERTIES command.
- */
-case class SetTableProperties(
- table: LogicalPlan,
- properties: Map[String, String]) extends UnaryCommand {
- override def child: LogicalPlan = table
- override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
- copy(table = newChild)
-}
-
-/**
- * The logical plan of the ALTER TABLE ... UNSET TBLPROPERTIES command.
- */
-case class UnsetTableProperties(
- table: LogicalPlan,
- propertyKeys: Seq[String],
- ifExists: Boolean) extends UnaryCommand {
- override def child: LogicalPlan = table
- override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
- copy(table = newChild)
-}
-
-trait AlterTableColumnCommand extends UnaryCommand {
- def table: LogicalPlan
- def changes: Seq[TableChange]
- override def child: LogicalPlan = table
-}
-
-/**
- * The logical plan of the ALTER TABLE ... ADD COLUMNS command.
- */
-case class AlterTableAddColumns(
- table: LogicalPlan,
- columnsToAdd: Seq[QualifiedColType]) extends AlterTableColumnCommand {
- columnsToAdd.foreach { c =>
- TypeUtils.failWithIntervalType(c.dataType)
- }
-
- override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
-
- override def changes: Seq[TableChange] = {
- columnsToAdd.map { col =>
- require(col.path.forall(_.resolved),
- "FieldName should be resolved before it's converted to TableChange.")
- require(col.position.forall(_.resolved),
- "FieldPosition should be resolved before it's converted to TableChange.")
- TableChange.addColumn(
- col.name.toArray,
- col.dataType,
- col.nullable,
- col.comment.orNull,
- col.position.map(_.position).orNull)
- }
- }
-
- override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
- copy(table = newChild)
-}
-
-/**
- * The logical plan of the ALTER TABLE ... REPLACE COLUMNS command.
- */
-case class AlterTableReplaceColumns(
- table: LogicalPlan,
- columnsToAdd: Seq[QualifiedColType]) extends AlterTableColumnCommand {
- columnsToAdd.foreach { c =>
- TypeUtils.failWithIntervalType(c.dataType)
- }
-
- override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
-
- override def changes: Seq[TableChange] = {
- // REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
- require(table.resolved)
- val deleteChanges = table.schema.fieldNames.map { name =>
- TableChange.deleteColumn(Array(name))
- }
- val addChanges = columnsToAdd.map { col =>
- assert(col.path.isEmpty)
- assert(col.position.isEmpty)
- TableChange.addColumn(
- col.name.toArray,
- col.dataType,
- col.nullable,
- col.comment.orNull,
- null)
- }
- deleteChanges ++ addChanges
- }
-
- override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
- copy(table = newChild)
-}
-
-/**
- * The logical plan of the ALTER TABLE ... DROP COLUMNS command.
- */
-case class AlterTableDropColumns(
- table: LogicalPlan,
- columnsToDrop: Seq[FieldName]) extends AlterTableColumnCommand {
- override def changes: Seq[TableChange] = {
- columnsToDrop.map { col =>
- require(col.resolved, "FieldName should be resolved before it's converted to TableChange.")
- TableChange.deleteColumn(col.name.toArray)
- }
- }
-
- override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
- copy(table = newChild)
-}
-
-/**
- * The logical plan of the ALTER TABLE ... RENAME COLUMN command.
- */
-case class AlterTableRenameColumn(
- table: LogicalPlan,
- column: FieldName,
- newName: String) extends AlterTableColumnCommand {
- override def changes: Seq[TableChange] = {
- require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
- Seq(TableChange.renameColumn(column.name.toArray, newName))
- }
-
- override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
- copy(table = newChild)
-}
-
-/**
- * The logical plan of the ALTER TABLE ... ALTER COLUMN command.
- */
-case class AlterTableAlterColumn(
- table: LogicalPlan,
- column: FieldName,
- dataType: Option[DataType],
- nullable: Option[Boolean],
- comment: Option[String],
- position: Option[FieldPosition]) extends AlterTableColumnCommand {
- override def changes: Seq[TableChange] = {
- require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
- val colName = column.name.toArray
- val typeChange = dataType.map { newDataType =>
- TableChange.updateColumnType(colName, newDataType)
- }
- val nullabilityChange = nullable.map { nullable =>
- TableChange.updateColumnNullability(colName, nullable)
- }
- val commentChange = comment.map { newComment =>
- TableChange.updateColumnComment(colName, newComment)
- }
- val positionChange = position.map { newPosition =>
- require(newPosition.resolved,
- "FieldPosition should be resolved before it's converted to TableChange.")
- TableChange.updateColumnPosition(colName, newPosition.position)
- }
- typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange
- }
-
- override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
- copy(table = newChild)
-}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index ea35f8b..45f865f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -788,7 +788,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add column") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x int"),
- AlterTableAddColumns(
+ AddColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
Seq(QualifiedColType(None, "x", IntegerType, true, None, None)
)))
@@ -797,7 +797,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add multiple columns") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMNS x int, y string"),
- AlterTableAddColumns(
+ AddColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None),
Seq(QualifiedColType(None, "x", IntegerType, true, None, None),
QualifiedColType(None, "y", StringType, true, None, None)
@@ -807,7 +807,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add column with COLUMNS") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMNS x int"),
- AlterTableAddColumns(
+ AddColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None),
Seq(QualifiedColType(None, "x", IntegerType, true, None, None)
)))
@@ -816,7 +816,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add column with COLUMNS (...)") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMNS (x int)"),
- AlterTableAddColumns(
+ AddColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None),
Seq(QualifiedColType(None, "x", IntegerType, true, None, None)
)))
@@ -825,7 +825,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add column with COLUMNS (...) and COMMENT") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMNS (x int COMMENT 'doc')"),
- AlterTableAddColumns(
+ AddColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None),
Seq(QualifiedColType(None, "x", IntegerType, true, Some("doc"), None)
)))
@@ -834,7 +834,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add non-nullable column") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x int NOT NULL"),
- AlterTableAddColumns(
+ AddColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
Seq(QualifiedColType(None, "x", IntegerType, false, None, None)
)))
@@ -843,7 +843,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add column with COMMENT") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x int COMMENT 'doc'"),
- AlterTableAddColumns(
+ AddColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
Seq(QualifiedColType(None, "x", IntegerType, true, Some("doc"), None)
)))
@@ -852,7 +852,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add column with position") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x int FIRST"),
- AlterTableAddColumns(
+ AddColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
Seq(QualifiedColType(
None,
@@ -865,7 +865,7 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x int AFTER y"),
- AlterTableAddColumns(
+ AddColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
Seq(QualifiedColType(
None,
@@ -880,7 +880,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add column with nested column name") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc'"),
- AlterTableAddColumns(
+ AddColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
Seq(QualifiedColType(
Some(UnresolvedFieldName(Seq("x", "y"))), "z", IntegerType, true, Some("doc"), None)
@@ -890,7 +890,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add multiple columns with nested column name") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc', a.b string FIRST"),
- AlterTableAddColumns(
+ AddColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
Seq(
QualifiedColType(
@@ -930,7 +930,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: rename column") {
comparePlans(
parsePlan("ALTER TABLE table_name RENAME COLUMN a.b.c TO d"),
- AlterTableRenameColumn(
+ RenameColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... RENAME COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
"d"))
@@ -939,7 +939,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: update column type using ALTER") {
comparePlans(
parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c TYPE bigint"),
- AlterTableAlterColumn(
+ AlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
Some(LongType),
@@ -958,7 +958,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: update column type") {
comparePlans(
parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c TYPE bigint"),
- AlterTableAlterColumn(
+ AlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
Some(LongType),
@@ -970,7 +970,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: update column comment") {
comparePlans(
parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c COMMENT 'new comment'"),
- AlterTableAlterColumn(
+ AlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
None,
@@ -982,7 +982,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: update column position") {
comparePlans(
parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c FIRST"),
- AlterTableAlterColumn(
+ AlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
None,
@@ -1008,7 +1008,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: SET/DROP NOT NULL") {
comparePlans(
parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c SET NOT NULL"),
- AlterTableAlterColumn(
+ AlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
None,
@@ -1018,7 +1018,7 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c DROP NOT NULL"),
- AlterTableAlterColumn(
+ AlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
None,
@@ -1030,7 +1030,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: drop column") {
comparePlans(
parsePlan("ALTER TABLE table_name DROP COLUMN a.b.c"),
- AlterTableDropColumns(
+ DropColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", None),
Seq(UnresolvedFieldName(Seq("a", "b", "c")))))
}
@@ -1040,7 +1040,7 @@ class DDLParserSuite extends AnalysisTest {
Seq(sql, sql.replace("COLUMN", "COLUMNS")).foreach { drop =>
comparePlans(
parsePlan(drop),
- AlterTableDropColumns(
+ DropColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", None),
Seq(UnresolvedFieldName(Seq("x")),
UnresolvedFieldName(Seq("y")),
@@ -1055,7 +1055,7 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan(sql1),
- AlterTableAlterColumn(
+ AlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
Some(IntegerType),
@@ -1065,7 +1065,7 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan(sql2),
- AlterTableAlterColumn(
+ AlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
Some(IntegerType),
@@ -1075,7 +1075,7 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan(sql3),
- AlterTableAlterColumn(
+ AlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
Some(IntegerType),
@@ -1099,19 +1099,19 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan(sql1),
- AlterTableReplaceColumns(
+ ReplaceColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... REPLACE COLUMNS", None),
Seq(QualifiedColType(None, "x", StringType, true, None, None))))
comparePlans(
parsePlan(sql2),
- AlterTableReplaceColumns(
+ ReplaceColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... REPLACE COLUMNS", None),
Seq(QualifiedColType(None, "x", StringType, true, Some("x1"), None))))
comparePlans(
parsePlan(sql3),
- AlterTableReplaceColumns(
+ ReplaceColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... REPLACE COLUMNS", None),
Seq(
QualifiedColType(None, "x", StringType, true, Some("x1"), None),
@@ -1120,7 +1120,7 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan(sql4),
- AlterTableReplaceColumns(
+ ReplaceColumns(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... REPLACE COLUMNS", None),
Seq(
QualifiedColType(None, "x", StringType, true, Some("x1"), None),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 3a2f525..80063cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -46,7 +46,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
- case AlterTableAddColumns(ResolvedV1TableIdentifier(ident), cols) =>
+ case AddColumns(ResolvedV1TableIdentifier(ident), cols) =>
cols.foreach { c =>
assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand")
if (!c.nullable) {
@@ -55,10 +55,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
AlterTableAddColumnsCommand(ident.asTableIdentifier, cols.map(convertToStructField))
- case AlterTableReplaceColumns(ResolvedV1TableIdentifier(_), _) =>
+ case ReplaceColumns(ResolvedV1TableIdentifier(_), _) =>
throw QueryCompilationErrors.replaceColumnsOnlySupportedWithV2TableError
- case a @ AlterTableAlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _) =>
+ case a @ AlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _) =>
if (a.column.name.length > 1) {
throw QueryCompilationErrors.alterQualifiedColumnOnlySupportedWithV2TableError
}
@@ -87,10 +87,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
builder.build())
AlterTableChangeColumnCommand(ident.asTableIdentifier, colName, newColumn)
- case AlterTableRenameColumn(ResolvedV1TableIdentifier(_), _, _) =>
+ case RenameColumn(ResolvedV1TableIdentifier(_), _, _) =>
throw QueryCompilationErrors.renameColumnOnlySupportedWithV2TableError
- case AlterTableDropColumns(ResolvedV1TableIdentifier(_), _) =>
+ case DropColumns(ResolvedV1TableIdentifier(_), _) =>
throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 3d69029..1a50c32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.toPrettySQL
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog}
import org.apache.spark.sql.connector.read.LocalScan
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.connector.write.V1Write
@@ -314,10 +314,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
ns,
Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil
- case CommentOnTable(ResolvedTable(catalog, identifier, _, _), comment) =>
- val changes = TableChange.setProperty(TableCatalog.PROP_COMMENT, comment)
- AlterTableExec(catalog, identifier, Seq(changes)) :: Nil
-
case CreateNamespace(catalog, namespace, ifNotExists, properties) =>
CreateNamespaceExec(catalog, namespace, ifNotExists, properties) :: Nil
@@ -424,25 +420,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}
UncacheTableExec(r.table, cascade = !isTempView(r.table)) :: Nil
- case SetTableLocation(table: ResolvedTable, partitionSpec, location) =>
- if (partitionSpec.nonEmpty) {
- throw QueryCompilationErrors.alterV2TableSetLocationWithPartitionNotSupportedError()
- }
- val changes = Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, location))
- AlterTableExec(table.catalog, table.identifier, changes) :: Nil
-
- case SetTableProperties(table: ResolvedTable, props) =>
- val changes = props.map { case (key, value) =>
- TableChange.setProperty(key, value)
- }.toSeq
- AlterTableExec(table.catalog, table.identifier, changes) :: Nil
-
- // TODO: v2 `UNSET TBLPROPERTIES` should respect the ifExists flag.
- case UnsetTableProperties(table: ResolvedTable, keys, _) =>
- val changes = keys.map(key => TableChange.removeProperty(key))
- AlterTableExec(table.catalog, table.identifier, changes) :: Nil
-
- case a: AlterTableColumnCommand if a.table.resolved =>
+ case a: AlterTableCommand if a.table.resolved =>
val table = a.table.asInstanceOf[ResolvedTable]
AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index 763cd6a..1d6e0a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.connector
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedFieldName, UnresolvedFieldPosition}
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddColumns, AlterTableAlterColumn, AlterTableColumnCommand, AlterTableDropColumns, AlterTableRenameColumn, CreateTableAsSelect, LogicalPlan, QualifiedColType, ReplaceTableAsSelect}
+import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceTableAsSelect}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -140,7 +140,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
Seq("POINT.Z", "poInt.z", "poInt.Z").foreach { ref =>
val field = ref.split("\\.")
alterTableTest(
- AlterTableAddColumns(
+ AddColumns(
table,
Seq(QualifiedColType(
Some(UnresolvedFieldName(field.init)), field.last, LongType, true, None, None))),
@@ -152,7 +152,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
test("AlterTable: add column resolution - positional") {
Seq("ID", "iD").foreach { ref =>
alterTableTest(
- AlterTableAddColumns(
+ AddColumns(
table,
Seq(QualifiedColType(
None,
@@ -168,7 +168,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
test("AlterTable: add column resolution - column position referencing new column") {
alterTableTest(
- AlterTableAddColumns(
+ AddColumns(
table,
Seq(QualifiedColType(
None,
@@ -191,7 +191,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
test("AlterTable: add column resolution - nested positional") {
Seq("X", "Y").foreach { ref =>
alterTableTest(
- AlterTableAddColumns(
+ AddColumns(
table,
Seq(QualifiedColType(
Some(UnresolvedFieldName(Seq("point"))),
@@ -207,7 +207,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
test("AlterTable: add column resolution - column position referencing new nested column") {
alterTableTest(
- AlterTableAddColumns(
+ AddColumns(
table,
Seq(QualifiedColType(
Some(UnresolvedFieldName(Seq("point"))),
@@ -229,7 +229,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
test("SPARK-36372: Adding duplicate columns should not be allowed") {
alterTableTest(
- AlterTableAddColumns(
+ AddColumns(
table,
Seq(QualifiedColType(
Some(UnresolvedFieldName(Seq("point"))),
@@ -252,7 +252,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
test("AlterTable: drop column resolution") {
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
alterTableTest(
- AlterTableDropColumns(table, Seq(UnresolvedFieldName(ref))),
+ DropColumns(table, Seq(UnresolvedFieldName(ref))),
Seq("Missing field " + ref.quoted)
)
}
@@ -261,7 +261,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
test("AlterTable: rename column resolution") {
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
alterTableTest(
- AlterTableRenameColumn(table, UnresolvedFieldName(ref), "newName"),
+ RenameColumn(table, UnresolvedFieldName(ref), "newName"),
Seq("Missing field " + ref.quoted)
)
}
@@ -270,7 +270,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
test("AlterTable: drop column nullability resolution") {
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
alterTableTest(
- AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, Some(true), None, None),
+ AlterColumn(table, UnresolvedFieldName(ref), None, Some(true), None, None),
Seq("Missing field " + ref.quoted)
)
}
@@ -279,7 +279,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
test("AlterTable: change column type resolution") {
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
alterTableTest(
- AlterTableAlterColumn(table, UnresolvedFieldName(ref), Some(StringType), None, None, None),
+ AlterColumn(table, UnresolvedFieldName(ref), Some(StringType), None, None, None),
Seq("Missing field " + ref.quoted)
)
}
@@ -288,14 +288,14 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
test("AlterTable: change column comment resolution") {
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
alterTableTest(
- AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, None, Some("comment"), None),
+ AlterColumn(table, UnresolvedFieldName(ref), None, None, Some("comment"), None),
Seq("Missing field " + ref.quoted)
)
}
}
private def alterTableTest(
- alter: AlterTableColumnCommand,
+ alter: AlterTableCommand,
error: Seq[String],
expectErrorOnCaseSensitive: Boolean = true): Unit = {
Seq(true, false).foreach { caseSensitive =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 25a8c4e..2c2f833 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat,
import org.apache.spark.sql.catalyst.expressions.{AnsiCast, AttributeReference, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateTableStatement, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateTableStatement, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.FakeV2Provider
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, V1Table}
@@ -1132,7 +1132,7 @@ class PlanResolutionSuite extends AnalysisTest {
"ALTER COLUMN with qualified column is only supported with v2 tables"))
} else {
parsed1 match {
- case AlterTableAlterColumn(
+ case AlterColumn(
_: ResolvedTable,
column: ResolvedFieldName,
Some(LongType),
@@ -1144,7 +1144,7 @@ class PlanResolutionSuite extends AnalysisTest {
}
parsed2 match {
- case AlterTableAlterColumn(
+ case AlterColumn(
_: ResolvedTable,
column: ResolvedFieldName,
None,
@@ -1198,14 +1198,14 @@ class PlanResolutionSuite extends AnalysisTest {
test("alter table: hive style change column") {
Seq("v2Table", "testcat.tab").foreach { tblName =>
parseAndResolve(s"ALTER TABLE $tblName CHANGE COLUMN i i int COMMENT 'an index'") match {
- case AlterTableAlterColumn(
+ case AlterColumn(
_: ResolvedTable, _: ResolvedFieldName, None, None, Some(comment), None) =>
assert(comment == "an index")
case _ => fail("expect AlterTableAlterColumn with comment change only")
}
parseAndResolve(s"ALTER TABLE $tblName CHANGE COLUMN i i long COMMENT 'an index'") match {
- case AlterTableAlterColumn(
+ case AlterColumn(
_: ResolvedTable, _: ResolvedFieldName, Some(dataType), None, Some(comment), None) =>
assert(comment == "an index")
assert(dataType == LongType)
@@ -1241,7 +1241,7 @@ class PlanResolutionSuite extends AnalysisTest {
val catalog = if (isSessionCatalog) v2SessionCatalog else testCat
val tableIdent = if (isSessionCatalog) "v2Table" else "tab"
parsed match {
- case AlterTableAlterColumn(r: ResolvedTable, _, _, _, _, _) =>
+ case AlterColumn(r: ResolvedTable, _, _, _, _, _) =>
assert(r.catalog == catalog)
assert(r.identifier.name() == tableIdent)
case Project(_, AsDataSourceV2Relation(r)) =>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org