Posted to commits@flink.apache.org by sh...@apache.org on 2023/01/03 12:14:31 UTC

[flink] branch master updated: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d5b8ad7672c [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change
d5b8ad7672c is described below

commit d5b8ad7672cd981cdbc757549ed2ab1fcf007500
Author: Shengkai <10...@qq.com>
AuthorDate: Fri Dec 30 16:15:02 2022 +0800

    [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change
    
    This closes #21577
---
 .../hive/parse/HiveParserDDLSemanticAnalyzer.java  |  39 +-
 .../sql/parser/ddl/SqlAlterTableRenameColumn.java  |   2 +-
 .../operations/ddl/AlterTableChangeOperation.java  |  43 ++
 .../apache/flink/table/catalog/TableChange.java    | 496 ++++++++++++++++++++-
 .../planner/operations/AlterSchemaConverter.java   | 279 +++++++-----
 .../operations/SqlToOperationConverter.java        |  20 +-
 .../planner/utils/OperationConverterUtils.java     |  85 +++-
 .../operations/SqlToOperationConverterTest.java    | 106 +++--
 8 files changed, 891 insertions(+), 179 deletions(-)
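
For orientation, the MODIFY-style factory methods this commit adds to TableChange (see the
TableChange.java hunks below) combine roughly as in the following sketch. This is not code from
the commit; the column names, data types, and comments are illustrative assumptions.

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.TableChange;

    import java.util.Arrays;
    import java.util.List;

    public class ModifyChangeSketch {
        static List<TableChange> exampleChanges() {
            // the column as it currently exists in the catalog (illustrative)
            Column oldColumn = Column.physical("user_id", DataTypes.INT());

            return Arrays.asList(
                    // ALTER TABLE t MODIFY user_id BIGINT
                    TableChange.modifyPhysicalColumnType(oldColumn, DataTypes.BIGINT()),
                    // ALTER TABLE t RENAME user_id TO uid
                    TableChange.modifyColumnName(oldColumn, "uid"),
                    // ALTER TABLE t MODIFY user_id INT COMMENT 'primary user id'
                    TableChange.modifyColumnComment(oldColumn, "primary user id"),
                    // ALTER TABLE t MODIFY user_id INT FIRST
                    TableChange.modifyColumnPosition(
                            oldColumn, TableChange.ColumnPosition.first()),
                    // coarse-grained form: old column, new column, and an optional position
                    TableChange.modify(
                            oldColumn,
                            Column.physical("uid", DataTypes.BIGINT())
                                    .withComment("primary user id"),
                            TableChange.ColumnPosition.after("name")));
        }
    }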

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index b386a46b62b..79cef5614be 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -42,11 +42,14 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.CatalogViewImpl;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -439,7 +442,7 @@ public class HiveParserDDLSemanticAnalyzer {
         if (partSpecNode != null) {
             partSpec = getPartSpec(partSpecNode);
         }
-        CatalogBaseTable alteredTable = getAlteredTable(tableName, false);
+        ResolvedCatalogBaseTable<?> alteredTable = getAlteredTable(tableName, false);
         switch (ast.getType()) {
             case HiveASTParser.TOK_ALTERTABLE_RENAME:
                 operation = convertAlterTableRename(tableName, ast, false);
@@ -1880,7 +1883,7 @@ public class HiveParserDDLSemanticAnalyzer {
     }
 
     private Operation convertAlterTableChangeCol(
-            CatalogBaseTable alteredTable, String[] qualified, HiveParserASTNode ast)
+            ResolvedCatalogBaseTable<?> alteredTable, String[] qualified, HiveParserASTNode ast)
             throws SemanticException {
         String newComment = null;
         boolean first = false;
@@ -1933,7 +1936,7 @@ public class HiveParserDDLSemanticAnalyzer {
         String tblName = HiveParserBaseSemanticAnalyzer.getDotName(qualified);
 
         ObjectIdentifier tableIdentifier = parseObjectIdentifier(tblName);
-        CatalogTable oldTable = (CatalogTable) alteredTable;
+        ResolvedCatalogTable oldTable = (ResolvedCatalogTable) alteredTable;
         String oldName = HiveParserBaseSemanticAnalyzer.unescapeIdentifier(oldColName);
         String newName = HiveParserBaseSemanticAnalyzer.unescapeIdentifier(newColName);
 
@@ -1954,8 +1957,27 @@ public class HiveParserDDLSemanticAnalyzer {
         if (isCascade) {
             props.put(ALTER_COL_CASCADE, "true");
         }
-        return new AlterTableSchemaOperation(
+
+        List<TableChange> tableChanges =
+                OperationConverterUtils.buildModifyColumnChange(
+                        oldTable.getResolvedSchema()
+                                .getColumn(oldName)
+                                .orElseThrow(
+                                        () ->
+                                                new ValidationException(
+                                                        "Can not find the old column: "
+                                                                + oldColName)),
+                        Column.physical(newTableColumn.getName(), newTableColumn.getType())
+                                .withComment(newComment),
+                        first
+                                ? TableChange.ColumnPosition.first()
+                                : (flagCol == null
+                                        ? null
+                                        : TableChange.ColumnPosition.after(flagCol)));
+
+        return new AlterTableChangeOperation(
                 tableIdentifier,
+                tableChanges,
                 new CatalogTableImpl(
                         newSchema, oldTable.getPartitionKeys(), props, oldTable.getComment()));
     }
@@ -1984,6 +2006,7 @@ public class HiveParserDDLSemanticAnalyzer {
         final int numPartCol = oldTable.getPartitionKeys().size();
         TableSchema.Builder builder = TableSchema.builder();
         // add existing non-part col if we're not replacing
+        // TODO: support TableChange in FLINK-30497
         if (!replace) {
             List<TableColumn> nonPartCols =
                     oldSchema.getTableColumns().subList(0, oldSchema.getFieldCount() - numPartCol);
@@ -2146,9 +2169,9 @@ public class HiveParserDDLSemanticAnalyzer {
         return new AlterViewPropertiesOperation(viewIdentifier, newView);
     }
 
-    private CatalogBaseTable getAlteredTable(String tableName, boolean expectView) {
+    private ResolvedCatalogBaseTable<?> getAlteredTable(String tableName, boolean expectView) {
         ObjectIdentifier objectIdentifier = parseObjectIdentifier(tableName);
-        CatalogBaseTable catalogBaseTable = getCatalogBaseTable(objectIdentifier);
+        ResolvedCatalogBaseTable<?> catalogBaseTable = getCatalogBaseTable(objectIdentifier);
         if (expectView) {
             if (catalogBaseTable instanceof CatalogTable) {
                 throw new ValidationException("ALTER VIEW for a table is not allowed");
@@ -2177,11 +2200,11 @@ public class HiveParserDDLSemanticAnalyzer {
         return database;
     }
 
-    private CatalogBaseTable getCatalogBaseTable(ObjectIdentifier tableIdentifier) {
+    private ResolvedCatalogBaseTable<?> getCatalogBaseTable(ObjectIdentifier tableIdentifier) {
         return getCatalogBaseTable(tableIdentifier, false);
     }
 
-    private CatalogBaseTable getCatalogBaseTable(
+    private ResolvedCatalogBaseTable<?> getCatalogBaseTable(
             ObjectIdentifier tableIdentifier, boolean ifExists) {
         Optional<ContextResolvedTable> optionalCatalogTable =
                 catalogManager.getTable(tableIdentifier);
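
The Hive CHANGE COLUMN path above now resolves the old column from the resolved schema and
delegates to OperationConverterUtils.buildModifyColumnChange, whose implementation lives in the
OperationConverterUtils hunk further down in this patch. A rough sketch of that call with concrete
values follows; the identifiers are illustrative, and it is assumed that the helper decomposes the
old/new column pair into the fine-grained changes where possible and otherwise falls back to a
single ModifyColumn.

    // Sketch of the call above, roughly corresponding to:
    //   ALTER TABLE t CHANGE COLUMN user_id uid BIGINT AFTER name
    List<TableChange> tableChanges =
            OperationConverterUtils.buildModifyColumnChange(
                    Column.physical("user_id", DataTypes.INT()),   // old column from the schema
                    Column.physical("uid", DataTypes.BIGINT()),    // new column definition
                    TableChange.ColumnPosition.after("name"));     // FIRST, AFTER <col>, or null
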
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
index 28196183ff7..72aa3c1a067 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
@@ -50,7 +50,7 @@ public class SqlAlterTableRenameColumn extends SqlAlterTable {
                 tableIdentifier, originColumnIdentifier, newColumnIdentifier);
     }
 
-    public SqlIdentifier getOriginColumnIdentifier() {
+    public SqlIdentifier getOldColumnIdentifier() {
         return originColumnIdentifier;
     }
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
index fd7a89ee933..e8ef9519de1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.operations.ddl;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.utils.EncodingUtils;
 
 import java.util.Collections;
 import java.util.List;
@@ -76,6 +77,48 @@ public class AlterTableChangeOperation extends AlterTableOperation {
             TableChange.AddUniqueConstraint addUniqueConstraint =
                     (TableChange.AddUniqueConstraint) tableChange;
             return String.format("  ADD %s", addUniqueConstraint.getConstraint());
+        } else if (tableChange instanceof TableChange.ModifyColumnComment) {
+            TableChange.ModifyColumnComment modifyColumnComment =
+                    (TableChange.ModifyColumnComment) tableChange;
+            return String.format(
+                    "  MODIFY %s COMMENT '%s'",
+                    EncodingUtils.escapeIdentifier(modifyColumnComment.getNewColumn().getName()),
+                    modifyColumnComment.getNewComment());
+        } else if (tableChange instanceof TableChange.ModifyPhysicalColumnType) {
+            TableChange.ModifyPhysicalColumnType modifyPhysicalColumnType =
+                    (TableChange.ModifyPhysicalColumnType) tableChange;
+            return String.format(
+                    "  MODIFY %s %s",
+                    EncodingUtils.escapeIdentifier(
+                            modifyPhysicalColumnType.getNewColumn().getName()),
+                    modifyPhysicalColumnType.getNewType());
+        } else if (tableChange instanceof TableChange.ModifyColumnPosition) {
+            TableChange.ModifyColumnPosition modifyColumnPosition =
+                    (TableChange.ModifyColumnPosition) tableChange;
+            return String.format(
+                    "  MODIFY %s %s",
+                    EncodingUtils.escapeIdentifier(modifyColumnPosition.getNewColumn().getName()),
+                    modifyColumnPosition.getNewPosition());
+        } else if (tableChange instanceof TableChange.ModifyColumnName) {
+            TableChange.ModifyColumnName modifyColumnName =
+                    (TableChange.ModifyColumnName) tableChange;
+            return String.format(
+                    "  MODIFY %s TO %s",
+                    EncodingUtils.escapeIdentifier(modifyColumnName.getOldColumnName()),
+                    EncodingUtils.escapeIdentifier(modifyColumnName.getNewColumnName()));
+        } else if (tableChange instanceof TableChange.ModifyColumn) {
+            TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) tableChange;
+            return String.format(
+                    "  MODIFY %s %s",
+                    modifyColumn.getNewColumn(),
+                    modifyColumn.getNewPosition() == null ? "" : modifyColumn.getNewPosition());
+        } else if (tableChange instanceof TableChange.ModifyWatermark) {
+            TableChange.ModifyWatermark modifyWatermark = (TableChange.ModifyWatermark) tableChange;
+            return String.format("  MODIFY %s", modifyWatermark.getNewWatermark());
+        } else if (tableChange instanceof TableChange.ModifyUniqueConstraint) {
+            TableChange.ModifyUniqueConstraint modifyUniqueConstraint =
+                    (TableChange.ModifyUniqueConstraint) tableChange;
+            return String.format("  MODIFY %s", modifyUniqueConstraint.getNewConstraint());
         } else {
             throw new UnsupportedOperationException(
                     String.format("Unknown table change: %s.", tableChange));
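
These new branches feed the operation's summary string. Assuming EncodingUtils.escapeIdentifier
wraps names in backticks, the MODIFY branches render along the following lines; the exact output
for positions, watermarks, and constraints depends on the toString() of ColumnPosition,
WatermarkSpec, and UniqueConstraint (illustrative only):

      MODIFY `user_id` BIGINT                        (ModifyPhysicalColumnType)
      MODIFY `user_id` COMMENT 'primary user id'     (ModifyColumnComment)
      MODIFY `user_id` TO `uid`                      (ModifyColumnName)
      MODIFY `user_id` FIRST                         (ModifyColumnPosition)
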
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
index 1a2b40d9134..240148836c8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
@@ -19,7 +19,9 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -51,7 +53,7 @@ public interface TableChange {
      * <p>It is equal to the following statement:
      *
      * <pre>
-     *    ALTER TABLE &lt;table_name&gt; ADD &lt;column_definition&gt; &lt;position&gt;
+     *    ALTER TABLE &lt;table_name&gt; ADD &lt;column_definition&gt; &lt;column_position&gt;
      * </pre>
      *
      * @param column the added column definition.
@@ -68,7 +70,7 @@ public interface TableChange {
      * <p>It is equal to the following statement:
      *
      * <pre>
-     *    ALTER TABLE &lt;table_name&gt; ADD PRIMARY KEY (&lt;column_name&gt;...) NOT ENFORCED;
+     *    ALTER TABLE &lt;table_name&gt; ADD PRIMARY KEY (&lt;column_name&gt;...) NOT ENFORCED
      * </pre>
      *
      * @param constraint the added constraint definition.
@@ -94,6 +96,139 @@ public interface TableChange {
         return new AddWatermark(watermarkSpec);
     }
 
+    /**
+     * A table change to modify a column. The modification includes:
+     *
+     * <ul>
+     *   <li>change column data type
+     *   <li>reorder column position
+     *   <li>modify column comment
+     *   <li>rename column name
+     *   <li>change the computed expression
+     *   <li>change the metadata column expression
+     * </ul>
+     *
+     * <p>Some fine-grained column changes are represented by the {@link
+     * TableChange#modifyPhysicalColumnType}, {@link TableChange#modifyColumnName}, {@link
+     * TableChange#modifyColumnComment} and {@link TableChange#modifyColumnPosition}.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_definition&gt; COMMENT '&lt;column_comment&gt;' &lt;column_position&gt;
+     * </pre>
+     *
+     * @param oldColumn the definition of the old column.
+     * @param newColumn the definition of the new column.
+     * @param columnPosition the new position of the column.
+     * @return a TableChange that represents the modification.
+     */
+    static ModifyColumn modify(
+            Column oldColumn, Column newColumn, @Nullable ColumnPosition columnPosition) {
+        return new ModifyColumn(oldColumn, newColumn, columnPosition);
+    }
+
+    /**
+     * A table change that modifies the physical column data type.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;new_column_type&gt;
+     * </pre>
+     *
+     * @param oldColumn the definition of the old column.
+     * @param newType the type of the new column.
+     * @return a TableChange that represents the modification.
+     */
+    static ModifyPhysicalColumnType modifyPhysicalColumnType(Column oldColumn, DataType newType) {
+        return new ModifyPhysicalColumnType(oldColumn, newType);
+    }
+
+    /**
+     * A table change to modify the column name.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; RENAME &lt;old_column_name&gt; TO &lt;new_column_name&gt;
+     * </pre>
+     *
+     * @param oldColumn the definition of the old column.
+     * @param newName the name of the new column.
+     * @return a TableChange that represents the modification.
+     */
+    static ModifyColumnName modifyColumnName(Column oldColumn, String newName) {
+        return new ModifyColumnName(oldColumn, newName);
+    }
+
+    /**
+     * A table change to modify the column comment.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;original_column_type&gt; COMMENT '&lt;new_column_comment&gt;'
+     * </pre>
+     *
+     * @param oldColumn the definition of the old column.
+     * @param newComment the modified comment.
+     * @return a TableChange that represents the modification.
+     */
+    static ModifyColumnComment modifyColumnComment(Column oldColumn, String newComment) {
+        return new ModifyColumnComment(oldColumn, newComment);
+    }
+
+    /**
+     * A table change to modify the column position.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;original_column_type&gt; &lt;column_position&gt;
+     * </pre>
+     *
+     * @param oldColumn the definition of the old column.
+     * @param columnPosition the new position of the column.
+     * @return a TableChange that represents the modification.
+     */
+    static ModifyColumnPosition modifyColumnPosition(
+            Column oldColumn, ColumnPosition columnPosition) {
+        return new ModifyColumnPosition(oldColumn, columnPosition);
+    }
+
+    /**
+     * A table change to modify a unique constraint.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY PRIMARY KEY (&lt;column_name&gt;...) NOT ENFORCED
+     * </pre>
+     *
+     * @param newConstraint the modified constraint definition.
+     * @return a TableChange that represents the modification.
+     */
+    static ModifyUniqueConstraint modify(UniqueConstraint newConstraint) {
+        return new ModifyUniqueConstraint(newConstraint);
+    }
+
+    /**
+     * A table change to modify a watermark.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY WATERMARK FOR &lt;row_time&gt; AS &lt;row_time_expression&gt;
+     * </pre>
+     *
+     * @param newWatermarkSpec the modified watermark definition.
+     * @return a TableChange that represents the modification.
+     */
+    static ModifyWatermark modify(WatermarkSpec newWatermarkSpec) {
+        return new ModifyWatermark(newWatermarkSpec);
+    }
+
     /**
      * A table change to set the table option.
      *
@@ -137,7 +272,7 @@ public interface TableChange {
      * <p>It is equal to the following statement:
      *
      * <pre>
-     *    ALTER TABLE &lt;table_name&gt; ADD &lt;column_definition&gt; &lt;position&gt;
+     *    ALTER TABLE &lt;table_name&gt; ADD &lt;column_definition&gt; &lt;column_position&gt;
      * </pre>
      */
     @PublicEvolving
@@ -276,6 +411,361 @@ public interface TableChange {
         }
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Modify Change
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * A base schema change to modify a column. The modification includes:
+     *
+     * <ul>
+     *   <li>change column data type
+     *   <li>reorder column position
+     *   <li>modify column comment
+     *   <li>rename column name
+     *   <li>change the computed expression
+     *   <li>change the metadata column expression
+     * </ul>
+     *
+     * <p>Some fine-grained column changes are defined in the {@link ModifyPhysicalColumnType},
+     * {@link ModifyColumnComment}, {@link ModifyColumnPosition} and {@link ModifyColumnName}.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_definition&gt; COMMENT '&lt;column_comment&gt;' &lt;column_position&gt;
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyColumn implements TableChange {
+
+        protected final Column oldColumn;
+        protected final Column newColumn;
+
+        protected final @Nullable ColumnPosition newPosition;
+
+        public ModifyColumn(
+                Column oldColumn, Column newColumn, @Nullable ColumnPosition newPosition) {
+            this.oldColumn = oldColumn;
+            this.newColumn = newColumn;
+            this.newPosition = newPosition;
+        }
+
+        /** Returns the original {@link Column} instance. */
+        public Column getOldColumn() {
+            return oldColumn;
+        }
+
+        /** Returns the modified {@link Column} instance. */
+        public Column getNewColumn() {
+            return newColumn;
+        }
+
+        /**
+         * Returns the position of the modified {@link Column} instance. When the return value is
+         * null, the column keeps its original position. When the return value is FIRST, the
+         * modified column is moved to the first position. When the return value is AFTER, the
+         * column is moved after the referenced column.
+         */
+        public @Nullable ColumnPosition getNewPosition() {
+            return newPosition;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof ModifyColumn)) {
+                return false;
+            }
+            ModifyColumn that = (ModifyColumn) o;
+            return Objects.equals(oldColumn, that.oldColumn)
+                    && Objects.equals(newColumn, that.newColumn)
+                    && Objects.equals(newPosition, that.newPosition);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(oldColumn, newColumn, newPosition);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyColumn{"
+                    + "oldColumn="
+                    + oldColumn
+                    + ", newColumn="
+                    + newColumn
+                    + ", newPosition="
+                    + newPosition
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change to modify the column comment.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;original_column_type&gt; COMMENT '&lt;new_column_comment&gt;'
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyColumnComment extends ModifyColumn {
+
+        private final String newComment;
+
+        private ModifyColumnComment(Column oldColumn, String newComment) {
+            super(oldColumn, oldColumn.withComment(newComment), null);
+            this.newComment = newComment;
+        }
+
+        /** Get the new comment for the column. */
+        public String getNewComment() {
+            return newComment;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return (o instanceof ModifyColumnComment) && super.equals(o);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyColumnComment{"
+                    + "Column="
+                    + oldColumn
+                    + ", newComment='"
+                    + newComment
+                    + '\''
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change to modify the column position.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;original_column_type&gt; &lt;column_position&gt;
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyColumnPosition extends ModifyColumn {
+
+        public ModifyColumnPosition(Column oldColumn, ColumnPosition newPosition) {
+            super(oldColumn, oldColumn, newPosition);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return (o instanceof ModifyColumnPosition) && super.equals(o);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyColumnPosition{"
+                    + "Column="
+                    + oldColumn
+                    + ", newPosition="
+                    + newPosition
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change that modifies the physical column data type.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;new_column_type&gt;
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyPhysicalColumnType extends ModifyColumn {
+
+        private ModifyPhysicalColumnType(Column oldColumn, DataType newType) {
+            super(oldColumn, oldColumn.copy(newType), null);
+            Preconditions.checkArgument(oldColumn.isPhysical());
+        }
+
+        /** Get the column type for the new column. */
+        public DataType getNewType() {
+            return newColumn.getDataType();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return (o instanceof ModifyPhysicalColumnType) && super.equals(o);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyPhysicalColumnType{"
+                    + "Column="
+                    + oldColumn
+                    + ", newType="
+                    + getNewType()
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change to modify the column name.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; RENAME &lt;old_column_name&gt; TO &lt;new_column_name&gt;
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyColumnName extends ModifyColumn {
+
+        private ModifyColumnName(Column oldColumn, String newName) {
+            super(oldColumn, createNewColumn(oldColumn, newName), null);
+        }
+
+        private static Column createNewColumn(Column oldColumn, String newName) {
+            if (oldColumn instanceof Column.PhysicalColumn) {
+                return Column.physical(newName, oldColumn.getDataType())
+                        .withComment(oldColumn.comment);
+            } else if (oldColumn instanceof Column.MetadataColumn) {
+                Column.MetadataColumn metadataColumn = (Column.MetadataColumn) oldColumn;
+                return Column.metadata(
+                                newName,
+                                oldColumn.getDataType(),
+                                metadataColumn.getMetadataKey().orElse(null),
+                                metadataColumn.isVirtual())
+                        .withComment(oldColumn.comment);
+            } else {
+                return Column.computed(newName, ((Column.ComputedColumn) oldColumn).getExpression())
+                        .withComment(oldColumn.comment);
+            }
+        }
+
+        /** Returns the old column name. */
+        public String getOldColumnName() {
+            return oldColumn.getName();
+        }
+
+        /** Returns the new column name after the rename. */
+        public String getNewColumnName() {
+            return newColumn.getName();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return (o instanceof ModifyColumnName) && super.equals(o);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyColumnName{"
+                    + "Column="
+                    + oldColumn
+                    + ", newName="
+                    + getNewColumnName()
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change to modify a unique constraint.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY PRIMARY KEY (&lt;column_name&gt; ...) NOT ENFORCED
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyUniqueConstraint implements TableChange {
+
+        private final UniqueConstraint newConstraint;
+
+        public ModifyUniqueConstraint(UniqueConstraint newConstraint) {
+            this.newConstraint = newConstraint;
+        }
+
+        /** Returns the modified unique constraint. */
+        public UniqueConstraint getNewConstraint() {
+            return newConstraint;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof ModifyUniqueConstraint)) {
+                return false;
+            }
+            ModifyUniqueConstraint that = (ModifyUniqueConstraint) o;
+            return Objects.equals(newConstraint, that.newConstraint);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(newConstraint);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyUniqueConstraint{" + "newConstraint=" + newConstraint + '}';
+        }
+    }
+
+    /**
+     * A table change to modify the watermark.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY WATERMARK FOR &lt;row_time_column_name&gt; AS &lt;watermark_expression&gt;
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyWatermark implements TableChange {
+
+        private final WatermarkSpec newWatermark;
+
+        public ModifyWatermark(WatermarkSpec newWatermark) {
+            this.newWatermark = newWatermark;
+        }
+
+        /** Returns the modified watermark. */
+        public WatermarkSpec getNewWatermark() {
+            return newWatermark;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof ModifyWatermark)) {
+                return false;
+            }
+            ModifyWatermark that = (ModifyWatermark) o;
+            return Objects.equals(newWatermark, that.newWatermark);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(newWatermark);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyWatermark{" + "newWatermark=" + newWatermark + '}';
+        }
+    }
+
     // --------------------------------------------------------------------------------------------
     // Property change
     // --------------------------------------------------------------------------------------------
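
One detail worth noting: the fine-grained classes (ModifyPhysicalColumnType, ModifyColumnComment,
ModifyColumnPosition, ModifyColumnName) extend ModifyColumn, so a consumer that dispatches with
instanceof must test them before the generic ModifyColumn branch, as the AlterTableChangeOperation
code above does. A minimal sketch of such a dispatch on the catalog side follows; the handling
described in the comments is an assumption, not part of this commit.

    import org.apache.flink.table.catalog.TableChange;

    final class ModifyChangeDispatch {
        // Order matters: the fine-grained changes are subclasses of ModifyColumn,
        // so they must be tested before the generic ModifyColumn branch.
        static void apply(TableChange change) {
            if (change instanceof TableChange.ModifyPhysicalColumnType) {
                // alter the column type to
                // ((TableChange.ModifyPhysicalColumnType) change).getNewType()
            } else if (change instanceof TableChange.ModifyColumnName) {
                // rename getOldColumnName() to getNewColumnName()
            } else if (change instanceof TableChange.ModifyColumnComment) {
                // set the column comment to getNewComment()
            } else if (change instanceof TableChange.ModifyColumnPosition) {
                // move the column to getNewPosition() (FIRST or AFTER a referenced column)
            } else if (change instanceof TableChange.ModifyColumn) {
                // general case: replace getOldColumn() with getNewColumn(),
                // optionally repositioned at getNewPosition()
            } else if (change instanceof TableChange.ModifyWatermark) {
                // replace the watermark with getNewWatermark()
            } else if (change instanceof TableChange.ModifyUniqueConstraint) {
                // replace the primary key with getNewConstraint()
            }
        }
    }
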
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
index b0b5e65fdf1..193e47c168e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
@@ -43,12 +43,12 @@ import org.apache.flink.table.catalog.WatermarkSpec;
 import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.expressions.ColumnReferenceFinder;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
@@ -72,6 +72,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType;
+import static org.apache.flink.table.planner.utils.OperationConverterUtils.buildModifyColumnChange;
 import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 
 /**
@@ -104,13 +105,13 @@ public class AlterSchemaConverter {
      */
     public Schema applySchemaChange(
             SqlAlterTableSchema alterTableSchema,
-            Schema originSchema,
+            ResolvedCatalogTable oldTable,
             List<TableChange> tableChangeCollector) {
         AlterSchemaStrategy strategy = computeAlterSchemaStrategy(alterTableSchema);
         SchemaConverter converter =
                 strategy == AlterSchemaStrategy.ADD
                         ? new AddSchemaConverter(
-                                originSchema,
+                                oldTable.getUnresolvedSchema(),
                                 (FlinkTypeFactory) sqlValidator.getTypeFactory(),
                                 sqlValidator,
                                 constraintValidator,
@@ -118,7 +119,7 @@ public class AlterSchemaConverter {
                                 schemaResolver,
                                 tableChangeCollector)
                         : new ModifySchemaConverter(
-                                originSchema,
+                                oldTable,
                                 (FlinkTypeFactory) sqlValidator.getTypeFactory(),
                                 sqlValidator,
                                 constraintValidator,
@@ -133,41 +134,47 @@ public class AlterSchemaConverter {
 
     /** Convert ALTER TABLE RENAME col_name to new_col_name to generate an updated Schema. */
     public Schema applySchemaChange(
-            SqlAlterTableRenameColumn renameColumn, ResolvedCatalogTable originTable) {
-        String originColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
+            SqlAlterTableRenameColumn renameColumn,
+            ResolvedCatalogTable oldTable,
+            List<TableChange> tableChangeCollector) {
+        String oldColumnName = getColumnName(renameColumn.getOldColumnIdentifier());
         String newColumnName = getColumnName(renameColumn.getNewColumnIdentifier());
-        // validate origin column is exists, new column name does not collide with existed column
-        // names, and origin column isn't referenced by computed column
+        // validate that the old column exists, the new column name does not collide with existing
+        // column names, and the old column isn't referenced by a computed column
         validateColumnName(
-                originColumnName,
+                oldColumnName,
                 newColumnName,
-                originTable.getResolvedSchema(),
-                originTable.getPartitionKeys());
-        validateWatermark(originTable, originColumnName);
+                oldTable.getResolvedSchema(),
+                oldTable.getPartitionKeys());
+        validateWatermark(oldTable, oldColumnName);
 
         // generate new schema
         Schema.Builder schemaBuilder = Schema.newBuilder();
         buildUpdatedColumn(
                 schemaBuilder,
-                originTable,
+                oldTable,
                 (builder, column) -> {
-                    if (column.getName().equals(originColumnName)) {
-                        buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                    if (column.getName().equals(oldColumnName)) {
+                        buildNewColumnFromOldColumn(builder, column, newColumnName);
+                        tableChangeCollector.add(
+                                TableChange.modifyColumnName(
+                                        unwrap(
+                                                oldTable.getResolvedSchema()
+                                                        .getColumn(oldColumnName)),
+                                        newColumnName));
                     } else {
                         builder.fromColumns(Collections.singletonList(column));
                     }
                 });
         buildUpdatedPrimaryKey(
-                schemaBuilder,
-                originTable,
-                (pk) -> pk.equals(originColumnName) ? newColumnName : pk);
-        buildUpdatedWatermark(schemaBuilder, originTable);
+                schemaBuilder, oldTable, (pk) -> pk.equals(oldColumnName) ? newColumnName : pk);
+        buildUpdatedWatermark(schemaBuilder, oldTable);
         return schemaBuilder.build();
     }
 
     /** Convert ALTER TABLE DROP (col1 [, col2, ...]) to generate an updated Schema. */
     public Schema applySchemaChange(
-            SqlAlterTableDropColumn dropColumn, ResolvedCatalogTable originTable) {
+            SqlAlterTableDropColumn dropColumn, ResolvedCatalogTable oldTable) {
         Set<String> columnsToDrop = new HashSet<>();
         dropColumn
                 .getColumnList()
@@ -188,28 +195,28 @@ public class AlterSchemaConverter {
             // does not derive any computed column
             validateColumnName(
                     columnToDrop,
-                    originTable.getResolvedSchema(),
-                    originTable.getPartitionKeys(),
+                    oldTable.getResolvedSchema(),
+                    oldTable.getPartitionKeys(),
                     columnsToDrop);
-            validateWatermark(originTable, columnToDrop);
+            validateWatermark(oldTable, columnToDrop);
         }
         buildUpdatedColumn(
                 schemaBuilder,
-                originTable,
+                oldTable,
                 (builder, column) -> {
                     if (!columnsToDrop.contains(column.getName())) {
                         builder.fromColumns(Collections.singletonList(column));
                     }
                 });
-        buildUpdatedPrimaryKey(schemaBuilder, originTable, Function.identity());
-        buildUpdatedWatermark(schemaBuilder, originTable);
+        buildUpdatedPrimaryKey(schemaBuilder, oldTable, Function.identity());
+        buildUpdatedWatermark(schemaBuilder, oldTable);
         return schemaBuilder.build();
     }
 
     /** Convert ALTER TABLE DROP PRIMARY KEY to generate an updated Schema. */
     public Schema applySchemaChange(
-            SqlAlterTableDropPrimaryKey dropPrimaryKey, ResolvedCatalogTable originTable) {
-        Optional<UniqueConstraint> pkConstraint = originTable.getResolvedSchema().getPrimaryKey();
+            SqlAlterTableDropPrimaryKey dropPrimaryKey, ResolvedCatalogTable oldTable) {
+        Optional<UniqueConstraint> pkConstraint = oldTable.getResolvedSchema().getPrimaryKey();
         if (!pkConstraint.isPresent()) {
             throw new ValidationException(
                     String.format(
@@ -218,9 +225,9 @@ public class AlterSchemaConverter {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         buildUpdatedColumn(
                 schemaBuilder,
-                originTable,
+                oldTable,
                 (builder, column) -> builder.fromColumns(Collections.singletonList(column)));
-        buildUpdatedWatermark(schemaBuilder, originTable);
+        buildUpdatedWatermark(schemaBuilder, oldTable);
         return schemaBuilder.build();
     }
 
@@ -228,8 +235,8 @@ public class AlterSchemaConverter {
      * Convert ALTER TABLE DROP CONSTRAINT constraint_name to generate an updated {@link Schema}.
      */
     public Schema applySchemaChange(
-            SqlAlterTableDropConstraint dropConstraint, ResolvedCatalogTable originTable) {
-        Optional<UniqueConstraint> pkConstraint = originTable.getResolvedSchema().getPrimaryKey();
+            SqlAlterTableDropConstraint dropConstraint, ResolvedCatalogTable oldTable) {
+        Optional<UniqueConstraint> pkConstraint = oldTable.getResolvedSchema().getPrimaryKey();
         if (!pkConstraint.isPresent()) {
             throw new ValidationException(
                     String.format(
@@ -248,16 +255,16 @@ public class AlterSchemaConverter {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         buildUpdatedColumn(
                 schemaBuilder,
-                originTable,
+                oldTable,
                 (builder, column) -> builder.fromColumns(Collections.singletonList(column)));
-        buildUpdatedWatermark(schemaBuilder, originTable);
+        buildUpdatedWatermark(schemaBuilder, oldTable);
         return schemaBuilder.build();
     }
 
     /** Convert ALTER TABLE DROP WATERMARK to generate an updated {@link Schema}. */
     public Schema applySchemaChange(
-            SqlAlterTableDropWatermark dropWatermark, ResolvedCatalogTable originTable) {
-        if (originTable.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
+            SqlAlterTableDropWatermark dropWatermark, ResolvedCatalogTable oldTable) {
+        if (oldTable.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
             throw new ValidationException(
                     String.format(
                             "%sThe base table does not define any watermark strategy.",
@@ -266,9 +273,9 @@ public class AlterSchemaConverter {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         buildUpdatedColumn(
                 schemaBuilder,
-                originTable,
+                oldTable,
                 (builder, column) -> builder.fromColumns(Collections.singletonList(column)));
-        buildUpdatedPrimaryKey(schemaBuilder, originTable, Function.identity());
+        buildUpdatedPrimaryKey(schemaBuilder, oldTable, Function.identity());
         return schemaBuilder.build();
     }
 
@@ -289,10 +296,10 @@ public class AlterSchemaConverter {
         SchemaResolver schemaResolver;
 
         List<TableChange> changesCollector;
-        List<Function<ResolvedSchema, TableChange>> changeBuilders = new ArrayList<>();
+        List<Function<ResolvedSchema, List<TableChange>>> changeBuilders = new ArrayList<>();
 
         SchemaConverter(
-                Schema originSchema,
+                Schema oldSchema,
                 FlinkTypeFactory typeFactory,
                 SqlValidator sqlValidator,
                 Consumer<SqlTableConstraint> constraintValidator,
@@ -305,13 +312,13 @@ public class AlterSchemaConverter {
             this.escapeExpressions = escapeExpressions;
             this.schemaResolver = schemaResolver;
             this.changesCollector = changesCollector;
-            populateColumnsFromSourceTable(originSchema);
-            populatePrimaryKeyFromSourceTable(originSchema);
-            populateWatermarkFromSourceTable(originSchema);
+            populateColumnsFromSourceTable(oldSchema);
+            populatePrimaryKeyFromSourceTable(oldSchema);
+            populateWatermarkFromSourceTable(oldSchema);
         }
 
-        private void populateColumnsFromSourceTable(Schema originSchema) {
-            originSchema
+        private void populateColumnsFromSourceTable(Schema oldSchema) {
+            oldSchema
                     .getColumns()
                     .forEach(
                             column -> {
@@ -321,15 +328,15 @@ public class AlterSchemaConverter {
                             });
         }
 
-        private void populatePrimaryKeyFromSourceTable(Schema originSchema) {
-            if (originSchema.getPrimaryKey().isPresent()) {
-                primaryKey = originSchema.getPrimaryKey().get();
+        private void populatePrimaryKeyFromSourceTable(Schema oldSchema) {
+            if (oldSchema.getPrimaryKey().isPresent()) {
+                primaryKey = oldSchema.getPrimaryKey().get();
             }
         }
 
-        private void populateWatermarkFromSourceTable(Schema originSchema) {
+        private void populateWatermarkFromSourceTable(Schema oldSchema) {
             for (Schema.UnresolvedWatermarkSpec sourceWatermarkSpec :
-                    originSchema.getWatermarkSpecs()) {
+                    oldSchema.getWatermarkSpecs()) {
                 watermarkSpec = sourceWatermarkSpec;
             }
         }
@@ -379,14 +386,12 @@ public class AlterSchemaConverter {
         private void updatePrimaryKeyNullability(String columnName) {
             Schema.UnresolvedColumn column = columns.get(columnName);
             if (column instanceof Schema.UnresolvedPhysicalColumn) {
-                AbstractDataType<?> originType =
+                AbstractDataType<?> oldType =
                         ((Schema.UnresolvedPhysicalColumn) column).getDataType();
                 columns.put(
                         columnName,
                         new Schema.UnresolvedPhysicalColumn(
-                                columnName,
-                                originType.notNull(),
-                                column.getComment().orElse(null)));
+                                columnName, oldType.notNull(), column.getComment().orElse(null)));
             }
         }
 
@@ -442,10 +447,7 @@ public class AlterSchemaConverter {
 
         @Nullable
         String getComment(SqlTableColumn column) {
-            return column.getComment()
-                    .map(SqlCharStringLiteral.class::cast)
-                    .map(c -> c.getValueAs(String.class))
-                    .orElse(null);
+            return OperationConverterUtils.getComment(column);
         }
 
         private void applyColumnPosition(List<SqlNode> alterColumns) {
@@ -511,7 +513,9 @@ public class AlterSchemaConverter {
                 ResolvedSchema resolvedSchema = schemaResolver.resolve(updatedSchema);
                 changesCollector.addAll(
                         changeBuilders.stream()
-                                .map(changeBuilder -> changeBuilder.apply(resolvedSchema))
+                                .flatMap(
+                                        changeBuilder ->
+                                                changeBuilder.apply(resolvedSchema).stream())
                                 .collect(Collectors.toList()));
                 return updatedSchema;
             } catch (Exception e) {
@@ -528,10 +532,10 @@ public class AlterSchemaConverter {
         abstract void checkAndCollectWatermarkChange();
     }
 
-    private static class AddSchemaConverter extends SchemaConverter {
+    private class AddSchemaConverter extends SchemaConverter {
 
         AddSchemaConverter(
-                Schema originSchema,
+                Schema oldSchema,
                 FlinkTypeFactory typeFactory,
                 SqlValidator sqlValidator,
                 Consumer<SqlTableConstraint> constraintValidator,
@@ -539,7 +543,7 @@ public class AlterSchemaConverter {
                 SchemaResolver schemaResolver,
                 List<TableChange> changeCollector) {
             super(
-                    originSchema,
+                    oldSchema,
                     typeFactory,
                     sqlValidator,
                     constraintValidator,
@@ -559,7 +563,10 @@ public class AlterSchemaConverter {
                                 primaryKey.getColumnNames().stream()
                                         .collect(Collectors.joining("`, `", "[`", "`]"))));
             }
-            changeBuilders.add(schema -> TableChange.add(unwrap(schema.getPrimaryKey())));
+            changeBuilders.add(
+                    schema ->
+                            Collections.singletonList(
+                                    TableChange.add(unwrap(schema.getPrimaryKey()))));
         }
 
         @Override
@@ -574,7 +581,10 @@ public class AlterSchemaConverter {
                                 ((SqlCallExpression) watermarkSpec.getWatermarkExpression())
                                         .getSqlExpression()));
             }
-            changeBuilders.add(schema -> TableChange.add(schema.getWatermarkSpecs().get(0)));
+            changeBuilders.add(
+                    schema ->
+                            Collections.singletonList(
+                                    TableChange.add(schema.getWatermarkSpecs().get(0))));
         }
 
         @Override
@@ -590,29 +600,36 @@ public class AlterSchemaConverter {
             if (columnPosition.isFirstColumn()) {
                 changeBuilders.add(
                         schema ->
-                                TableChange.add(
-                                        unwrap(schema.getColumn(columnName)),
-                                        TableChange.ColumnPosition.first()));
+                                Collections.singletonList(
+                                        TableChange.add(
+                                                unwrap(schema.getColumn(columnName)),
+                                                TableChange.ColumnPosition.first())));
                 sortedColumnNames.add(0, columnName);
             } else if (columnPosition.isAfterReferencedColumn()) {
                 String referenceName = getReferencedColumn(columnPosition);
                 sortedColumnNames.add(sortedColumnNames.indexOf(referenceName) + 1, columnName);
                 changeBuilders.add(
                         schema ->
-                                TableChange.add(
-                                        unwrap(schema.getColumn(columnName)),
-                                        TableChange.ColumnPosition.after(referenceName)));
+                                Collections.singletonList(
+                                        TableChange.add(
+                                                unwrap(schema.getColumn(columnName)),
+                                                TableChange.ColumnPosition.after(referenceName))));
             } else {
-                changeBuilders.add(schema -> TableChange.add(unwrap(schema.getColumn(columnName))));
+                changeBuilders.add(
+                        schema ->
+                                Collections.singletonList(
+                                        TableChange.add(unwrap(schema.getColumn(columnName)))));
                 sortedColumnNames.add(columnName);
             }
         }
     }
 
-    private static class ModifySchemaConverter extends SchemaConverter {
+    private class ModifySchemaConverter extends SchemaConverter {
+
+        private final ResolvedCatalogTable oldTable;
 
         ModifySchemaConverter(
-                Schema originSchema,
+                ResolvedCatalogTable oldTable,
                 FlinkTypeFactory typeFactory,
                 SqlValidator sqlValidator,
                 Consumer<SqlTableConstraint> constraintValidator,
@@ -620,13 +637,14 @@ public class AlterSchemaConverter {
                 SchemaResolver schemaResolver,
                 List<TableChange> tableChangeCollector) {
             super(
-                    originSchema,
+                    oldTable.getUnresolvedSchema(),
                     typeFactory,
                     sqlValidator,
                     constraintValidator,
                     escapeExpressions,
                     schemaResolver,
                     tableChangeCollector);
+            this.oldTable = oldTable;
         }
 
         @Override
@@ -639,13 +657,33 @@ public class AlterSchemaConverter {
                                 EX_MSG_PREFIX, columnName));
             }
 
+            Column oldColumn = unwrap(oldTable.getResolvedSchema().getColumn(columnName));
             if (columnPosition.isFirstColumn()) {
                 sortedColumnNames.remove(columnName);
                 sortedColumnNames.add(0, columnName);
+
+                changeBuilders.add(
+                        schema ->
+                                buildModifyColumnChange(
+                                        oldColumn,
+                                        unwrap(schema.getColumn(columnName)),
+                                        TableChange.ColumnPosition.first()));
             } else if (columnPosition.isAfterReferencedColumn()) {
                 String referenceName = getReferencedColumn(columnPosition);
                 sortedColumnNames.remove(columnName);
                 sortedColumnNames.add(sortedColumnNames.indexOf(referenceName) + 1, columnName);
+
+                changeBuilders.add(
+                        schema ->
+                                buildModifyColumnChange(
+                                        oldColumn,
+                                        unwrap(schema.getColumn(columnName)),
+                                        TableChange.ColumnPosition.after(referenceName)));
+            } else {
+                changeBuilders.add(
+                        schema ->
+                                buildModifyColumnChange(
+                                        oldColumn, unwrap(schema.getColumn(columnName)), null));
             }
         }
 
@@ -658,6 +696,10 @@ public class AlterSchemaConverter {
                                         + "want to add a new one.",
                                 EX_MSG_PREFIX));
             }
+            changeBuilders.add(
+                    schema ->
+                            Collections.singletonList(
+                                    TableChange.modify(unwrap(schema.getPrimaryKey()))));
         }
 
         @Override
@@ -669,6 +711,10 @@ public class AlterSchemaConverter {
                                         + "want to add a new one.",
                                 EX_MSG_PREFIX));
             }
+            changeBuilders.add(
+                    schema ->
+                            Collections.singletonList(
+                                    TableChange.modify(schema.getWatermarkSpecs().get(0))));
         }
 
         @Nullable
@@ -685,18 +731,18 @@ public class AlterSchemaConverter {
     // --------------------------------------------------------------------------------------------
 
     private void validateColumnName(
-            String originColumnName,
+            String oldColumnName,
             String newColumnName,
-            ResolvedSchema originSchemas,
+            ResolvedSchema oldSchema,
             List<String> partitionKeys) {
         validateColumnName(
-                originColumnName,
-                originSchemas,
+                oldColumnName,
+                oldSchema,
                 partitionKeys,
                 // fail the operation of renaming column, once the column derives a computed column
-                (referencedColumn, computedColumn) -> referencedColumn.contains(originColumnName));
+                (referencedColumn, computedColumn) -> referencedColumn.contains(oldColumnName));
         // validate new column
-        if (originSchemas.getColumn(newColumnName).isPresent()) {
+        if (oldSchema.getColumn(newColumnName).isPresent()) {
             throw new ValidationException(
                     String.format(
                             "%sThe column `%s` already existed in table schema.",
@@ -706,19 +752,19 @@ public class AlterSchemaConverter {
 
     private void validateColumnName(
             String columnToDrop,
-            ResolvedSchema originSchema,
+            ResolvedSchema oldSchema,
             List<String> partitionKeys,
             Set<String> columnsToDrop) {
         validateColumnName(
                 columnToDrop,
-                originSchema,
+                oldSchema,
                 partitionKeys,
                 // fail the operation of dropping column, only if the column derives a computed
-                // column, and the computed column is not being dropped along with the origin column
+                // column, and the computed column is not being dropped along with the old column
                 (referencedColumn, computedColumn) ->
                         referencedColumn.contains(columnToDrop)
                                 && !columnsToDrop.contains(computedColumn.getName()));
-        originSchema
+        oldSchema
                 .getPrimaryKey()
                 .ifPresent(
                         pk -> {
@@ -733,11 +779,11 @@ public class AlterSchemaConverter {
 
     private void validateColumnName(
             String columnToAlter,
-            ResolvedSchema originSchema,
+            ResolvedSchema oldSchema,
             List<String> partitionKeys,
             BiFunction<Set<String>, Column.ComputedColumn, Boolean> computedColumnChecker) {
-        // validate origin column
-        Set<String> tableColumns = new HashSet<>(originSchema.getColumnNames());
+        // validate old column
+        Set<String> tableColumns = new HashSet<>(oldSchema.getColumnNames());
         if (!tableColumns.contains(columnToAlter)) {
             throw new ValidationException(
                     String.format(
@@ -745,15 +791,15 @@ public class AlterSchemaConverter {
                             EX_MSG_PREFIX, columnToAlter));
         }
 
-        // validate origin column name isn't referred by computed column case
-        originSchema.getColumns().stream()
+        // validate old column name isn't referred by computed column case
+        oldSchema.getColumns().stream()
                 .filter(column -> column instanceof Column.ComputedColumn)
                 .forEach(
                         column -> {
                             Column.ComputedColumn computedColumn = (Column.ComputedColumn) column;
                             Set<String> referencedColumn =
                                     ColumnReferenceFinder.findReferencedColumn(
-                                            computedColumn.getName(), originSchema);
+                                            computedColumn.getName(), oldSchema);
                             if (computedColumnChecker.apply(referencedColumn, computedColumn)) {
                                 throw new ValidationException(
                                         String.format(
@@ -763,7 +809,7 @@ public class AlterSchemaConverter {
                                                 computedColumn.asSummaryString()));
                             }
                         });
-        // validate partition keys doesn't contain the origin column
+        // validate partition keys doesn't contain the old column
         if (partitionKeys.contains(columnToAlter)) {
             throw new ValidationException(
                     String.format(
@@ -772,14 +818,13 @@ public class AlterSchemaConverter {
         }
     }
 
-    private void validateWatermark(ResolvedCatalogTable originTable, String columnToAlter) {
-        // validate origin column isn't referenced by watermark
-        List<WatermarkSpec> watermarkSpecs = originTable.getResolvedSchema().getWatermarkSpecs();
+    private void validateWatermark(ResolvedCatalogTable oldTable, String columnToAlter) {
+        // validate old column isn't referenced by watermark
+        List<WatermarkSpec> watermarkSpecs = oldTable.getResolvedSchema().getWatermarkSpecs();
         Set<String> referencedColumns =
-                ColumnReferenceFinder.findWatermarkReferencedColumn(
-                        originTable.getResolvedSchema());
+                ColumnReferenceFinder.findWatermarkReferencedColumn(oldTable.getResolvedSchema());
         Set<String> rowtimeAttributes =
-                originTable.getResolvedSchema().getWatermarkSpecs().stream()
+                oldTable.getResolvedSchema().getWatermarkSpecs().stream()
                         .map(WatermarkSpec::getRowtimeAttribute)
                         .collect(Collectors.toSet());
         if (rowtimeAttributes.contains(columnToAlter)
@@ -793,37 +838,34 @@ public class AlterSchemaConverter {
 
     private void buildUpdatedColumn(
             Schema.Builder builder,
-            ResolvedCatalogTable originTable,
+            ResolvedCatalogTable oldTable,
             BiConsumer<Schema.Builder, Schema.UnresolvedColumn> columnConsumer) {
         // build column
-        originTable
-                .getUnresolvedSchema()
+        oldTable.getUnresolvedSchema()
                 .getColumns()
                 .forEach(column -> columnConsumer.accept(builder, column));
     }
 
     private void buildUpdatedPrimaryKey(
             Schema.Builder builder,
-            ResolvedCatalogTable originTable,
+            ResolvedCatalogTable oldTable,
             Function<String, String> columnRenamer) {
-        originTable
-                .getUnresolvedSchema()
+        oldTable.getUnresolvedSchema()
                 .getPrimaryKey()
                 .ifPresent(
                         pk -> {
-                            List<String> originPrimaryKeyNames = pk.getColumnNames();
+                            List<String> oldPrimaryKeyNames = pk.getColumnNames();
                             String constrainName = pk.getConstraintName();
                             List<String> newPrimaryKeyNames =
-                                    originPrimaryKeyNames.stream()
+                                    oldPrimaryKeyNames.stream()
                                             .map(columnRenamer)
                                             .collect(Collectors.toList());
                             builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
                         });
     }
 
-    private void buildUpdatedWatermark(Schema.Builder builder, ResolvedCatalogTable originTable) {
-        originTable
-                .getUnresolvedSchema()
+    private void buildUpdatedWatermark(Schema.Builder builder, ResolvedCatalogTable oldTable) {
+        oldTable.getUnresolvedSchema()
                 .getWatermarkSpecs()
                 .forEach(
                         watermarkSpec ->
@@ -832,24 +874,23 @@ public class AlterSchemaConverter {
                                         watermarkSpec.getWatermarkExpression()));
     }
 
-    private void buildNewColumnFromOriginColumn(
-            Schema.Builder builder, Schema.UnresolvedColumn originColumn, String columnName) {
-        if (originColumn instanceof Schema.UnresolvedComputedColumn) {
+    private void buildNewColumnFromOldColumn(
+            Schema.Builder builder, Schema.UnresolvedColumn oldColumn, String columnName) {
+        if (oldColumn instanceof Schema.UnresolvedComputedColumn) {
             builder.columnByExpression(
-                    columnName, ((Schema.UnresolvedComputedColumn) originColumn).getExpression());
-        } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
-            builder.column(
-                    columnName, ((Schema.UnresolvedPhysicalColumn) originColumn).getDataType());
-        } else if (originColumn instanceof Schema.UnresolvedMetadataColumn) {
+                    columnName, ((Schema.UnresolvedComputedColumn) oldColumn).getExpression());
+        } else if (oldColumn instanceof Schema.UnresolvedPhysicalColumn) {
+            builder.column(columnName, ((Schema.UnresolvedPhysicalColumn) oldColumn).getDataType());
+        } else if (oldColumn instanceof Schema.UnresolvedMetadataColumn) {
             Schema.UnresolvedMetadataColumn metadataColumn =
-                    (Schema.UnresolvedMetadataColumn) originColumn;
+                    (Schema.UnresolvedMetadataColumn) oldColumn;
             builder.columnByMetadata(
                     columnName,
                     metadataColumn.getDataType(),
                     metadataColumn.getMetadataKey(),
                     metadataColumn.isVirtual());
         }
-        originColumn.getComment().ifPresent(builder::withComment);
+        oldColumn.getComment().ifPresent(builder::withComment);
     }
 
     private static String getColumnName(SqlIdentifier identifier) {
@@ -874,7 +915,7 @@ public class AlterSchemaConverter {
                         alterTableSchema.getClass().getCanonicalName()));
     }
 
-    private static <T> T unwrap(Optional<T> value) {
+    private <T> T unwrap(Optional<T> value) {
         return value.orElseThrow(() -> new TableException("The value should never be empty."));
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index acb46f60fc9..bd5aa2ec984 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -554,16 +554,19 @@ public class SqlToOperationConverter {
             return OperationConverterUtils.convertChangeColumn(
                     tableIdentifier,
                     (SqlChangeColumn) sqlAlterTable,
-                    (CatalogTable) baseTable,
+                    (ResolvedCatalogTable) baseTable,
                     flinkPlanner.getOrCreateSqlValidator());
         } else if (sqlAlterTable instanceof SqlAlterTableRenameColumn) {
             SqlAlterTableRenameColumn sqlAlterTableRenameColumn =
                     (SqlAlterTableRenameColumn) sqlAlterTable;
+            ResolvedCatalogTable baseCatalogTable = (ResolvedCatalogTable) baseTable;
+            List<TableChange> tableChanges = new ArrayList<>();
             Schema newSchema =
                     alterSchemaConverter.applySchemaChange(
-                            sqlAlterTableRenameColumn, resolvedCatalogTable);
-            return new AlterTableSchemaOperation(
+                            sqlAlterTableRenameColumn, baseCatalogTable, tableChanges);
+            return new AlterTableChangeOperation(
                     tableIdentifier,
+                    tableChanges,
                     CatalogTable.of(
                             newSchema,
                             resolvedCatalogTable.getComment(),
@@ -723,20 +726,19 @@ public class SqlToOperationConverter {
 
     private Operation convertAlterTableSchema(
             ObjectIdentifier tableIdentifier,
-            ResolvedCatalogTable originalTable,
+            ResolvedCatalogTable oldTable,
             SqlAlterTableSchema alterTableSchema) {
         List<TableChange> tableChanges = new ArrayList<>();
         Schema newSchema =
-                alterSchemaConverter.applySchemaChange(
-                        alterTableSchema, originalTable.getUnresolvedSchema(), tableChanges);
+                alterSchemaConverter.applySchemaChange(alterTableSchema, oldTable, tableChanges);
         return new AlterTableChangeOperation(
                 tableIdentifier,
                 tableChanges,
                 CatalogTable.of(
                         newSchema,
-                        originalTable.getComment(),
-                        originalTable.getPartitionKeys(),
-                        originalTable.getOptions()));
+                        oldTable.getComment(),
+                        oldTable.getPartitionKeys(),
+                        oldTable.getOptions()));
     }
 
     /** Convert CREATE FUNCTION statement. */
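
With this change, renaming a column no longer yields a full AlterTableSchemaOperation; the rename is carried as a single MODIFY change. A small illustrative sketch using only factory methods visible in this patch (the column definition mirrors the `c` column of the test table further down; treat it as an example, not the converter's exact code path):

    Column oldColumn =
            Column.physical("c", DataTypes.STRING().notNull()).withComment("column comment");
    TableChange renameChange = TableChange.modifyColumnName(oldColumn, "c1");
    // Wrapped in an AlterTableChangeOperation, this is summarized as
    //   ALTER TABLE cat1.db1.tb1
    //     MODIFY `c` TO `c1`
    // which is what SqlToOperationConverterTest asserts below.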
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index 43539d23552..7587200850c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
@@ -47,12 +48,16 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.validate.SqlValidator;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -95,6 +100,7 @@ public class OperationConverterUtils {
             setWatermarkAndPK(builder, catalogTable.getSchema());
         }
         List<TableChange> tableChanges = new ArrayList<>();
+        // TODO: support TableChange in FLINK-30497
         for (SqlNode sqlNode : addReplaceColumns.getNewColumns()) {
             TableColumn tableColumn = toTableColumn((SqlTableColumn) sqlNode, sqlValidator);
             builder.add(tableColumn);
@@ -103,12 +109,7 @@ public class OperationConverterUtils {
                 tableChanges.add(
                         TableChange.add(
                                 Column.physical(tableColumn.getName(), tableColumn.getType())
-                                        .withComment(
-                                                ((SqlTableColumn) sqlNode)
-                                                        .getComment()
-                                                        .map(SqlCharStringLiteral.class::cast)
-                                                        .map(c -> c.getValueAs(String.class))
-                                                        .orElse(null))));
+                                        .withComment(getComment((SqlTableColumn) sqlNode))));
             }
         }
 
@@ -141,7 +142,7 @@ public class OperationConverterUtils {
     public static Operation convertChangeColumn(
             ObjectIdentifier tableIdentifier,
             SqlChangeColumn changeColumn,
-            CatalogTable catalogTable,
+            ResolvedCatalogTable catalogTable,
             SqlValidator sqlValidator) {
         String oldName = changeColumn.getOldName().getSimple();
         if (catalogTable.getPartitionKeys().indexOf(oldName) >= 0) {
@@ -151,12 +152,29 @@ public class OperationConverterUtils {
         TableSchema oldSchema = catalogTable.getSchema();
         boolean first = changeColumn.isFirst();
         String after = changeColumn.getAfter() == null ? null : changeColumn.getAfter().getSimple();
-        TableColumn newTableColumn = toTableColumn(changeColumn.getNewColumn(), sqlValidator);
+        TableColumn.PhysicalColumn newTableColumn =
+                toTableColumn(changeColumn.getNewColumn(), sqlValidator);
         TableSchema newSchema = changeColumn(oldSchema, oldName, newTableColumn, first, after);
         Map<String, String> newProperties = new HashMap<>(catalogTable.getOptions());
         newProperties.putAll(extractProperties(changeColumn.getProperties()));
-        return new AlterTableSchemaOperation(
+
+        List<TableChange> tableChanges =
+                buildModifyColumnChange(
+                        catalogTable
+                                .getResolvedSchema()
+                                .getColumn(oldName)
+                                .orElseThrow(
+                                        () ->
+                                                new ValidationException(
+                                                        "Failed to get old column: " + oldName)),
+                        Column.physical(newTableColumn.getName(), newTableColumn.getType())
+                                .withComment(getComment(changeColumn.getNewColumn())),
+                        first
+                                ? TableChange.ColumnPosition.first()
+                                : (after == null ? null : TableChange.ColumnPosition.after(after)));
+        return new AlterTableChangeOperation(
                 tableIdentifier,
+                tableChanges,
                 new CatalogTableImpl(
                         newSchema,
                         catalogTable.getPartitionKeys(),
@@ -165,6 +183,44 @@ public class OperationConverterUtils {
         // TODO: handle watermark and constraints
     }
 
+    public static List<TableChange> buildModifyColumnChange(
+            Column oldColumn,
+            Column newColumn,
+            @Nullable TableChange.ColumnPosition columnPosition) {
+        if (oldColumn.isPhysical() && newColumn.isPhysical()) {
+            List<TableChange> changes = new ArrayList<>();
+            String newComment = newColumn.getComment().orElse(oldColumn.getComment().orElse(null));
+            if (!newColumn.getComment().equals(oldColumn.getComment())) {
+                changes.add(TableChange.modifyColumnComment(oldColumn, newComment));
+            }
+
+            if (!oldColumn
+                    .getDataType()
+                    .getLogicalType()
+                    .equals(newColumn.getDataType().getLogicalType())) {
+                changes.add(
+                        TableChange.modifyPhysicalColumnType(
+                                oldColumn.withComment(newComment), newColumn.getDataType()));
+            }
+
+            if (!Objects.equals(newColumn.getName(), oldColumn.getName())) {
+                changes.add(
+                        TableChange.modifyColumnName(
+                                oldColumn.withComment(newComment).copy(newColumn.getDataType()),
+                                newColumn.getName()));
+            }
+
+            if (columnPosition != null) {
+                changes.add(TableChange.modifyColumnPosition(newColumn, columnPosition));
+            }
+
+            return changes;
+        } else {
+            return Collections.singletonList(
+                    TableChange.modify(oldColumn, newColumn, columnPosition));
+        }
+    }
+
     // change a column in the old table schema and return the updated table schema
     public static TableSchema changeColumn(
             TableSchema oldSchema,
@@ -206,7 +262,14 @@ public class OperationConverterUtils {
         return builder.build();
     }
 
-    private static TableColumn toTableColumn(
+    public static @Nullable String getComment(SqlTableColumn column) {
+        return column.getComment()
+                .map(SqlCharStringLiteral.class::cast)
+                .map(c -> c.getValueAs(String.class))
+                .orElse(null);
+    }
+
+    private static TableColumn.PhysicalColumn toTableColumn(
             SqlTableColumn tableColumn, SqlValidator sqlValidator) {
         if (!(tableColumn instanceof SqlRegularColumn)) {
             throw new TableException("Only regular columns are supported for this operation yet.");
@@ -214,7 +277,7 @@ public class OperationConverterUtils {
         SqlRegularColumn regularColumn = (SqlRegularColumn) tableColumn;
         String name = regularColumn.getName().getSimple();
         SqlDataTypeSpec typeSpec = regularColumn.getType();
-        boolean nullable = typeSpec.getNullable() == null ? true : typeSpec.getNullable();
+        boolean nullable = typeSpec.getNullable() == null || typeSpec.getNullable();
         LogicalType logicalType =
                 FlinkTypeFactory.toLogicalType(typeSpec.deriveType(sqlValidator, nullable));
         DataType dataType = TypeConversions.fromLogicalToDataType(logicalType);
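
A brief usage sketch of the buildModifyColumnChange helper added above; the values mirror the `ts` case asserted in the test diff that follows (assumes imports of org.apache.flink.table.catalog.Column, org.apache.flink.table.catalog.TableChange and org.apache.flink.table.api.DataTypes):

    Column oldColumn =
            Column.physical("ts", DataTypes.TIMESTAMP(3)).withComment("just a comment");
    Column newColumn =
            Column.physical("ts", DataTypes.TIMESTAMP(3).notNull()).withComment("just a comment");

    List<TableChange> changes =
            OperationConverterUtils.buildModifyColumnChange(
                    oldColumn, newColumn, TableChange.ColumnPosition.after("e"));

    // Name and comment are unchanged, so only two changes are produced:
    // modifyPhysicalColumnType (nullability tightened) and modifyColumnPosition,
    // rendered in the operation summary as
    //   MODIFY `ts` TIMESTAMP(3) NOT NULL,
    //   MODIFY `ts` AFTER `e`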
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 5ad6bce141a..10a7af4f4f6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -1273,39 +1273,61 @@ public class SqlToOperationConverterTest {
         prepareTable("tb1", false, false, true, 3);
         // rename pk column c
         Operation operation = parse("alter table tb1 rename c to c1");
-        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
         assertThat(operation.asSummaryString())
+                .isEqualTo("ALTER TABLE cat1.db1.tb1\n  MODIFY `c` TO `c1`");
+        assertThat(((AlterTableChangeOperation) operation).getNewTable().getUnresolvedSchema())
                 .isEqualTo(
-                        "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
-                                + "  `a` INT NOT NULL,\n"
-                                + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c1` STRING NOT NULL COMMENT 'column comment',\n"
-                                + "  `d` AS [a*(b+2 + a*b)],\n"
-                                + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
-                                + "  `f` AS [e.f1 + e.f2.f0],\n"
-                                + "  `g` METADATA VIRTUAL,\n"
-                                + "  `ts` TIMESTAMP(3) COMMENT 'just a comment',\n"
-                                + "  WATERMARK FOR `ts` AS [ts - interval '5' seconds],\n"
-                                + "  CONSTRAINT `ct1` PRIMARY KEY (`a`, `b`, `c1`) NOT ENFORCED\n"
-                                + ")");
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT().notNull())
+                                .column("b", DataTypes.BIGINT().notNull())
+                                .column("c1", DataTypes.STRING().notNull())
+                                .withComment("column comment")
+                                .columnByExpression("d", "a*(b+2 + a*b)")
+                                .column(
+                                        "e",
+                                        DataTypes.ROW(
+                                                DataTypes.STRING(),
+                                                DataTypes.INT(),
+                                                DataTypes.ROW(
+                                                        DataTypes.DOUBLE(),
+                                                        DataTypes.ARRAY(DataTypes.FLOAT()))))
+                                .columnByExpression("f", "e.f1 + e.f2.f0")
+                                .columnByMetadata("g", DataTypes.STRING(), null, true)
+                                .column("ts", DataTypes.TIMESTAMP(3))
+                                .withComment("just a comment")
+                                .watermark("ts", "ts - interval '5' seconds")
+                                .primaryKeyNamed("ct1", "a", "b", "c1")
+                                .build());
 
         // rename computed column
         operation = parse("alter table tb1 rename f to f1");
-        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
         assertThat(operation.asSummaryString())
+                .isEqualTo("ALTER TABLE cat1.db1.tb1\n  MODIFY `f` TO `f1`");
+        assertThat(((AlterTableChangeOperation) operation).getNewTable().getUnresolvedSchema())
                 .isEqualTo(
-                        "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
-                                + "  `a` INT NOT NULL,\n"
-                                + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
-                                + "  `d` AS [a*(b+2 + a*b)],\n"
-                                + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
-                                + "  `f1` AS [e.f1 + e.f2.f0],\n"
-                                + "  `g` METADATA VIRTUAL,\n"
-                                + "  `ts` TIMESTAMP(3) COMMENT 'just a comment',\n"
-                                + "  WATERMARK FOR `ts` AS [ts - interval '5' seconds],\n"
-                                + "  CONSTRAINT `ct1` PRIMARY KEY (`a`, `b`, `c`) NOT ENFORCED\n"
-                                + ")");
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT().notNull())
+                                .column("b", DataTypes.BIGINT().notNull())
+                                .column("c", DataTypes.STRING().notNull())
+                                .withComment("column comment")
+                                .columnByExpression("d", "a*(b+2 + a*b)")
+                                .column(
+                                        "e",
+                                        DataTypes.ROW(
+                                                DataTypes.STRING(),
+                                                DataTypes.INT(),
+                                                DataTypes.ROW(
+                                                        DataTypes.DOUBLE(),
+                                                        DataTypes.ARRAY(DataTypes.FLOAT()))))
+                                .columnByExpression("f1", "e.f1 + e.f2.f0")
+                                .columnByMetadata("g", DataTypes.STRING(), null, true)
+                                .column("ts", DataTypes.TIMESTAMP(3))
+                                .withComment("just a comment")
+                                .watermark("ts", "ts - interval '5' seconds")
+                                .primaryKeyNamed("ct1", "a", "b", "c")
+                                .build());
 
         // rename column c that is used in a computed column
         assertThatThrownBy(() -> parse("alter table tb1 rename a to a1"))
@@ -2076,6 +2098,11 @@ public class SqlToOperationConverterTest {
         Operation operation =
                 parse(
                         "alter table tb1 modify b bigint not null comment 'move b to first and add comment' first");
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "ALTER TABLE cat1.db1.tb1\n"
+                                + "  MODIFY `b` COMMENT 'move b to first and add comment',\n"
+                                + "  MODIFY `b` FIRST");
         assertAlterTableSchema(
                 operation,
                 tableIdentifier,
@@ -2103,6 +2130,11 @@ public class SqlToOperationConverterTest {
 
         // change nullability and pos
         operation = parse("alter table tb1 modify ts timestamp(3) not null after e");
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "ALTER TABLE cat1.db1.tb1\n"
+                                + "  MODIFY `ts` TIMESTAMP(3) NOT NULL,\n"
+                                + "  MODIFY `ts` AFTER `e`");
         assertAlterTableSchema(
                 operation,
                 tableIdentifier,
@@ -2137,7 +2169,17 @@ public class SqlToOperationConverterTest {
                                 + "f as upper(e) comment 'change f' after ts,\n"
                                 + "g int not null comment 'change g',\n"
                                 + "constraint ct2 primary key(e) not enforced)");
-
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "ALTER TABLE cat1.db1.tb1\n"
+                                + "  MODIFY `d` INT NOT NULL AS `a` + 2 COMMENT 'change d' AFTER `b`,\n"
+                                + "  MODIFY `c` BIGINT,\n"
+                                + "  MODIFY `c` FIRST,\n"
+                                + "  MODIFY `e` COMMENT 'change e',\n"
+                                + "  MODIFY `e` STRING NOT NULL,\n"
+                                + "  MODIFY `f` STRING NOT NULL AS UPPER(`e`) COMMENT 'change f' AFTER `ts`,\n"
+                                + "  MODIFY `g` INT NOT NULL COMMENT 'change g' ,\n"
+                                + "  MODIFY CONSTRAINT `ct2` PRIMARY KEY (`e`) NOT ENFORCED");
         assertAlterTableSchema(
                 operation,
                 tableIdentifier,
@@ -2169,7 +2211,15 @@ public class SqlToOperationConverterTest {
                                 + "e int metadata virtual,\n"
                                 + "watermark for f as f,\n"
                                 + "g multiset<int> not null comment 'change g' first)");
-
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "ALTER TABLE cat1.db1.tb2\n"
+                                + "  MODIFY `ts` COMMENT 'change ts',\n"
+                                + "  MODIFY `ts` INT,\n"
+                                + "  MODIFY `f` TIMESTAMP(3) NOT NULL ,\n"
+                                + "  MODIFY `e` INT METADATA VIRTUAL ,\n"
+                                + "  MODIFY `g` MULTISET<INT> NOT NULL COMMENT 'change g' FIRST,\n"
+                                + "  MODIFY WATERMARK FOR `f`: TIMESTAMP(3) NOT NULL AS `f`");
         assertAlterTableSchema(
                 operation,
                 tableIdentifier,