Posted to commits@flink.apache.org by sh...@apache.org on 2023/01/05 04:26:56 UTC

[flink] branch master updated: [FLINK-30497][table-api] Introduce TableChange to represent DROP change

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cce059c556b [FLINK-30497][table-api] Introduce TableChange to represent DROP change
cce059c556b is described below

commit cce059c556bc2538cba8918e281320516c4be222
Author: Shengkai <33...@users.noreply.github.com>
AuthorDate: Thu Jan 5 12:26:48 2023 +0800

    [FLINK-30497][table-api] Introduce TableChange to represent DROP change
    
    This closes #21592
---
 .../hive/parse/HiveParserDDLSemanticAnalyzer.java  |   1 -
 .../operations/ddl/AlterTableChangeOperation.java  |   9 +
 .../apache/flink/table/catalog/TableChange.java    | 162 ++++++++
 .../planner/expressions/ColumnReferenceFinder.java |   7 +-
 .../planner/operations/AlterSchemaConverter.java   | 430 ++++++++++++---------
 .../operations/SqlToOperationConverter.java        |  88 +----
 .../planner/utils/OperationConverterUtils.java     |  10 +-
 .../expressions/ColumnReferenceFinderTest.java     |   5 +-
 .../operations/SqlToOperationConverterTest.java    | 102 +++--
 9 files changed, 494 insertions(+), 320 deletions(-)
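
For orientation before the diff: the commit's new public API is three static
factory methods on TableChange. A minimal, self-contained sketch of building
the change list that the DROP statements now produce (the table and names are
illustrative, not from this commit):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.table.catalog.TableChange;

    public class DropChangeExample {
        public static void main(String[] args) {
            // ALTER TABLE t DROP COLUMN b / DROP WATERMARK / DROP CONSTRAINT ct
            List<TableChange> changes =
                    Arrays.asList(
                            TableChange.dropColumn("b"),
                            TableChange.dropWatermark(),
                            TableChange.dropConstraint("ct"));
            changes.forEach(System.out::println);
            // Prints:
            // DropColumn{columnName='b'}
            // DropWatermark
            // DropConstraint{constraintName='ct'}
        }
    }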

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index 79cef5614be..fc7913c4efe 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -2006,7 +2006,6 @@ public class HiveParserDDLSemanticAnalyzer {
         final int numPartCol = oldTable.getPartitionKeys().size();
         TableSchema.Builder builder = TableSchema.builder();
         // add existing non-part col if we're not replacing
-        // TODO: support TableChange in FLINK-30497
         if (!replace) {
             List<TableColumn> nonPartCols =
                     oldSchema.getTableColumns().subList(0, oldSchema.getFieldCount() - numPartCol);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
index e8ef9519de1..ad083d1e574 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
@@ -119,6 +119,15 @@ public class AlterTableChangeOperation extends AlterTableOperation {
             TableChange.ModifyUniqueConstraint modifyUniqueConstraint =
                     (TableChange.ModifyUniqueConstraint) tableChange;
             return String.format("  MODIFY %s", modifyUniqueConstraint.getNewConstraint());
+        } else if (tableChange instanceof TableChange.DropColumn) {
+            TableChange.DropColumn dropColumn = (TableChange.DropColumn) tableChange;
+            return String.format(
+                    "  DROP %s", EncodingUtils.escapeIdentifier(dropColumn.getColumnName()));
+        } else if (tableChange instanceof TableChange.DropConstraint) {
+            TableChange.DropConstraint dropConstraint = (TableChange.DropConstraint) tableChange;
+            return String.format("  DROP CONSTRAINT %s", dropConstraint.getConstraintName());
+        } else if (tableChange instanceof TableChange.DropWatermark) {
+            return "  DROP WATERMARK";
         } else {
             throw new UnsupportedOperationException(
                     String.format("Unknown table change: %s.", tableChange));
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
index 240148836c8..34eba1d6845 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
@@ -229,6 +229,53 @@ public interface TableChange {
         return new ModifyWatermark(newWatermarkSpec);
     }
 
+    /**
+     * A table change to drop a column.
+     *
+     * <p>It is equivalent to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; DROP COLUMN &lt;column_name&gt;
+     * </pre>
+     *
+     * @param columnName the column to drop.
+     * @return a TableChange that represents the modification.
+     */
+    static DropColumn dropColumn(String columnName) {
+        return new DropColumn(columnName);
+    }
+
+    /**
+     * A table change to drop the watermark.
+     *
+     * <p>It is equivalent to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; DROP WATERMARK
+     * </pre>
+     *
+     * @return a TableChange that represents the modification.
+     */
+    static DropWatermark dropWatermark() {
+        return DropWatermark.INSTANCE;
+    }
+
+    /**
+     * A table change to drop a constraint.
+     *
+     * <p>It is equivalent to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; DROP CONSTRAINT &lt;constraint_name&gt;
+     * </pre>
+     *
+     * @param constraintName the constraint to drop.
+     * @return a TableChange that represents the modification.
+     */
+    static DropConstraint dropConstraint(String constraintName) {
+        return new DropConstraint(constraintName);
+    }
+
     /**
      * A table change to set the table option.
      *
@@ -766,6 +813,121 @@ public interface TableChange {
         }
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Drop Change
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * A table change to drop the column.
+     *
+     * <p>It is equivalent to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; DROP COLUMN &lt;column_name&gt;
+     * </pre>
+     */
+    @PublicEvolving
+    class DropColumn implements TableChange {
+
+        private final String columnName;
+
+        private DropColumn(String columnName) {
+            this.columnName = columnName;
+        }
+
+        /** Returns the column name. */
+        public String getColumnName() {
+            return columnName;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof DropColumn)) {
+                return false;
+            }
+            DropColumn that = (DropColumn) o;
+            return Objects.equals(columnName, that.columnName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(columnName);
+        }
+
+        @Override
+        public String toString() {
+            return "DropColumn{" + "columnName='" + columnName + '\'' + '}';
+        }
+    }
+
+    /**
+     * A table change to drop the watermark.
+     *
+     * <p>It is equivalent to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; DROP WATERMARK
+     * </pre>
+     */
+    @PublicEvolving
+    class DropWatermark implements TableChange {
+        static final DropWatermark INSTANCE = new DropWatermark();
+
+        @Override
+        public String toString() {
+            return "DropWatermark";
+        }
+    }
+
+    /**
+     * A table change to drop a constraint.
+     *
+     * <p>It is equivalent to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; DROP CONSTRAINT &lt;constraint_name&gt;
+     * </pre>
+     */
+    @PublicEvolving
+    class DropConstraint implements TableChange {
+
+        private final String constraintName;
+
+        private DropConstraint(String constraintName) {
+            this.constraintName = constraintName;
+        }
+
+        /** Returns the constraint name. */
+        public String getConstraintName() {
+            return constraintName;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof DropConstraint)) {
+                return false;
+            }
+            DropConstraint that = (DropConstraint) o;
+            return Objects.equals(constraintName, that.constraintName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(constraintName);
+        }
+
+        @Override
+        public String toString() {
+            return "DropConstraint{" + "constraintName='" + constraintName + '\'' + '}';
+        }
+    }
+
     // --------------------------------------------------------------------------------------------
     // Property change
     // --------------------------------------------------------------------------------------------
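
Consumers of the change list can dispatch on the new subclasses with plain
instanceof checks, mirroring the printing logic added to
AlterTableChangeOperation above. A minimal sketch (DropChangeDispatch and its
describe method are hypothetical helpers, not part of this commit):

    import org.apache.flink.table.catalog.TableChange;

    public class DropChangeDispatch {
        // Renders a DROP-style change; unknown kinds fail loudly, as in the
        // operation's summary logic above.
        static String describe(TableChange tableChange) {
            if (tableChange instanceof TableChange.DropColumn) {
                return "DROP " + ((TableChange.DropColumn) tableChange).getColumnName();
            } else if (tableChange instanceof TableChange.DropConstraint) {
                return "DROP CONSTRAINT "
                        + ((TableChange.DropConstraint) tableChange).getConstraintName();
            } else if (tableChange instanceof TableChange.DropWatermark) {
                return "DROP WATERMARK";
            }
            throw new UnsupportedOperationException(
                    "Unknown table change: " + tableChange);
        }
    }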
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
index d3593cb4d85..60216e5bd05 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
@@ -37,6 +37,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /** A finder used to look up referenced column name in a {@link ResolvedExpression}. */
 public class ColumnReferenceFinder {
@@ -81,7 +82,11 @@ public class ColumnReferenceFinder {
     public static Set<String> findWatermarkReferencedColumn(ResolvedSchema schema) {
         ColumnReferenceVisitor visitor = new ColumnReferenceVisitor(schema.getColumnNames());
         return schema.getWatermarkSpecs().stream()
-                .flatMap(spec -> visitor.visit(spec.getWatermarkExpression()).stream())
+                .flatMap(
+                        spec ->
+                                Stream.concat(
+                                        visitor.visit(spec.getWatermarkExpression()).stream(),
+                                        Stream.of(spec.getRowtimeAttribute())))
                 .collect(Collectors.toSet());
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
index 193e47c168e..57f6fae4384 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.operations;
 
+import org.apache.flink.sql.parser.ddl.SqlAlterTable;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableDropColumn;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint;
@@ -33,19 +34,24 @@ import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.SchemaResolver;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.UniqueConstraint;
-import org.apache.flink.table.catalog.WatermarkSpec;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.expressions.ColumnReferenceFinder;
 import org.apache.flink.table.planner.utils.OperationConverterUtils;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.calcite.rel.type.RelDataType;
@@ -59,6 +65,7 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -66,7 +73,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -86,67 +92,47 @@ public class AlterSchemaConverter {
     private final SqlValidator sqlValidator;
     private final Function<SqlNode, String> escapeExpression;
     private final Consumer<SqlTableConstraint> constraintValidator;
-    private final SchemaResolver schemaResolver;
+    private final CatalogManager catalogManager;
 
     AlterSchemaConverter(
             SqlValidator sqlValidator,
             Consumer<SqlTableConstraint> constraintValidator,
             Function<SqlNode, String> escapeExpression,
-            SchemaResolver schemaResolver) {
+            CatalogManager catalogManager) {
         this.sqlValidator = sqlValidator;
         this.escapeExpression = escapeExpression;
         this.constraintValidator = constraintValidator;
-        this.schemaResolver = schemaResolver;
+        this.catalogManager = catalogManager;
     }
 
     /**
      * Convert ALTER TABLE ADD | MODIFY (&lt;schema_component&gt; [, &lt;schema_component&gt;, ...])
      * to generate an updated Schema.
      */
-    public Schema applySchemaChange(
-            SqlAlterTableSchema alterTableSchema,
-            ResolvedCatalogTable oldTable,
-            List<TableChange> tableChangeCollector) {
-        AlterSchemaStrategy strategy = computeAlterSchemaStrategy(alterTableSchema);
-        SchemaConverter converter =
-                strategy == AlterSchemaStrategy.ADD
-                        ? new AddSchemaConverter(
-                                oldTable.getUnresolvedSchema(),
-                                (FlinkTypeFactory) sqlValidator.getTypeFactory(),
-                                sqlValidator,
-                                constraintValidator,
-                                escapeExpression,
-                                schemaResolver,
-                                tableChangeCollector)
-                        : new ModifySchemaConverter(
-                                oldTable,
-                                (FlinkTypeFactory) sqlValidator.getTypeFactory(),
-                                sqlValidator,
-                                constraintValidator,
-                                escapeExpression,
-                                schemaResolver,
-                                tableChangeCollector);
+    public Operation convertAlterSchema(
+            SqlAlterTableSchema alterTableSchema, ResolvedCatalogTable oldTable) {
+        SchemaConverter converter = createSchemaConverter(alterTableSchema, oldTable);
         converter.updateColumn(alterTableSchema.getColumnPositions().getList());
         alterTableSchema.getWatermark().ifPresent(converter::updateWatermark);
         alterTableSchema.getFullConstraint().ifPresent(converter::updatePrimaryKey);
-        return converter.convert();
+
+        return buildAlterTableChangeOperation(
+                alterTableSchema, converter.changesCollector, converter.convert(), oldTable);
     }
 
     /** Convert ALTER TABLE RENAME col_name to new_col_name to generate an updated Schema. */
-    public Schema applySchemaChange(
-            SqlAlterTableRenameColumn renameColumn,
-            ResolvedCatalogTable oldTable,
-            List<TableChange> tableChangeCollector) {
+    public Operation convertAlterSchema(
+            SqlAlterTableRenameColumn renameColumn, ResolvedCatalogTable oldTable) {
         String oldColumnName = getColumnName(renameColumn.getOldColumnIdentifier());
         String newColumnName = getColumnName(renameColumn.getNewColumnIdentifier());
-        // validate old column is exists, new column name does not collide with existed column
-        // names, and old column isn't referenced by computed column
-        validateColumnName(
-                oldColumnName,
-                newColumnName,
-                oldTable.getResolvedSchema(),
-                oldTable.getPartitionKeys());
-        validateWatermark(oldTable, oldColumnName);
+
+        ReferencesManager.create(oldTable).checkReferences(oldColumnName);
+        if (oldTable.getResolvedSchema().getColumn(newColumnName).isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "%sThe column `%s` already existed in table schema.",
+                            EX_MSG_PREFIX, newColumnName));
+        }
 
         // generate new schema
         Schema.Builder schemaBuilder = Schema.newBuilder();
@@ -156,12 +142,6 @@ public class AlterSchemaConverter {
                 (builder, column) -> {
                     if (column.getName().equals(oldColumnName)) {
                         buildNewColumnFromOldColumn(builder, column, newColumnName);
-                        tableChangeCollector.add(
-                                TableChange.modifyColumnName(
-                                        unwrap(
-                                                oldTable.getResolvedSchema()
-                                                        .getColumn(oldColumnName)),
-                                        newColumnName));
                     } else {
                         builder.fromColumns(Collections.singletonList(column));
                     }
@@ -169,11 +149,19 @@ public class AlterSchemaConverter {
         buildUpdatedPrimaryKey(
                 schemaBuilder, oldTable, (pk) -> pk.equals(oldColumnName) ? newColumnName : pk);
         buildUpdatedWatermark(schemaBuilder, oldTable);
-        return schemaBuilder.build();
+
+        return buildAlterTableChangeOperation(
+                renameColumn,
+                Collections.singletonList(
+                        TableChange.modifyColumnName(
+                                unwrap(oldTable.getResolvedSchema().getColumn(oldColumnName)),
+                                newColumnName)),
+                schemaBuilder.build(),
+                oldTable);
     }
 
     /** Convert ALTER TABLE DROP (col1 [, col2, ...]) to generate an updated Schema. */
-    public Schema applySchemaChange(
+    public Operation convertAlterSchema(
             SqlAlterTableDropColumn dropColumn, ResolvedCatalogTable oldTable) {
         Set<String> columnsToDrop = new HashSet<>();
         dropColumn
@@ -188,18 +176,25 @@ public class AlterSchemaConverter {
                             }
                         });
 
-        Schema.Builder schemaBuilder = Schema.newBuilder();
-        for (SqlNode columnIdentifier : dropColumn.getColumnList()) {
-            String columnToDrop = getColumnName((SqlIdentifier) columnIdentifier);
-            // validate the column to drop exists in the table schema, is not a primary key and
-            // does not derive any computed column
-            validateColumnName(
-                    columnToDrop,
-                    oldTable.getResolvedSchema(),
-                    oldTable.getPartitionKeys(),
-                    columnsToDrop);
-            validateWatermark(oldTable, columnToDrop);
+        ReferencesManager referencesManager = ReferencesManager.create(oldTable);
+        // Sort by dependency count from largest to smallest, so a computed column is dropped
+        // before the columns it references, e.g. dropping a and b (b AS a+1) yields [b, a].
+        List<String> sortedColumnsToDrop =
+                columnsToDrop.stream()
+                        .sorted(
+                                Comparator.comparingInt(
+                                                col ->
+                                                        referencesManager.getColumnDependencyCount(
+                                                                (String) col))
+                                        .reversed())
+                        .collect(Collectors.toList());
+        List<TableChange> tableChanges = new ArrayList<>(sortedColumnsToDrop.size());
+        for (String columnToDrop : sortedColumnsToDrop) {
+            referencesManager.dropColumn(columnToDrop);
+            tableChanges.add(TableChange.dropColumn(columnToDrop));
         }
+
+        Schema.Builder schemaBuilder = Schema.newBuilder();
         buildUpdatedColumn(
                 schemaBuilder,
                 oldTable,
@@ -210,11 +205,13 @@ public class AlterSchemaConverter {
                 });
         buildUpdatedPrimaryKey(schemaBuilder, oldTable, Function.identity());
         buildUpdatedWatermark(schemaBuilder, oldTable);
-        return schemaBuilder.build();
+
+        return buildAlterTableChangeOperation(
+                dropColumn, tableChanges, schemaBuilder.build(), oldTable);
     }
 
     /** Convert ALTER TABLE DROP PRIMARY KEY to generate an updated Schema. */
-    public Schema applySchemaChange(
+    public Operation convertAlterSchema(
             SqlAlterTableDropPrimaryKey dropPrimaryKey, ResolvedCatalogTable oldTable) {
         Optional<UniqueConstraint> pkConstraint = oldTable.getResolvedSchema().getPrimaryKey();
         if (!pkConstraint.isPresent()) {
@@ -228,13 +225,18 @@ public class AlterSchemaConverter {
                 oldTable,
                 (builder, column) -> builder.fromColumns(Collections.singletonList(column)));
         buildUpdatedWatermark(schemaBuilder, oldTable);
-        return schemaBuilder.build();
+
+        return buildAlterTableChangeOperation(
+                dropPrimaryKey,
+                Collections.singletonList(TableChange.dropConstraint(pkConstraint.get().getName())),
+                schemaBuilder.build(),
+                oldTable);
     }
 
     /**
      * Convert ALTER TABLE DROP CONSTRAINT constraint_name to generate an updated {@link Schema}.
      */
-    public Schema applySchemaChange(
+    public Operation convertAlterSchema(
             SqlAlterTableDropConstraint dropConstraint, ResolvedCatalogTable oldTable) {
         Optional<UniqueConstraint> pkConstraint = oldTable.getResolvedSchema().getPrimaryKey();
         if (!pkConstraint.isPresent()) {
@@ -258,11 +260,16 @@ public class AlterSchemaConverter {
                 oldTable,
                 (builder, column) -> builder.fromColumns(Collections.singletonList(column)));
         buildUpdatedWatermark(schemaBuilder, oldTable);
-        return schemaBuilder.build();
+
+        return buildAlterTableChangeOperation(
+                dropConstraint,
+                Collections.singletonList(TableChange.dropConstraint(constraintName)),
+                schemaBuilder.build(),
+                oldTable);
     }
 
     /** Convert ALTER TABLE DROP WATERMARK to generate an updated {@link Schema}. */
-    public Schema applySchemaChange(
+    public Operation convertAlterSchema(
             SqlAlterTableDropWatermark dropWatermark, ResolvedCatalogTable oldTable) {
         if (oldTable.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
             throw new ValidationException(
@@ -270,13 +277,19 @@ public class AlterSchemaConverter {
                             "%sThe base table does not define any watermark strategy.",
                             EX_MSG_PREFIX));
         }
+
         Schema.Builder schemaBuilder = Schema.newBuilder();
         buildUpdatedColumn(
                 schemaBuilder,
                 oldTable,
                 (builder, column) -> builder.fromColumns(Collections.singletonList(column)));
         buildUpdatedPrimaryKey(schemaBuilder, oldTable, Function.identity());
-        return schemaBuilder.build();
+
+        return buildAlterTableChangeOperation(
+                dropWatermark,
+                Collections.singletonList(TableChange.dropWatermark()),
+                schemaBuilder.build(),
+                oldTable);
     }
 
     // --------------------------------------------------------------------------------------------
@@ -304,14 +317,13 @@ public class AlterSchemaConverter {
                 SqlValidator sqlValidator,
                 Consumer<SqlTableConstraint> constraintValidator,
                 Function<SqlNode, String> escapeExpressions,
-                SchemaResolver schemaResolver,
-                List<TableChange> changesCollector) {
+                SchemaResolver schemaResolver) {
             this.typeFactory = typeFactory;
             this.sqlValidator = sqlValidator;
             this.constraintValidator = constraintValidator;
             this.escapeExpressions = escapeExpressions;
             this.schemaResolver = schemaResolver;
-            this.changesCollector = changesCollector;
+            this.changesCollector = new ArrayList<>();
             populateColumnsFromSourceTable(oldSchema);
             populatePrimaryKeyFromSourceTable(oldSchema);
             populateWatermarkFromSourceTable(oldSchema);
@@ -540,16 +552,14 @@ public class AlterSchemaConverter {
                 SqlValidator sqlValidator,
                 Consumer<SqlTableConstraint> constraintValidator,
                 Function<SqlNode, String> escapeExpressions,
-                SchemaResolver schemaResolver,
-                List<TableChange> changeCollector) {
+                SchemaResolver schemaResolver) {
             super(
                     oldSchema,
                     typeFactory,
                     sqlValidator,
                     constraintValidator,
                     escapeExpressions,
-                    schemaResolver,
-                    changeCollector);
+                    schemaResolver);
         }
 
         @Override
@@ -634,16 +644,14 @@ public class AlterSchemaConverter {
                 SqlValidator sqlValidator,
                 Consumer<SqlTableConstraint> constraintValidator,
                 Function<SqlNode, String> escapeExpressions,
-                SchemaResolver schemaResolver,
-                List<TableChange> tableChangeCollector) {
+                SchemaResolver schemaResolver) {
             super(
                     oldTable.getUnresolvedSchema(),
                     typeFactory,
                     sqlValidator,
                     constraintValidator,
                     escapeExpressions,
-                    schemaResolver,
-                    tableChangeCollector);
+                    schemaResolver);
             this.oldTable = oldTable;
         }
 
@@ -728,114 +736,141 @@ public class AlterSchemaConverter {
         }
     }
 
-    // --------------------------------------------------------------------------------------------
-
-    private void validateColumnName(
-            String oldColumnName,
-            String newColumnName,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys) {
-        validateColumnName(
-                oldColumnName,
-                oldSchema,
-                partitionKeys,
-                // fail the operation of renaming column, once the column derives a computed column
-                (referencedColumn, computedColumn) -> referencedColumn.contains(oldColumnName));
-        // validate new column
-        if (oldSchema.getColumn(newColumnName).isPresent()) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` already existed in table schema.",
-                            EX_MSG_PREFIX, newColumnName));
-        }
-    }
+    private static class ReferencesManager {
+
+        /** Available columns in the table. */
+        private final Set<String> columns;
+
+        /**
+         * Mapping from a column to the computed columns that reference it, e.g. column `a` is
+         * referenced by column `b` given the expression "b as a+1".
+         */
+        private final Map<String, Set<String>> columnToReferences;
+
+        /**
+         * Reverse mapping: from a computed column to the columns it depends on, e.g. column `b`
+         * depends on column `a` given the expression "b as a+1".
+         */
+        private final Map<String, Set<String>> columnToDependencies;
+
+        /** Primary keys defined on the table. */
+        private final Set<String> primaryKeys;
+
+        /** The names of the columns the watermark expression depends on. */
+        private final Set<String> watermarkReferences;
+
+        /** The names of the columns used as partition keys. */
+        private final Set<String> partitionKeys;
+
+        private ReferencesManager(
+                Set<String> columns,
+                Map<String, Set<String>> columnToReferences,
+                Map<String, Set<String>> columnToDependencies,
+                Set<String> primaryKeys,
+                Set<String> watermarkReferences,
+                Set<String> partitionKeys) {
+            this.columns = columns;
+            this.columnToReferences = columnToReferences;
+            this.columnToDependencies = columnToDependencies;
+            this.primaryKeys = primaryKeys;
+            this.watermarkReferences = watermarkReferences;
+            this.partitionKeys = partitionKeys;
+        }
+
+        static ReferencesManager create(ResolvedCatalogTable catalogTable) {
+            Map<String, Set<String>> columnToReferences = new HashMap<>();
+            Map<String, Set<String>> columnToDependencies = new HashMap<>();
+            catalogTable.getResolvedSchema().getColumns().stream()
+                    .filter(column -> column instanceof Column.ComputedColumn)
+                    .forEach(
+                            column -> {
+                                Set<String> referencedColumns =
+                                        ColumnReferenceFinder.findReferencedColumn(
+                                                column.getName(), catalogTable.getResolvedSchema());
+                                for (String referencedColumn : referencedColumns) {
+                                    columnToReferences
+                                            .computeIfAbsent(
+                                                    referencedColumn, key -> new HashSet<>())
+                                            .add(column.getName());
+                                    columnToDependencies
+                                            .computeIfAbsent(
+                                                    column.getName(), key -> new HashSet<>())
+                                            .add(referencedColumn);
+                                }
+                            });
 
-    private void validateColumnName(
-            String columnToDrop,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys,
-            Set<String> columnsToDrop) {
-        validateColumnName(
-                columnToDrop,
-                oldSchema,
-                partitionKeys,
-                // fail the operation of dropping column, only if the column derives a computed
-                // column, and the computed column is not being dropped along with the old column
-                (referencedColumn, computedColumn) ->
-                        referencedColumn.contains(columnToDrop)
-                                && !columnsToDrop.contains(computedColumn.getName()));
-        oldSchema
-                .getPrimaryKey()
-                .ifPresent(
-                        pk -> {
-                            if (pk.getColumns().contains(columnToDrop)) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "%sThe column `%s` is used as the primary key.",
-                                                EX_MSG_PREFIX, columnToDrop));
-                            }
-                        });
-    }
+            return new ReferencesManager(
+                    new HashSet<>(catalogTable.getResolvedSchema().getColumnNames()),
+                    columnToReferences,
+                    columnToDependencies,
+                    catalogTable
+                            .getResolvedSchema()
+                            .getPrimaryKey()
+                            .map(constraint -> new HashSet<>(constraint.getColumns()))
+                            .orElse(new HashSet<>()),
+                    ColumnReferenceFinder.findWatermarkReferencedColumn(
+                            catalogTable.getResolvedSchema()),
+                    new HashSet<>(catalogTable.getPartitionKeys()));
+        }
+
+        void dropColumn(String columnName) {
+            checkReferences(columnName);
+            if (primaryKeys.contains(columnName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column %s is used as the primary key.",
+                                EX_MSG_PREFIX, EncodingUtils.escapeIdentifier(columnName)));
+            }
 
-    private void validateColumnName(
-            String columnToAlter,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys,
-            BiFunction<Set<String>, Column.ComputedColumn, Boolean> computedColumnChecker) {
-        // validate old column
-        Set<String> tableColumns = new HashSet<>(oldSchema.getColumnNames());
-        if (!tableColumns.contains(columnToAlter)) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` does not exist in the base table.",
-                            EX_MSG_PREFIX, columnToAlter));
+            columnToDependencies
+                    .getOrDefault(columnName, Collections.emptySet())
+                    .forEach(
+                            referredColumn ->
+                                    columnToReferences.get(referredColumn).remove(columnName));
+            columnToDependencies.remove(columnName);
+            columns.remove(columnName);
         }
 
-        // validate old column name isn't referred by computed column case
-        oldSchema.getColumns().stream()
-                .filter(column -> column instanceof Column.ComputedColumn)
-                .forEach(
-                        column -> {
-                            Column.ComputedColumn computedColumn = (Column.ComputedColumn) column;
-                            Set<String> referencedColumn =
-                                    ColumnReferenceFinder.findReferencedColumn(
-                                            computedColumn.getName(), oldSchema);
-                            if (computedColumnChecker.apply(referencedColumn, computedColumn)) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "%sThe column `%s` is referenced by computed column %s.",
-                                                EX_MSG_PREFIX,
-                                                columnToAlter,
-                                                computedColumn.asSummaryString()));
-                            }
-                        });
-        // validate partition keys doesn't contain the old column
-        if (partitionKeys.contains(columnToAlter)) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` is used as the partition keys.",
-                            EX_MSG_PREFIX, columnToAlter));
+        int getColumnDependencyCount(String columnName) {
+            return columnToDependencies.getOrDefault(columnName, Collections.emptySet()).size();
         }
-    }
 
-    private void validateWatermark(ResolvedCatalogTable oldTable, String columnToAlter) {
-        // validate old column isn't referenced by watermark
-        List<WatermarkSpec> watermarkSpecs = oldTable.getResolvedSchema().getWatermarkSpecs();
-        Set<String> referencedColumns =
-                ColumnReferenceFinder.findWatermarkReferencedColumn(oldTable.getResolvedSchema());
-        Set<String> rowtimeAttributes =
-                oldTable.getResolvedSchema().getWatermarkSpecs().stream()
-                        .map(WatermarkSpec::getRowtimeAttribute)
-                        .collect(Collectors.toSet());
-        if (rowtimeAttributes.contains(columnToAlter)
-                || referencedColumns.contains(columnToAlter)) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` is referenced by watermark expression %s.",
-                            EX_MSG_PREFIX, columnToAlter, watermarkSpecs));
+        void checkReferences(String columnName) {
+            if (!columns.contains(columnName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column `%s` does not exist in the base table.",
+                                EX_MSG_PREFIX, columnName));
+            }
+            if (columnToReferences.containsKey(columnName)
+                    && !columnToReferences.get(columnName).isEmpty()) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column %s is referenced by computed column %s.",
+                                EX_MSG_PREFIX,
+                                EncodingUtils.escapeIdentifier(columnName),
+                                columnToReferences.get(columnName).stream()
+                                        .map(EncodingUtils::escapeIdentifier)
+                                        .sorted()
+                                        .collect(Collectors.joining(", "))));
+            }
+            if (partitionKeys.contains(columnName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column `%s` is used as the partition keys.",
+                                EX_MSG_PREFIX, columnName));
+            }
+            if (watermarkReferences.contains(columnName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column `%s` is referenced by watermark expression.",
+                                EX_MSG_PREFIX, columnName));
+            }
         }
     }
 
+    // --------------------------------------------------------------------------------------------
+
     private void buildUpdatedColumn(
             Schema.Builder builder,
             ResolvedCatalogTable oldTable,
@@ -893,6 +928,22 @@ public class AlterSchemaConverter {
         oldColumn.getComment().ifPresent(builder::withComment);
     }
 
+    private Operation buildAlterTableChangeOperation(
+            SqlAlterTable alterTable,
+            List<TableChange> tableChanges,
+            Schema newSchema,
+            ResolvedCatalogTable oldTable) {
+        return new AlterTableChangeOperation(
+                catalogManager.qualifyIdentifier(
+                        UnresolvedIdentifier.of(alterTable.fullTableName())),
+                tableChanges,
+                CatalogTable.of(
+                        newSchema,
+                        oldTable.getComment(),
+                        oldTable.getPartitionKeys(),
+                        oldTable.getOptions()));
+    }
+
     private static String getColumnName(SqlIdentifier identifier) {
         if (!identifier.isSimple()) {
             throw new UnsupportedOperationException(
@@ -903,11 +954,24 @@ public class AlterSchemaConverter {
         return identifier.getSimple();
     }
 
-    private AlterSchemaStrategy computeAlterSchemaStrategy(SqlAlterTableSchema alterTableSchema) {
+    private SchemaConverter createSchemaConverter(
+            SqlAlterTableSchema alterTableSchema, ResolvedCatalogTable oldTable) {
         if (alterTableSchema instanceof SqlAlterTableAdd) {
-            return AlterSchemaStrategy.ADD;
+            return new AddSchemaConverter(
+                    oldTable.getUnresolvedSchema(),
+                    (FlinkTypeFactory) sqlValidator.getTypeFactory(),
+                    sqlValidator,
+                    constraintValidator,
+                    escapeExpression,
+                    catalogManager.getSchemaResolver());
         } else if (alterTableSchema instanceof SqlAlterTableModify) {
-            return AlterSchemaStrategy.MODIFY;
+            return new ModifySchemaConverter(
+                    oldTable,
+                    (FlinkTypeFactory) sqlValidator.getTypeFactory(),
+                    sqlValidator,
+                    constraintValidator,
+                    escapeExpression,
+                    catalogManager.getSchemaResolver());
         }
         throw new UnsupportedOperationException(
                 String.format(
@@ -918,10 +982,4 @@ public class AlterSchemaConverter {
     private <T> T unwrap(Optional<T> value) {
         return value.orElseThrow(() -> new TableException("The value should never be empty."));
     }
-
-    /** A strategy to describe the alter schema kind. */
-    private enum AlterSchemaStrategy {
-        ADD,
-        MODIFY
-    }
 }
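
The drop-column path above sorts the requested columns by dependency count so
that a computed column is dropped before the columns it references. A
self-contained sketch of that ordering, with illustrative counts standing in
for getColumnDependencyCount:

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class DropOrderSketch {
        public static void main(String[] args) {
            // `a` depends on nothing; `b` (declared as "b AS a + 1") depends on `a`.
            Map<String, Integer> dependencyCount = new HashMap<>();
            dependencyCount.put("a", 0);
            dependencyCount.put("b", 1);
            List<String> sorted =
                    Arrays.asList("a", "b").stream()
                            .sorted(
                                    Comparator.comparingInt(
                                                    (String col) -> dependencyCount.get(col))
                                            .reversed())
                            .collect(Collectors.toList());
            System.out.println(sorted); // [b, a]: the computed column goes first
        }
    }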
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index bd5aa2ec984..7bab1359df4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -166,7 +166,6 @@ import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
 import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
-import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
 import org.apache.flink.table.operations.ddl.AlterViewAsOperation;
 import org.apache.flink.table.operations.ddl.AlterViewPropertiesOperation;
 import org.apache.flink.table.operations.ddl.AlterViewRenameOperation;
@@ -255,7 +254,7 @@ public class SqlToOperationConverter {
                         flinkPlanner.getOrCreateSqlValidator(),
                         this::validateTableConstraint,
                         this::getQuotedSqlString,
-                        catalogManager.getSchemaResolver());
+                        catalogManager);
     }
 
     /**
@@ -506,44 +505,17 @@ public class SqlToOperationConverter {
             return convertAlterTableReset(
                     tableIdentifier, (CatalogTable) baseTable, (SqlAlterTableReset) sqlAlterTable);
         } else if (sqlAlterTable instanceof SqlAlterTableDropColumn) {
-            return new AlterTableSchemaOperation(
-                    tableIdentifier,
-                    CatalogTable.of(
-                            alterSchemaConverter.applySchemaChange(
-                                    (SqlAlterTableDropColumn) sqlAlterTable, resolvedCatalogTable),
-                            resolvedCatalogTable.getComment(),
-                            resolvedCatalogTable.getPartitionKeys(),
-                            resolvedCatalogTable.getOptions()));
+            return alterSchemaConverter.convertAlterSchema(
+                    (SqlAlterTableDropColumn) sqlAlterTable, resolvedCatalogTable);
         } else if (sqlAlterTable instanceof SqlAlterTableDropPrimaryKey) {
-            return new AlterTableSchemaOperation(
-                    tableIdentifier,
-                    CatalogTable.of(
-                            alterSchemaConverter.applySchemaChange(
-                                    (SqlAlterTableDropPrimaryKey) sqlAlterTable,
-                                    resolvedCatalogTable),
-                            resolvedCatalogTable.getComment(),
-                            resolvedCatalogTable.getPartitionKeys(),
-                            resolvedCatalogTable.getOptions()));
+            return alterSchemaConverter.convertAlterSchema(
+                    (SqlAlterTableDropPrimaryKey) sqlAlterTable, resolvedCatalogTable);
         } else if (sqlAlterTable instanceof SqlAlterTableDropConstraint) {
-            return new AlterTableSchemaOperation(
-                    tableIdentifier,
-                    CatalogTable.of(
-                            alterSchemaConverter.applySchemaChange(
-                                    (SqlAlterTableDropConstraint) sqlAlterTable,
-                                    resolvedCatalogTable),
-                            resolvedCatalogTable.getComment(),
-                            resolvedCatalogTable.getPartitionKeys(),
-                            resolvedCatalogTable.getOptions()));
+            return alterSchemaConverter.convertAlterSchema(
+                    (SqlAlterTableDropConstraint) sqlAlterTable, resolvedCatalogTable);
         } else if (sqlAlterTable instanceof SqlAlterTableDropWatermark) {
-            return new AlterTableSchemaOperation(
-                    tableIdentifier,
-                    CatalogTable.of(
-                            alterSchemaConverter.applySchemaChange(
-                                    (SqlAlterTableDropWatermark) sqlAlterTable,
-                                    resolvedCatalogTable),
-                            resolvedCatalogTable.getComment(),
-                            resolvedCatalogTable.getPartitionKeys(),
-                            resolvedCatalogTable.getOptions()));
+            return alterSchemaConverter.convertAlterSchema(
+                    (SqlAlterTableDropWatermark) sqlAlterTable, resolvedCatalogTable);
         } else if (sqlAlterTable instanceof SqlAddReplaceColumns) {
             return OperationConverterUtils.convertAddReplaceColumns(
                     tableIdentifier,
@@ -554,24 +526,11 @@ public class SqlToOperationConverter {
             return OperationConverterUtils.convertChangeColumn(
                     tableIdentifier,
                     (SqlChangeColumn) sqlAlterTable,
-                    (ResolvedCatalogTable) baseTable,
+                    resolvedCatalogTable,
                     flinkPlanner.getOrCreateSqlValidator());
         } else if (sqlAlterTable instanceof SqlAlterTableRenameColumn) {
-            SqlAlterTableRenameColumn sqlAlterTableRenameColumn =
-                    (SqlAlterTableRenameColumn) sqlAlterTable;
-            ResolvedCatalogTable baseCatalogTable = (ResolvedCatalogTable) baseTable;
-            List<TableChange> tableChanges = new ArrayList<>();
-            Schema newSchema =
-                    alterSchemaConverter.applySchemaChange(
-                            sqlAlterTableRenameColumn, baseCatalogTable, tableChanges);
-            return new AlterTableChangeOperation(
-                    tableIdentifier,
-                    tableChanges,
-                    CatalogTable.of(
-                            newSchema,
-                            resolvedCatalogTable.getComment(),
-                            resolvedCatalogTable.getPartitionKeys(),
-                            resolvedCatalogTable.getOptions()));
+            return alterSchemaConverter.convertAlterSchema(
+                    (SqlAlterTableRenameColumn) sqlAlterTable, resolvedCatalogTable);
         } else if (sqlAlterTable instanceof SqlAddPartitions) {
             List<CatalogPartitionSpec> specs = new ArrayList<>();
             List<CatalogPartition> partitions = new ArrayList<>();
@@ -598,10 +557,8 @@ public class SqlToOperationConverter {
                     optionalCatalogTable.get(),
                     (SqlAlterTableCompact) sqlAlterTable);
         } else if (sqlAlterTable instanceof SqlAlterTableSchema) {
-            return convertAlterTableSchema(
-                    tableIdentifier,
-                    optionalCatalogTable.get().getResolvedTable(),
-                    (SqlAlterTableSchema) sqlAlterTable);
+            return alterSchemaConverter.convertAlterSchema(
+                    (SqlAlterTableSchema) sqlAlterTable, resolvedCatalogTable);
         } else {
             throw new ValidationException(
                     String.format(
@@ -724,23 +681,6 @@ public class SqlToOperationConverter {
                         tableIdentifier));
     }
 
-    private Operation convertAlterTableSchema(
-            ObjectIdentifier tableIdentifier,
-            ResolvedCatalogTable oldTable,
-            SqlAlterTableSchema alterTableSchema) {
-        List<TableChange> tableChanges = new ArrayList<>();
-        Schema newSchema =
-                alterSchemaConverter.applySchemaChange(alterTableSchema, oldTable, tableChanges);
-        return new AlterTableChangeOperation(
-                tableIdentifier,
-                tableChanges,
-                CatalogTable.of(
-                        newSchema,
-                        oldTable.getComment(),
-                        oldTable.getPartitionKeys(),
-                        oldTable.getOptions()));
-    }
-
     /** Convert CREATE FUNCTION statement. */
     private Operation convertCreateFunction(SqlCreateFunction sqlCreateFunction) {
         UnresolvedIdentifier unresolvedIdentifier =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index 7587200850c..7e0079b1568 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -100,11 +100,9 @@ public class OperationConverterUtils {
             setWatermarkAndPK(builder, catalogTable.getSchema());
         }
         List<TableChange> tableChanges = new ArrayList<>();
-        // TODO: support TableChange in FLINK-30497
         for (SqlNode sqlNode : addReplaceColumns.getNewColumns()) {
             TableColumn tableColumn = toTableColumn((SqlTableColumn) sqlNode, sqlValidator);
             builder.add(tableColumn);
-            // TODO: support ALTER TABLE REPLACE with TableChange in FLINK-30497
             if (!addReplaceColumns.isReplace()) {
                 tableChanges.add(
                         TableChange.add(
@@ -133,6 +131,14 @@ public class OperationConverterUtils {
                         newProperties,
                         catalogTable.getComment());
         if (addReplaceColumns.isReplace()) {
+            // It's hard to determine how to decompose the ALTER TABLE REPLACE into multiple
+            // TableChanges. For example, with old schema <a INT, b INT, c INT> and the new schema
+            // <a INT, d INT>, there are multiple alternatives:
+            // plan 1: DROP COLUMN c, RENAME COLUMN b TO d;
+            // plan 2: DROP COLUMN b, RENAME COLUMN c TO d;
+            // So we don't translate REPLACE into TableChanges here. One workaround is to
+            // minimize the edit distance, i.e., the number of modifications, but even that
+            // cannot produce a deterministic answer.
             return new AlterTableSchemaOperation(tableIdentifier, newTable);
         } else {
             return new AlterTableChangeOperation(tableIdentifier, tableChanges, newTable);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java
index 0444afea947..6e406315e12 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java
@@ -55,8 +55,9 @@ public class ColumnReferenceFinderTest extends TableTestBase {
                                                 "tuple",
                                                 DataTypes.ROW(
                                                         DataTypes.TIMESTAMP(3), DataTypes.INT()))
+                                        .column("g", DataTypes.TIMESTAMP(3))
                                         .columnByExpression("ts", "tuple.f0")
-                                        .watermark("ts", "ts - interval '5' day")
+                                        .watermark("ts", "g - interval '5' day")
                                         .build());
     }
 
@@ -75,6 +76,6 @@ public class ColumnReferenceFinderTest extends TableTestBase {
                 .containsExactlyInAnyOrder("tuple");
 
         assertThat(ColumnReferenceFinder.findWatermarkReferencedColumn(resolvedSchema))
-                .containsExactlyInAnyOrder("ts");
+                .containsExactlyInAnyOrder("ts", "g");
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 10a7af4f4f6..253c147effa 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -82,7 +82,6 @@ import org.apache.flink.table.operations.command.ShowJarsOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
-import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
 import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -1332,14 +1331,12 @@ public class SqlToOperationConverterTest {
         // rename column c that is used in a computed column
         assertThatThrownBy(() -> parse("alter table tb1 rename a to a1"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "The column `a` is referenced by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b).");
+                .hasMessageContaining("The column `a` is referenced by computed column `d`.");
 
         // rename column used in the watermark expression
         assertThatThrownBy(() -> parse("alter table tb1 rename ts to ts1"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "The column `ts` is referenced by watermark expression [WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds].");
+                .hasMessageContaining("The column `ts` is referenced by watermark expression.");
 
         // rename nested column
         assertThatThrownBy(() -> parse("alter table tb1 rename e.f1 to e.f11"))
@@ -1374,7 +1371,7 @@ public class SqlToOperationConverterTest {
         assertThatThrownBy(() -> parse("alter table `cat1`.`db1`.`tb2` rename e to e1"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\nThe column `e` is referenced by computed column `j` STRING AS upper(e).");
+                        "Failed to execute ALTER TABLE statement.\nThe column `e` is referenced by computed column `g`, `j`.");
 
         // rename column used as partition key
         assertThatThrownBy(() -> parse("alter table tb2 rename a to a1"))
@@ -1409,8 +1406,7 @@ public class SqlToOperationConverterTest {
         // drop a column that is referenced by a computed column
         assertThatThrownBy(() -> parse("alter table tb1 drop a"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "The column `a` is referenced by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b).");
+                .hasMessageContaining("The column `a` is referenced by computed column `d`.");
 
         // drop a column which is the primary key
         assertThatThrownBy(() -> parse("alter table tb1 drop c"))
@@ -1420,8 +1416,7 @@ public class SqlToOperationConverterTest {
         // drop a column which defines the watermark
         assertThatThrownBy(() -> parse("alter table tb1 drop ts"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "The column `ts` is referenced by watermark expression [WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds].");
+                .hasMessageContaining("The column `ts` is referenced by watermark expression.");
     }
 
     @Test
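
The shortened messages asserted above name only the referencing columns instead of repeating their full definitions. A hypothetical sketch of how such a message can be assembled; findReferencedColumn(columnName, schema) and the exact message layout are assumptions read off the assertions, not the converter's actual code:

    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.planner.expressions.ColumnReferenceFinder;

    class ReferenceErrorSketch {
        // Collects the computed columns whose expressions read `droppedColumn`
        // and renders them in the shortened format the tests expect, e.g.
        // "The column `e` is referenced by computed column `g`, `j`.".
        static String buildError(
                String droppedColumn, List<String> computedColumnNames, ResolvedSchema schema) {
            Set<String> referencing = new LinkedHashSet<>();
            for (String computedColumn : computedColumnNames) {
                if (ColumnReferenceFinder.findReferencedColumn(computedColumn, schema)
                        .contains(droppedColumn)) {
                    referencing.add(String.format("`%s`", computedColumn));
                }
            }
            return String.format(
                    "The column `%s` is referenced by computed column %s.",
                    droppedColumn, String.join(", ", referencing));
        }
    }
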
@@ -1429,30 +1424,31 @@ public class SqlToOperationConverterTest {
         prepareNonManagedTable(false);
         // drop a single column
         Operation operation = parse("alter table tb1 drop c");
-        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
-        assertThat(operation.asSummaryString())
-                .isEqualTo(
-                        "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
-                                + "  `a` INT NOT NULL,\n"
-                                + "  `b` BIGINT NOT NULL,\n"
-                                + "  `d` AS [a*(b+2 + a*b)],\n"
-                                + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
-                                + "  `f` AS [e.f1 + e.f2.f0],\n"
-                                + "  `g` METADATA VIRTUAL,\n"
-                                + "  `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
-                                + ")");
+        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
+        assertThat(operation.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  DROP `c`");
+        assertThat(
+                        ((AlterTableChangeOperation) operation)
+                                .getNewTable().getUnresolvedSchema().getColumns().stream()
+                                        .map(Schema.UnresolvedColumn::getName)
+                                        .collect(Collectors.toList()))
+                .doesNotContain("c");
 
         // drop computed columns together with the columns they reference
         operation = parse("alter table tb1 drop (f, e, b, d)");
-        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
         assertThat(operation.asSummaryString())
                 .isEqualTo(
-                        "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
-                                + "  `a` INT NOT NULL,\n"
-                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
-                                + "  `g` METADATA VIRTUAL,\n"
-                                + "  `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
-                                + ")");
+                        "ALTER TABLE cat1.db1.tb1\n"
+                                + "  DROP `d`,\n"
+                                + "  DROP `f`,\n"
+                                + "  DROP `b`,\n"
+                                + "  DROP `e`");
+        assertThat(
+                        ((AlterTableChangeOperation) operation)
+                                .getNewTable().getUnresolvedSchema().getColumns().stream()
+                                        .map(Schema.UnresolvedColumn::getName)
+                                        .collect(Collectors.toList()))
+                .doesNotContain("f", "e", "b", "d");
     }
 
     @Test
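
Note the order in the summary above: the converter drops the computed columns (`d`, `f`) before the physical columns they reference (`b`, `e`), regardless of the order written in the statement. A sketch of the change list the operation is expected to carry, assuming the TableChange factory methods this commit introduces:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.table.catalog.TableChange;

    class DropColumnChangesSketch {
        // The four drops behind "ALTER TABLE tb1 DROP (f, e, b, d)", in the
        // dependency-aware order shown by the expected summary string.
        static List<TableChange> changes() {
            return Arrays.asList(
                    TableChange.dropColumn("d"),
                    TableChange.dropColumn("f"),
                    TableChange.dropColumn("b"),
                    TableChange.dropColumn("e"));
        }
    }
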
@@ -1474,25 +1470,27 @@ public class SqlToOperationConverterTest {
     @Test
     public void testAlterTableDropConstraint() throws Exception {
         prepareNonManagedTable(true);
-        String expectedSummaryString =
-                "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
-                        + "  `a` INT NOT NULL,\n"
-                        + "  `b` BIGINT NOT NULL,\n"
-                        + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
-                        + "  `d` AS [a*(b+2 + a*b)],\n"
-                        + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
-                        + "  `f` AS [e.f1 + e.f2.f0],\n"
-                        + "  `g` METADATA VIRTUAL,\n"
-                        + "  `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
-                        + ")";
+        String expectedSummaryString = "ALTER TABLE cat1.db1.tb1\n  DROP CONSTRAINT ct1";
 
         Operation operation = parse("alter table tb1 drop constraint ct1");
-        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
         assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
+        assertThat(
+                        ((AlterTableChangeOperation) operation)
+                                .getNewTable()
+                                .getUnresolvedSchema()
+                                .getPrimaryKey())
+                .isNotPresent();
 
         operation = parse("alter table tb1 drop primary key");
-        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
         assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
+        assertThat(
+                        ((AlterTableChangeOperation) operation)
+                                .getNewTable()
+                                .getUnresolvedSchema()
+                                .getPrimaryKey())
+                .isNotPresent();
     }
 
     @Test
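
Both statements reduce to the same expected summary because tb1's primary key is declared as the constraint ct1, so "DROP PRIMARY KEY" resolves to dropping that named constraint. A sketch, again assuming the new factory method:

    import org.apache.flink.table.catalog.TableChange;

    class DropConstraintSketch {
        // "DROP CONSTRAINT ct1" and "DROP PRIMARY KEY" both map to this change
        // when ct1 is the table's primary-key constraint; afterwards the new
        // table's unresolved schema carries no primary key.
        static TableChange change() {
            return TableChange.dropConstraint("ct1");
        }
    }
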
@@ -1507,19 +1505,15 @@ public class SqlToOperationConverterTest {
     public void testAlterTableDropWatermark() throws Exception {
         prepareNonManagedTable("tb1", true);
         Operation operation = parse("alter table tb1 drop watermark");
-        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
         assertThat(operation.asSummaryString())
-                .isEqualTo(
-                        "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
-                                + "  `a` INT NOT NULL,\n"
-                                + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
-                                + "  `d` AS [a*(b+2 + a*b)],\n"
-                                + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
-                                + "  `f` AS [e.f1 + e.f2.f0],\n"
-                                + "  `g` METADATA VIRTUAL,\n"
-                                + "  `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
-                                + ")");
+                .isEqualTo("ALTER TABLE cat1.db1.tb1\n  DROP WATERMARK");
+        assertThat(
+                        ((AlterTableChangeOperation) operation)
+                                .getNewTable()
+                                .getUnresolvedSchema()
+                                .getWatermarkSpecs())
+                .isEmpty();
     }
 
     @Test
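
For completeness, the watermark counterpart; a table has at most one watermark spec, so the assumed factory method takes no argument:

    import org.apache.flink.table.catalog.TableChange;

    class DropWatermarkSketch {
        // "ALTER TABLE tb1 DROP WATERMARK" carries this single change; once it
        // is applied, the new table's schema has no watermark specs, which is
        // exactly what the assertion above verifies.
        static TableChange change() {
            return TableChange.dropWatermark();
        }
    }
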