You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2021/08/03 07:44:23 UTC

[spark] branch branch-3.2 updated: [SPARK-36380][SQL] Simplify the logical plan names for ALTER TABLE ... COLUMN

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 7c58684  [SPARK-36380][SQL] Simplify the logical plan names for ALTER TABLE ... COLUMN
7c58684 is described below

commit 7c586842d71064169aa77baf666a8566d9ed785e
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Aug 3 10:43:00 2021 +0300

    [SPARK-36380][SQL] Simplify the logical plan names for ALTER TABLE ... COLUMN
    
    ### What changes were proposed in this pull request?
    
    This is a followup of recent work such as https://github.com/apache/spark/pull/33200
    
    For `ALTER TABLE` commands, the logical plans do not have the common `AlterTable` prefix in the name and just use names like `SetTableLocation`. This PR proposes to follow the same naming rule in `ALTER TABLE ... COLUMN` commands.
    
    This PR also moves these AlterTable commands to an individual file and gives them a base trait.
    
    ### Why are the changes needed?
    
    name simplification
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing test
    
    Closes #33609 from cloud-fan/dsv2.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
    (cherry picked from commit 7cb9c1c2415a0984515e4d4733f816673e4ae3c8)
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  12 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  12 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  12 +-
 .../plans/logical/v2AlterTableCommands.scala       | 230 +++++++++++++++++++++
 .../sql/catalyst/plans/logical/v2Commands.scala    | 193 +----------------
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |  54 ++---
 .../catalyst/analysis/ResolveSessionCatalog.scala  |  10 +-
 .../datasources/v2/DataSourceV2Strategy.scala      |  26 +--
 .../connector/V2CommandsCaseSensitivitySuite.scala |  26 +--
 .../execution/command/PlanResolutionSuite.scala    |  12 +-
 10 files changed, 303 insertions(+), 284 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a9c085a..75fad11a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -269,7 +269,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       ResolveRelations ::
       ResolveTables ::
       ResolvePartitionSpec ::
-      ResolveAlterTableColumnCommands ::
+      ResolveAlterTableCommands ::
       AddMetadataColumns ::
       DeduplicateRelations ::
       ResolveReferences ::
@@ -3607,15 +3607,15 @@ class Analyzer(override val catalogManager: CatalogManager)
    * Rule to mostly resolve, normalize and rewrite column names based on case sensitivity
    * for alter table column commands.
    */
-  object ResolveAlterTableColumnCommands extends Rule[LogicalPlan] {
+  object ResolveAlterTableCommands extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case a: AlterTableColumnCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
+      case a: AlterTableCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
         val table = a.table.asInstanceOf[ResolvedTable]
         a.transformExpressions {
           case u: UnresolvedFieldName => resolveFieldNames(table, u.name, u)
         }
 
-      case a @ AlterTableAddColumns(r: ResolvedTable, cols) if !a.resolved =>
+      case a @ AddColumns(r: ResolvedTable, cols) if !a.resolved =>
         // 'colsToAdd' keeps track of new columns being added. It stores a mapping from a
         // normalized parent name of fields to field names that belong to the parent.
         // For example, if we add columns "a.b.c", "a.b.d", and "a.c", 'colsToAdd' will become
@@ -3668,7 +3668,7 @@ class Analyzer(override val catalogManager: CatalogManager)
         resolved.copyTagsFrom(a)
         resolved
 
-      case a @ AlterTableAlterColumn(
+      case a @ AlterColumn(
           table: ResolvedTable, ResolvedFieldName(path, field), dataType, _, _, position) =>
         val newDataType = dataType.flatMap { dt =>
           // Hive style syntax provides the column type, even if it may not have changed.
@@ -3705,7 +3705,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       }.getOrElse(throw QueryCompilationErrors.missingFieldError(fieldName, table, context.origin))
     }
 
-    private def hasUnresolvedFieldName(a: AlterTableColumnCommand): Boolean = {
+    private def hasUnresolvedFieldName(a: AlterTableCommand): Boolean = {
       a.expressions.exists(_.find(_.isInstanceOf[UnresolvedFieldName]).isDefined)
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 77f721c..09cd71b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -442,8 +442,8 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
           case write: V2WriteCommand if write.resolved =>
             write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
 
-          case alter: AlterTableColumnCommand if alter.table.resolved =>
-            checkAlterTableColumnCommand(alter)
+          case alter: AlterTableCommand =>
+            checkAlterTableCommand(alter)
 
           case _ => // Falls back to the following checks
         }
@@ -939,7 +939,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
   /**
    * Validates the options used for alter table commands after table and columns are resolved.
    */
-  private def checkAlterTableColumnCommand(alter: AlterTableColumnCommand): Unit = {
+  private def checkAlterTableCommand(alter: AlterTableCommand): Unit = {
     def checkColumnNotExists(op: String, fieldNames: Seq[String], struct: StructType): Unit = {
       if (struct.findNestedField(fieldNames, includeCollections = true).isDefined) {
         alter.failAnalysis(s"Cannot $op column, because ${fieldNames.quoted} " +
@@ -948,7 +948,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
     }
 
     alter match {
-      case AlterTableAddColumns(table: ResolvedTable, colsToAdd) =>
+      case AddColumns(table: ResolvedTable, colsToAdd) =>
         colsToAdd.foreach { colToAdd =>
           checkColumnNotExists("add", colToAdd.name, table.schema)
         }
@@ -957,10 +957,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
           "in the user specified columns",
           alter.conf.resolver)
 
-      case AlterTableRenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
+      case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
         checkColumnNotExists("rename", col.path :+ newName, table.schema)
 
-      case a @ AlterTableAlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) =>
+      case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) =>
         val fieldName = col.name.quoted
         if (a.dataType.isDefined) {
           val field = CharVarcharUtils.getRawType(col.field.metadata)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index de5a84c..8f5c2f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3614,7 +3614,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    */
   override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
     val colToken = if (ctx.COLUMN() != null) "COLUMN" else "COLUMNS"
-    AlterTableAddColumns(
+    AddColumns(
       createUnresolvedTable(ctx.multipartIdentifier, s"ALTER TABLE ... ADD $colToken"),
       ctx.columns.qualifiedColTypeWithPosition.asScala.map(typedVisit[QualifiedColType]).toSeq
     )
@@ -3630,7 +3630,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    */
   override def visitRenameTableColumn(
       ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableRenameColumn(
+    RenameColumn(
       createUnresolvedTable(ctx.table, "ALTER TABLE ... RENAME COLUMN"),
       UnresolvedFieldName(typedVisit[Seq[String]](ctx.from)),
       ctx.to.getText)
@@ -3684,7 +3684,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
 
     assert(Seq(dataType, nullable, comment, position).count(_.nonEmpty) == 1)
 
-    AlterTableAlterColumn(
+    AlterColumn(
       createUnresolvedTable(ctx.table, s"ALTER TABLE ... $verb COLUMN"),
       UnresolvedFieldName(typedVisit[Seq[String]](ctx.column)),
       dataType = dataType,
@@ -3718,7 +3718,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
         Some("please run ALTER COLUMN ... SET/DROP NOT NULL instead"))
     }
 
-    AlterTableAlterColumn(
+    AlterColumn(
       createUnresolvedTable(ctx.table, s"ALTER TABLE ... CHANGE COLUMN"),
       UnresolvedFieldName(columnNameParts),
       dataType = Option(ctx.colType().dataType()).map(typedVisit[DataType]),
@@ -3733,7 +3733,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     if (ctx.partitionSpec != null) {
       operationNotAllowed("ALTER TABLE table PARTITION partition_spec REPLACE COLUMNS", ctx)
     }
-    AlterTableReplaceColumns(
+    ReplaceColumns(
       createUnresolvedTable(ctx.multipartIdentifier, "ALTER TABLE ... REPLACE COLUMNS"),
       ctx.columns.qualifiedColTypeWithPosition.asScala.map { colType =>
         if (colType.NULL != null) {
@@ -3766,7 +3766,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   override def visitDropTableColumns(
       ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) {
     val columnsToDrop = ctx.columns.multipartIdentifier.asScala.map(typedVisit[Seq[String]])
-    AlterTableDropColumns(
+    DropColumns(
       createUnresolvedTable(
         ctx.multipartIdentifier,
         "ALTER TABLE ... DROP COLUMNS"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
new file mode 100644
index 0000000..302a810
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.util.TypeUtils
+import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.DataType
+
+/**
+ * The base trait for commands that need to alter a v2 table with [[TableChange]]s.
+ */
+trait AlterTableCommand extends UnaryCommand {
+  def changes: Seq[TableChange]
+  def table: LogicalPlan
+  final override def child: LogicalPlan = table
+}
+
+/**
+ * The logical plan that defines or changes the comment of an TABLE for v2 catalogs.
+ *
+ * {{{
+ *   COMMENT ON TABLE tableIdentifier IS ('text' | NULL)
+ * }}}
+ *
+ * where the `text` is the new comment written as a string literal; or `NULL` to drop the comment.
+ */
+case class CommentOnTable(table: LogicalPlan, comment: String) extends AlterTableCommand {
+  override def changes: Seq[TableChange] = {
+    Seq(TableChange.setProperty(TableCatalog.PROP_COMMENT, comment))
+  }
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... SET LOCATION command.
+ */
+case class SetTableLocation(
+    table: LogicalPlan,
+    partitionSpec: Option[TablePartitionSpec],
+    location: String) extends AlterTableCommand {
+  override def changes: Seq[TableChange] = {
+    if (partitionSpec.nonEmpty) {
+      throw QueryCompilationErrors.alterV2TableSetLocationWithPartitionNotSupportedError()
+    }
+    Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, location))
+  }
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... SET TBLPROPERTIES command.
+ */
+case class SetTableProperties(
+    table: LogicalPlan,
+    properties: Map[String, String]) extends AlterTableCommand {
+  override def changes: Seq[TableChange] = {
+    properties.map { case (key, value) =>
+      TableChange.setProperty(key, value)
+    }.toSeq
+  }
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... UNSET TBLPROPERTIES command.
+ */
+case class UnsetTableProperties(
+    table: LogicalPlan,
+    propertyKeys: Seq[String],
+    ifExists: Boolean) extends AlterTableCommand {
+  override def changes: Seq[TableChange] = {
+    propertyKeys.map(key => TableChange.removeProperty(key))
+  }
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... ADD COLUMNS command.
+ */
+case class AddColumns(
+    table: LogicalPlan,
+    columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
+  columnsToAdd.foreach { c =>
+    TypeUtils.failWithIntervalType(c.dataType)
+  }
+
+  override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
+
+  override def changes: Seq[TableChange] = {
+    columnsToAdd.map { col =>
+      require(col.path.forall(_.resolved),
+        "FieldName should be resolved before it's converted to TableChange.")
+      require(col.position.forall(_.resolved),
+        "FieldPosition should be resolved before it's converted to TableChange.")
+      TableChange.addColumn(
+        col.name.toArray,
+        col.dataType,
+        col.nullable,
+        col.comment.orNull,
+        col.position.map(_.position).orNull)
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... REPLACE COLUMNS command.
+ */
+case class ReplaceColumns(
+    table: LogicalPlan,
+    columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
+  columnsToAdd.foreach { c =>
+    TypeUtils.failWithIntervalType(c.dataType)
+  }
+
+  override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
+
+  override def changes: Seq[TableChange] = {
+    // REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
+    require(table.resolved)
+    val deleteChanges = table.schema.fieldNames.map { name =>
+      TableChange.deleteColumn(Array(name))
+    }
+    val addChanges = columnsToAdd.map { col =>
+      assert(col.path.isEmpty)
+      assert(col.position.isEmpty)
+      TableChange.addColumn(
+        col.name.toArray,
+        col.dataType,
+        col.nullable,
+        col.comment.orNull,
+        null)
+    }
+    deleteChanges ++ addChanges
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... DROP COLUMNS command.
+ */
+case class DropColumns(
+    table: LogicalPlan,
+    columnsToDrop: Seq[FieldName]) extends AlterTableCommand {
+  override def changes: Seq[TableChange] = {
+    columnsToDrop.map { col =>
+      require(col.resolved, "FieldName should be resolved before it's converted to TableChange.")
+      TableChange.deleteColumn(col.name.toArray)
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... RENAME COLUMN command.
+ */
+case class RenameColumn(
+    table: LogicalPlan,
+    column: FieldName,
+    newName: String) extends AlterTableCommand {
+  override def changes: Seq[TableChange] = {
+    require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
+    Seq(TableChange.renameColumn(column.name.toArray, newName))
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... ALTER COLUMN command.
+ */
+case class AlterColumn(
+    table: LogicalPlan,
+    column: FieldName,
+    dataType: Option[DataType],
+    nullable: Option[Boolean],
+    comment: Option[String],
+    position: Option[FieldPosition]) extends AlterTableCommand {
+  override def changes: Seq[TableChange] = {
+    require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
+    val colName = column.name.toArray
+    val typeChange = dataType.map { newDataType =>
+      TableChange.updateColumnType(colName, newDataType)
+    }
+    val nullabilityChange = nullable.map { nullable =>
+      TableChange.updateColumnNullability(colName, nullable)
+    }
+    val commentChange = comment.map { newComment =>
+      TableChange.updateColumnComment(colName, newComment)
+    }
+    val positionChange = position.map { newPosition =>
+      require(newPosition.resolved,
+        "FieldPosition should be resolved before it's converted to TableChange.")
+      TableChange.updateColumnPosition(colName, newPosition.position)
+    }
+    typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index d2b5909..195bb8c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, NamedRelation, PartitionSpec, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
 import org.apache.spark.sql.catalyst.trees.BinaryLike
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.write.Write
@@ -676,21 +676,6 @@ case class CommentOnNamespace(child: LogicalPlan, comment: String) extends Unary
 }
 
 /**
- * The logical plan that defines or changes the comment of an TABLE for v2 catalogs.
- *
- * {{{
- *   COMMENT ON TABLE tableIdentifier IS ('text' | NULL)
- * }}}
- *
- * where the `text` is the new comment written as a string literal; or `NULL` to drop the comment.
- *
- */
-case class CommentOnTable(child: LogicalPlan, comment: String) extends UnaryCommand {
-  override protected def withNewChildInternal(newChild: LogicalPlan): CommentOnTable =
-    copy(child = newChild)
-}
-
-/**
  * The logical plan of the REFRESH FUNCTION command.
  */
 case class RefreshFunction(child: LogicalPlan) extends UnaryCommand {
@@ -1043,177 +1028,3 @@ case class UncacheTable(
 
   override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
 }
-
-/**
- * The logical plan of the ALTER TABLE ... SET LOCATION command.
- */
-case class SetTableLocation(
-    table: LogicalPlan,
-    partitionSpec: Option[TablePartitionSpec],
-    location: String) extends UnaryCommand {
-  override def child: LogicalPlan = table
-  override protected def withNewChildInternal(newChild: LogicalPlan): SetTableLocation =
-    copy(table = newChild)
-}
-
-/**
- * The logical plan of the ALTER TABLE ... SET TBLPROPERTIES command.
- */
-case class SetTableProperties(
-    table: LogicalPlan,
-    properties: Map[String, String]) extends UnaryCommand {
-  override def child: LogicalPlan = table
-  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
-    copy(table = newChild)
-}
-
-/**
- * The logical plan of the ALTER TABLE ... UNSET TBLPROPERTIES command.
- */
-case class UnsetTableProperties(
-    table: LogicalPlan,
-    propertyKeys: Seq[String],
-    ifExists: Boolean) extends UnaryCommand {
-  override def child: LogicalPlan = table
-  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
-    copy(table = newChild)
-}
-
-trait AlterTableColumnCommand extends UnaryCommand {
-  def table: LogicalPlan
-  def changes: Seq[TableChange]
-  override def child: LogicalPlan = table
-}
-
-/**
- * The logical plan of the ALTER TABLE ... ADD COLUMNS command.
- */
-case class AlterTableAddColumns(
-    table: LogicalPlan,
-    columnsToAdd: Seq[QualifiedColType]) extends AlterTableColumnCommand {
-  columnsToAdd.foreach { c =>
-    TypeUtils.failWithIntervalType(c.dataType)
-  }
-
-  override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
-
-  override def changes: Seq[TableChange] = {
-    columnsToAdd.map { col =>
-      require(col.path.forall(_.resolved),
-        "FieldName should be resolved before it's converted to TableChange.")
-      require(col.position.forall(_.resolved),
-        "FieldPosition should be resolved before it's converted to TableChange.")
-      TableChange.addColumn(
-        col.name.toArray,
-        col.dataType,
-        col.nullable,
-        col.comment.orNull,
-        col.position.map(_.position).orNull)
-    }
-  }
-
-  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
-    copy(table = newChild)
-}
-
-/**
- * The logical plan of the ALTER TABLE ... REPLACE COLUMNS command.
- */
-case class AlterTableReplaceColumns(
-    table: LogicalPlan,
-    columnsToAdd: Seq[QualifiedColType]) extends AlterTableColumnCommand {
-  columnsToAdd.foreach { c =>
-    TypeUtils.failWithIntervalType(c.dataType)
-  }
-
-  override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
-
-  override def changes: Seq[TableChange] = {
-    // REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
-    require(table.resolved)
-    val deleteChanges = table.schema.fieldNames.map { name =>
-      TableChange.deleteColumn(Array(name))
-    }
-    val addChanges = columnsToAdd.map { col =>
-      assert(col.path.isEmpty)
-      assert(col.position.isEmpty)
-      TableChange.addColumn(
-        col.name.toArray,
-        col.dataType,
-        col.nullable,
-        col.comment.orNull,
-        null)
-    }
-    deleteChanges ++ addChanges
-  }
-
-  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
-    copy(table = newChild)
-}
-
-/**
- * The logical plan of the ALTER TABLE ... DROP COLUMNS command.
- */
-case class AlterTableDropColumns(
-    table: LogicalPlan,
-    columnsToDrop: Seq[FieldName]) extends AlterTableColumnCommand {
-  override def changes: Seq[TableChange] = {
-    columnsToDrop.map { col =>
-      require(col.resolved, "FieldName should be resolved before it's converted to TableChange.")
-      TableChange.deleteColumn(col.name.toArray)
-    }
-  }
-
-  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
-    copy(table = newChild)
-}
-
-/**
- * The logical plan of the ALTER TABLE ... RENAME COLUMN command.
- */
-case class AlterTableRenameColumn(
-    table: LogicalPlan,
-    column: FieldName,
-    newName: String) extends AlterTableColumnCommand {
-  override def changes: Seq[TableChange] = {
-    require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
-    Seq(TableChange.renameColumn(column.name.toArray, newName))
-  }
-
-  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
-    copy(table = newChild)
-}
-
-/**
- * The logical plan of the ALTER TABLE ... ALTER COLUMN command.
- */
-case class AlterTableAlterColumn(
-    table: LogicalPlan,
-    column: FieldName,
-    dataType: Option[DataType],
-    nullable: Option[Boolean],
-    comment: Option[String],
-    position: Option[FieldPosition]) extends AlterTableColumnCommand {
-  override def changes: Seq[TableChange] = {
-    require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
-    val colName = column.name.toArray
-    val typeChange = dataType.map { newDataType =>
-      TableChange.updateColumnType(colName, newDataType)
-    }
-    val nullabilityChange = nullable.map { nullable =>
-      TableChange.updateColumnNullability(colName, nullable)
-    }
-    val commentChange = comment.map { newComment =>
-      TableChange.updateColumnComment(colName, newComment)
-    }
-    val positionChange = position.map { newPosition =>
-      require(newPosition.resolved,
-        "FieldPosition should be resolved before it's converted to TableChange.")
-      TableChange.updateColumnPosition(colName, newPosition.position)
-    }
-    typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange
-  }
-
-  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
-    copy(table = newChild)
-}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index ea35f8b..45f865f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -788,7 +788,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: add column") {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x int"),
-      AlterTableAddColumns(
+      AddColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
         Seq(QualifiedColType(None, "x", IntegerType, true, None, None)
       )))
@@ -797,7 +797,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: add multiple columns") {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMNS x int, y string"),
-      AlterTableAddColumns(
+      AddColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None),
         Seq(QualifiedColType(None, "x", IntegerType, true, None, None),
           QualifiedColType(None, "y", StringType, true, None, None)
@@ -807,7 +807,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: add column with COLUMNS") {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMNS x int"),
-      AlterTableAddColumns(
+      AddColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None),
         Seq(QualifiedColType(None, "x", IntegerType, true, None, None)
       )))
@@ -816,7 +816,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: add column with COLUMNS (...)") {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMNS (x int)"),
-      AlterTableAddColumns(
+      AddColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None),
         Seq(QualifiedColType(None, "x", IntegerType, true, None, None)
       )))
@@ -825,7 +825,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: add column with COLUMNS (...) and COMMENT") {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMNS (x int COMMENT 'doc')"),
-      AlterTableAddColumns(
+      AddColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None),
         Seq(QualifiedColType(None, "x", IntegerType, true, Some("doc"), None)
       )))
@@ -834,7 +834,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: add non-nullable column") {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x int NOT NULL"),
-      AlterTableAddColumns(
+      AddColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
         Seq(QualifiedColType(None, "x", IntegerType, false, None, None)
       )))
@@ -843,7 +843,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: add column with COMMENT") {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x int COMMENT 'doc'"),
-      AlterTableAddColumns(
+      AddColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
         Seq(QualifiedColType(None, "x", IntegerType, true, Some("doc"), None)
       )))
@@ -852,7 +852,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: add column with position") {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x int FIRST"),
-      AlterTableAddColumns(
+      AddColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
         Seq(QualifiedColType(
           None,
@@ -865,7 +865,7 @@ class DDLParserSuite extends AnalysisTest {
 
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x int AFTER y"),
-      AlterTableAddColumns(
+      AddColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
         Seq(QualifiedColType(
           None,
@@ -880,7 +880,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: add column with nested column name") {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc'"),
-      AlterTableAddColumns(
+      AddColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
         Seq(QualifiedColType(
           Some(UnresolvedFieldName(Seq("x", "y"))), "z", IntegerType, true, Some("doc"), None)
@@ -890,7 +890,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: add multiple columns with nested column name") {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc', a.b string FIRST"),
-      AlterTableAddColumns(
+      AddColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
         Seq(
           QualifiedColType(
@@ -930,7 +930,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: rename column") {
     comparePlans(
       parsePlan("ALTER TABLE table_name RENAME COLUMN a.b.c TO d"),
-      AlterTableRenameColumn(
+      RenameColumn(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... RENAME COLUMN", None),
         UnresolvedFieldName(Seq("a", "b", "c")),
         "d"))
@@ -939,7 +939,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: update column type using ALTER") {
     comparePlans(
       parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c TYPE bigint"),
-      AlterTableAlterColumn(
+      AlterColumn(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None),
         UnresolvedFieldName(Seq("a", "b", "c")),
         Some(LongType),
@@ -958,7 +958,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: update column type") {
     comparePlans(
       parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c TYPE bigint"),
-      AlterTableAlterColumn(
+      AlterColumn(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
         UnresolvedFieldName(Seq("a", "b", "c")),
         Some(LongType),
@@ -970,7 +970,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: update column comment") {
     comparePlans(
       parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c COMMENT 'new comment'"),
-      AlterTableAlterColumn(
+      AlterColumn(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
         UnresolvedFieldName(Seq("a", "b", "c")),
         None,
@@ -982,7 +982,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: update column position") {
     comparePlans(
       parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c FIRST"),
-      AlterTableAlterColumn(
+      AlterColumn(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
         UnresolvedFieldName(Seq("a", "b", "c")),
         None,
@@ -1008,7 +1008,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: SET/DROP NOT NULL") {
     comparePlans(
       parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c SET NOT NULL"),
-      AlterTableAlterColumn(
+      AlterColumn(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None),
         UnresolvedFieldName(Seq("a", "b", "c")),
         None,
@@ -1018,7 +1018,7 @@ class DDLParserSuite extends AnalysisTest {
 
     comparePlans(
       parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c DROP NOT NULL"),
-      AlterTableAlterColumn(
+      AlterColumn(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None),
         UnresolvedFieldName(Seq("a", "b", "c")),
         None,
@@ -1030,7 +1030,7 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: drop column") {
     comparePlans(
       parsePlan("ALTER TABLE table_name DROP COLUMN a.b.c"),
-      AlterTableDropColumns(
+      DropColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", None),
         Seq(UnresolvedFieldName(Seq("a", "b", "c")))))
   }
@@ -1040,7 +1040,7 @@ class DDLParserSuite extends AnalysisTest {
     Seq(sql, sql.replace("COLUMN", "COLUMNS")).foreach { drop =>
       comparePlans(
         parsePlan(drop),
-        AlterTableDropColumns(
+        DropColumns(
           UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", None),
           Seq(UnresolvedFieldName(Seq("x")),
             UnresolvedFieldName(Seq("y")),
@@ -1055,7 +1055,7 @@ class DDLParserSuite extends AnalysisTest {
 
     comparePlans(
       parsePlan(sql1),
-      AlterTableAlterColumn(
+      AlterColumn(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
         UnresolvedFieldName(Seq("a", "b", "c")),
         Some(IntegerType),
@@ -1065,7 +1065,7 @@ class DDLParserSuite extends AnalysisTest {
 
     comparePlans(
       parsePlan(sql2),
-      AlterTableAlterColumn(
+      AlterColumn(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
         UnresolvedFieldName(Seq("a", "b", "c")),
         Some(IntegerType),
@@ -1075,7 +1075,7 @@ class DDLParserSuite extends AnalysisTest {
 
     comparePlans(
       parsePlan(sql3),
-      AlterTableAlterColumn(
+      AlterColumn(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
         UnresolvedFieldName(Seq("a", "b", "c")),
         Some(IntegerType),
@@ -1099,19 +1099,19 @@ class DDLParserSuite extends AnalysisTest {
 
     comparePlans(
       parsePlan(sql1),
-      AlterTableReplaceColumns(
+      ReplaceColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... REPLACE COLUMNS", None),
         Seq(QualifiedColType(None, "x", StringType, true, None, None))))
 
     comparePlans(
       parsePlan(sql2),
-      AlterTableReplaceColumns(
+      ReplaceColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... REPLACE COLUMNS", None),
         Seq(QualifiedColType(None, "x", StringType, true, Some("x1"), None))))
 
     comparePlans(
       parsePlan(sql3),
-      AlterTableReplaceColumns(
+      ReplaceColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... REPLACE COLUMNS", None),
         Seq(
           QualifiedColType(None, "x", StringType, true, Some("x1"), None),
@@ -1120,7 +1120,7 @@ class DDLParserSuite extends AnalysisTest {
 
     comparePlans(
       parsePlan(sql4),
-      AlterTableReplaceColumns(
+      ReplaceColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... REPLACE COLUMNS", None),
         Seq(
           QualifiedColType(None, "x", StringType, true, Some("x1"), None),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 3a2f525..80063cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -46,7 +46,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
   import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-    case AlterTableAddColumns(ResolvedV1TableIdentifier(ident), cols) =>
+    case AddColumns(ResolvedV1TableIdentifier(ident), cols) =>
       cols.foreach { c =>
         assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand")
         if (!c.nullable) {
@@ -55,10 +55,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       }
       AlterTableAddColumnsCommand(ident.asTableIdentifier, cols.map(convertToStructField))
 
-    case AlterTableReplaceColumns(ResolvedV1TableIdentifier(_), _) =>
+    case ReplaceColumns(ResolvedV1TableIdentifier(_), _) =>
       throw QueryCompilationErrors.replaceColumnsOnlySupportedWithV2TableError
 
-    case a @ AlterTableAlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _) =>
+    case a @ AlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _) =>
       if (a.column.name.length > 1) {
         throw QueryCompilationErrors.alterQualifiedColumnOnlySupportedWithV2TableError
       }
@@ -87,10 +87,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         builder.build())
       AlterTableChangeColumnCommand(ident.asTableIdentifier, colName, newColumn)
 
-    case AlterTableRenameColumn(ResolvedV1TableIdentifier(_), _, _) =>
+    case RenameColumn(ResolvedV1TableIdentifier(_), _, _) =>
       throw QueryCompilationErrors.renameColumnOnlySupportedWithV2TableError
 
-    case AlterTableDropColumns(ResolvedV1TableIdentifier(_), _) =>
+    case DropColumns(ResolvedV1TableIdentifier(_), _) =>
       throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
 
     case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 3d69029..1a50c32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.toPrettySQL
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog}
 import org.apache.spark.sql.connector.read.LocalScan
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.connector.write.V1Write
@@ -314,10 +314,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         ns,
         Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil
 
-    case CommentOnTable(ResolvedTable(catalog, identifier, _, _), comment) =>
-      val changes = TableChange.setProperty(TableCatalog.PROP_COMMENT, comment)
-      AlterTableExec(catalog, identifier, Seq(changes)) :: Nil
-
     case CreateNamespace(catalog, namespace, ifNotExists, properties) =>
       CreateNamespaceExec(catalog, namespace, ifNotExists, properties) :: Nil
 
@@ -424,25 +420,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }
       UncacheTableExec(r.table, cascade = !isTempView(r.table)) :: Nil
 
-    case SetTableLocation(table: ResolvedTable, partitionSpec, location) =>
-      if (partitionSpec.nonEmpty) {
-        throw QueryCompilationErrors.alterV2TableSetLocationWithPartitionNotSupportedError()
-      }
-      val changes = Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, location))
-      AlterTableExec(table.catalog, table.identifier, changes) :: Nil
-
-    case SetTableProperties(table: ResolvedTable, props) =>
-      val changes = props.map { case (key, value) =>
-        TableChange.setProperty(key, value)
-      }.toSeq
-      AlterTableExec(table.catalog, table.identifier, changes) :: Nil
-
-    // TODO: v2 `UNSET TBLPROPERTIES` should respect the ifExists flag.
-    case UnsetTableProperties(table: ResolvedTable, keys, _) =>
-      val changes = keys.map(key => TableChange.removeProperty(key))
-      AlterTableExec(table.catalog, table.identifier, changes) :: Nil
-
-    case a: AlterTableColumnCommand if a.table.resolved =>
+    case a: AlterTableCommand if a.table.resolved =>
       val table = a.table.asInstanceOf[ResolvedTable]
       AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index 763cd6a..1d6e0a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedFieldName, UnresolvedFieldPosition}
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddColumns, AlterTableAlterColumn, AlterTableColumnCommand, AlterTableDropColumns, AlterTableRenameColumn, CreateTableAsSelect, LogicalPlan, QualifiedColType, ReplaceTableAsSelect}
+import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceTableAsSelect}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -140,7 +140,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
     Seq("POINT.Z", "poInt.z", "poInt.Z").foreach { ref =>
       val field = ref.split("\\.")
       alterTableTest(
-        AlterTableAddColumns(
+        AddColumns(
           table,
           Seq(QualifiedColType(
             Some(UnresolvedFieldName(field.init)), field.last, LongType, true, None, None))),
@@ -152,7 +152,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
   test("AlterTable: add column resolution - positional") {
     Seq("ID", "iD").foreach { ref =>
       alterTableTest(
-        AlterTableAddColumns(
+        AddColumns(
           table,
           Seq(QualifiedColType(
             None,
@@ -168,7 +168,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
 
   test("AlterTable: add column resolution - column position referencing new column") {
     alterTableTest(
-      AlterTableAddColumns(
+      AddColumns(
         table,
         Seq(QualifiedColType(
           None,
@@ -191,7 +191,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
   test("AlterTable: add column resolution - nested positional") {
     Seq("X", "Y").foreach { ref =>
       alterTableTest(
-        AlterTableAddColumns(
+        AddColumns(
           table,
           Seq(QualifiedColType(
             Some(UnresolvedFieldName(Seq("point"))),
@@ -207,7 +207,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
 
   test("AlterTable: add column resolution - column position referencing new nested column") {
     alterTableTest(
-      AlterTableAddColumns(
+      AddColumns(
         table,
         Seq(QualifiedColType(
           Some(UnresolvedFieldName(Seq("point"))),
@@ -229,7 +229,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
 
   test("SPARK-36372: Adding duplicate columns should not be allowed") {
     alterTableTest(
-      AlterTableAddColumns(
+      AddColumns(
         table,
         Seq(QualifiedColType(
           Some(UnresolvedFieldName(Seq("point"))),
@@ -252,7 +252,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
   test("AlterTable: drop column resolution") {
     Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
       alterTableTest(
-        AlterTableDropColumns(table, Seq(UnresolvedFieldName(ref))),
+        DropColumns(table, Seq(UnresolvedFieldName(ref))),
         Seq("Missing field " + ref.quoted)
       )
     }
@@ -261,7 +261,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
   test("AlterTable: rename column resolution") {
     Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
       alterTableTest(
-        AlterTableRenameColumn(table, UnresolvedFieldName(ref), "newName"),
+        RenameColumn(table, UnresolvedFieldName(ref), "newName"),
         Seq("Missing field " + ref.quoted)
       )
     }
@@ -270,7 +270,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
   test("AlterTable: drop column nullability resolution") {
     Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
       alterTableTest(
-        AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, Some(true), None, None),
+        AlterColumn(table, UnresolvedFieldName(ref), None, Some(true), None, None),
         Seq("Missing field " + ref.quoted)
       )
     }
@@ -279,7 +279,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
   test("AlterTable: change column type resolution") {
     Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
       alterTableTest(
-        AlterTableAlterColumn(table, UnresolvedFieldName(ref), Some(StringType), None, None, None),
+        AlterColumn(table, UnresolvedFieldName(ref), Some(StringType), None, None, None),
         Seq("Missing field " + ref.quoted)
       )
     }
@@ -288,14 +288,14 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
   test("AlterTable: change column comment resolution") {
     Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
       alterTableTest(
-        AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, None, Some("comment"), None),
+        AlterColumn(table, UnresolvedFieldName(ref), None, None, Some("comment"), None),
         Seq("Missing field " + ref.quoted)
       )
     }
   }
 
   private def alterTableTest(
-      alter: AlterTableColumnCommand,
+      alter: AlterTableCommand,
       error: Seq[String],
       expectErrorOnCaseSensitive: Boolean = true): Unit = {
     Seq(true, false).foreach { caseSensitive =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 25a8c4e..2c2f833 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat,
 import org.apache.spark.sql.catalyst.expressions.{AnsiCast, AttributeReference, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral}
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateTableStatement, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateTableStatement, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.FakeV2Provider
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, V1Table}
@@ -1132,7 +1132,7 @@ class PlanResolutionSuite extends AnalysisTest {
             "ALTER COLUMN with qualified column is only supported with v2 tables"))
         } else {
           parsed1 match {
-            case AlterTableAlterColumn(
+            case AlterColumn(
                 _: ResolvedTable,
                 column: ResolvedFieldName,
                 Some(LongType),
@@ -1144,7 +1144,7 @@ class PlanResolutionSuite extends AnalysisTest {
           }
 
           parsed2 match {
-            case AlterTableAlterColumn(
+            case AlterColumn(
                 _: ResolvedTable,
                 column: ResolvedFieldName,
                 None,
@@ -1198,14 +1198,14 @@ class PlanResolutionSuite extends AnalysisTest {
   test("alter table: hive style change column") {
     Seq("v2Table", "testcat.tab").foreach { tblName =>
       parseAndResolve(s"ALTER TABLE $tblName CHANGE COLUMN i i int COMMENT 'an index'") match {
-        case AlterTableAlterColumn(
+        case AlterColumn(
             _: ResolvedTable, _: ResolvedFieldName, None, None, Some(comment), None) =>
           assert(comment == "an index")
         case _ => fail("expect AlterTableAlterColumn with comment change only")
       }
 
       parseAndResolve(s"ALTER TABLE $tblName CHANGE COLUMN i i long COMMENT 'an index'") match {
-        case AlterTableAlterColumn(
+        case AlterColumn(
             _: ResolvedTable, _: ResolvedFieldName, Some(dataType), None, Some(comment), None) =>
           assert(comment == "an index")
           assert(dataType == LongType)
@@ -1241,7 +1241,7 @@ class PlanResolutionSuite extends AnalysisTest {
       val catalog = if (isSessionCatalog) v2SessionCatalog else testCat
       val tableIdent = if (isSessionCatalog) "v2Table" else "tab"
       parsed match {
-        case AlterTableAlterColumn(r: ResolvedTable, _, _, _, _, _) =>
+        case AlterColumn(r: ResolvedTable, _, _, _, _, _) =>
           assert(r.catalog == catalog)
           assert(r.identifier.name() == tableIdent)
         case Project(_, AsDataSourceV2Relation(r)) =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org