You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2023/01/02 15:53:43 UTC

[flink] 02/02: [FLINK-22137][table] Support DROP column/constraint/watermark for ALTER TABLE statement

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cc9540cacc2d775cc190ee6534794ec33fd07828
Author: Jane Chan <qi...@gmail.com>
AuthorDate: Thu Dec 29 14:34:32 2022 +0800

    [FLINK-22137][table] Support DROP column/constraint/watermark for ALTER TABLE statement
    
    This closes #21571
---
 .../src/test/resources/sql/table.q                 | 120 +++++--
 .../table/api/internal/TableEnvironmentImpl.java   |  22 --
 .../ddl/AlterTableDropConstraintOperation.java     |  43 ---
 .../planner/expressions/ColumnReferenceFinder.java |  86 +++--
 .../planner/operations/AlterSchemaConverter.java   | 381 +++++++++++++++------
 .../operations/SqlToOperationConverter.java        |  69 ++--
 .../expressions/ColumnReferenceFinderTest.java     |  80 +++++
 .../operations/SqlToOperationConverterTest.java    | 337 ++++++++++--------
 .../flink/table/api/TableEnvironmentTest.scala     | 100 ++++++
 9 files changed, 856 insertions(+), 382 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index 36809085117..1d1764ac5e0 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -537,45 +537,109 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2` (
 
 !ok
 
+# ==========================================================================
+# test alter table drop column
+# ==========================================================================
+
+alter table orders2 drop (amount1, product, cleaned_product);
+[INFO] Execute statement succeed.
+!info
+
+# verify table schema using SHOW CREATE TABLE
+show create table orders2;
+CREATE TABLE `default_catalog`.`default_database`.`orders2` (
+  `trade_order_id` BIGINT NOT NULL,
+  `ts` TIMESTAMP(3) NOT NULL,
+  `user` VARCHAR(2147483647),
+  `user_email` VARCHAR(2147483647) NOT NULL,
+  `product_id` BIGINT NOT NULL,
+  `ptime` AS PROCTIME(),
+  WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE,
+  CONSTRAINT `order_constraint` PRIMARY KEY (`trade_order_id`) NOT ENFORCED
+) WITH (
+  'connector' = 'datagen'
+)
+
+!ok
+
+# ==========================================================================
+# test alter table drop primary key
+# ==========================================================================
+
+alter table orders2 drop primary key;
+[INFO] Execute statement succeed.
+!info
+
+# verify table schema using SHOW CREATE TABLE
+show create table orders2;
+CREATE TABLE `default_catalog`.`default_database`.`orders2` (
+  `trade_order_id` BIGINT NOT NULL,
+  `ts` TIMESTAMP(3) NOT NULL,
+  `user` VARCHAR(2147483647),
+  `user_email` VARCHAR(2147483647) NOT NULL,
+  `product_id` BIGINT NOT NULL,
+  `ptime` AS PROCTIME(),
+  WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE
+) WITH (
+  'connector' = 'datagen'
+)
+
+!ok
 
+# ==========================================================================
+# test alter table drop watermark
+# ==========================================================================
+
+alter table orders2 drop watermark;
+[INFO] Execute statement succeed.
+!info
+
+# verify table schema using SHOW CREATE TABLE
+show create table orders2;
+CREATE TABLE `default_catalog`.`default_database`.`orders2` (
+  `trade_order_id` BIGINT NOT NULL,
+  `ts` TIMESTAMP(3) NOT NULL,
+  `user` VARCHAR(2147483647),
+  `user_email` VARCHAR(2147483647) NOT NULL,
+  `product_id` BIGINT NOT NULL,
+  `ptime` AS PROCTIME()
+) WITH (
+  'connector' = 'datagen'
+)
+
+!ok
 
 # ==========================================================================
 # test describe table
 # ==========================================================================
 
 describe orders2;
-+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
-|            name |                        type |  null |                 key |                                extras |                  watermark |
-+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
-|  trade_order_id |                      BIGINT | FALSE | PRI(trade_order_id) |                                       |                            |
-|              ts |      TIMESTAMP(3) *ROWTIME* | FALSE |                     |                                       | `ts` - INTERVAL '1' MINUTE |
-|            user |                      STRING |  TRUE |                     |                                       |                            |
-|      user_email |                      STRING | FALSE |                     |                                       |                            |
-|      product_id |                      BIGINT | FALSE |                     |                                       |                            |
-|         product |                 VARCHAR(32) |  TRUE |                     |                                       |                            |
-| cleaned_product |                 VARCHAR(32) | FALSE |                     | AS COALESCE(`product`, 'missing_sku') |                            |
-|         amount1 |                         INT |  TRUE |                     |                                       |                            |
-|           ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |                     |                         AS PROCTIME() |                            |
-+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
-9 rows in set
++----------------+-----------------------------+-------+-----+---------------+-----------+
+|           name |                        type |  null | key |        extras | watermark |
++----------------+-----------------------------+-------+-----+---------------+-----------+
+| trade_order_id |                      BIGINT | FALSE |     |               |           |
+|             ts |                TIMESTAMP(3) | FALSE |     |               |           |
+|           user |                      STRING |  TRUE |     |               |           |
+|     user_email |                      STRING | FALSE |     |               |           |
+|     product_id |                      BIGINT | FALSE |     |               |           |
+|          ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |           |
++----------------+-----------------------------+-------+-----+---------------+-----------+
+6 rows in set
 !ok
 
 # test desc table
 desc orders2;
-+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
-|            name |                        type |  null |                 key |                                extras |                  watermark |
-+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
-|  trade_order_id |                      BIGINT | FALSE | PRI(trade_order_id) |                                       |                            |
-|              ts |      TIMESTAMP(3) *ROWTIME* | FALSE |                     |                                       | `ts` - INTERVAL '1' MINUTE |
-|            user |                      STRING |  TRUE |                     |                                       |                            |
-|      user_email |                      STRING | FALSE |                     |                                       |                            |
-|      product_id |                      BIGINT | FALSE |                     |                                       |                            |
-|         product |                 VARCHAR(32) |  TRUE |                     |                                       |                            |
-| cleaned_product |                 VARCHAR(32) | FALSE |                     | AS COALESCE(`product`, 'missing_sku') |                            |
-|         amount1 |                         INT |  TRUE |                     |                                       |                            |
-|           ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |                     |                         AS PROCTIME() |                            |
-+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
-9 rows in set
++----------------+-----------------------------+-------+-----+---------------+-----------+
+|           name |                        type |  null | key |        extras | watermark |
++----------------+-----------------------------+-------+-----+---------------+-----------+
+| trade_order_id |                      BIGINT | FALSE |     |               |           |
+|             ts |                TIMESTAMP(3) | FALSE |     |               |           |
+|           user |                      STRING |  TRUE |     |               |           |
+|     user_email |                      STRING | FALSE |     |               |           |
+|     product_id |                      BIGINT | FALSE |     |               |           |
+|          ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |           |
++----------------+-----------------------------+-------+-----+---------------+-----------+
+6 rows in set
 !ok
 
 # ==========================================================================
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index bffbef65c1c..4a1d000cc5e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -47,8 +47,6 @@ import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.ContextResolvedTable;
@@ -125,7 +123,6 @@ import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
 import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
-import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
 import org.apache.flink.table.operations.ddl.AlterTableOperation;
 import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
@@ -160,7 +157,6 @@ import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.table.utils.print.PrintStyle;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkUserCodeClassLoaders;
@@ -993,24 +989,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                             alterTablePropertiesOp.getCatalogTable(),
                             alterTablePropertiesOp.getTableIdentifier(),
                             false);
-                } else if (alterTableOperation instanceof AlterTableDropConstraintOperation) {
-                    AlterTableDropConstraintOperation dropConstraintOperation =
-                            (AlterTableDropConstraintOperation) operation;
-                    CatalogTable oriTable =
-                            catalogManager
-                                    .getTable(dropConstraintOperation.getTableIdentifier())
-                                    .get()
-                                    .getResolvedTable();
-                    CatalogTable newTable =
-                            new CatalogTableImpl(
-                                    TableSchemaUtils.dropConstraint(
-                                            oriTable.getSchema(),
-                                            dropConstraintOperation.getConstraintName()),
-                                    oriTable.getPartitionKeys(),
-                                    oriTable.getOptions(),
-                                    oriTable.getComment());
-                    catalogManager.alterTable(
-                            newTable, dropConstraintOperation.getTableIdentifier(), false);
                 } else if (alterTableOperation instanceof AlterPartitionPropertiesOperation) {
                     AlterPartitionPropertiesOperation alterPartPropsOp =
                             (AlterPartitionPropertiesOperation) operation;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java
deleted file mode 100644
index b3f7c7d448f..00000000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.operations.ddl;
-
-import org.apache.flink.table.catalog.ObjectIdentifier;
-
-/** Operation of "ALTER TABLE ADD [CONSTRAINT constraintName] ..." clause. * */
-public class AlterTableDropConstraintOperation extends AlterTableOperation {
-    private final String constraintName;
-
-    public AlterTableDropConstraintOperation(
-            ObjectIdentifier tableIdentifier, String constraintName) {
-        super(tableIdentifier);
-        this.constraintName = constraintName;
-    }
-
-    public String getConstraintName() {
-        return constraintName;
-    }
-
-    @Override
-    public String asSummaryString() {
-        return String.format(
-                "ALTER TABLE %s DROP CONSTRAINT %s",
-                tableIdentifier.asSummaryString(), constraintName);
-    }
-}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
index 7b22246de8d..d3593cb4d85 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
@@ -19,6 +19,9 @@
 package org.apache.flink.table.planner.expressions;
 
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
@@ -29,33 +32,68 @@ import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
 
 import org.apache.calcite.rex.RexInputRef;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /** A finder used to look up referenced column name in a {@link ResolvedExpression}. */
 public class ColumnReferenceFinder {
 
     private ColumnReferenceFinder() {}
 
-    public static Set<String> findReferencedColumn(
-            ResolvedExpression resolvedExpression, List<String> tableColumns) {
-        ColumnReferenceVisitor visitor = new ColumnReferenceVisitor(tableColumns);
-        visitor.visit(resolvedExpression);
-        return visitor.referencedColumns;
+    /**
+     * Find referenced column names that derive the computed column.
+     *
+     * @param columnName the name of the column
+     * @param schema the schema that contains the computed column definition
+     * @return the referenced column names
+     */
+    public static Set<String> findReferencedColumn(String columnName, ResolvedSchema schema) {
+        Column column =
+                schema.getColumn(columnName)
+                        .orElseThrow(
+                                () ->
+                                        new ValidationException(
+                                                String.format(
+                                                        "The input column %s doesn't exist in the schema.",
+                                                        columnName)));
+        if (!(column instanceof Column.ComputedColumn)) {
+            return Collections.emptySet();
+        }
+        ColumnReferenceVisitor visitor =
+                new ColumnReferenceVisitor(
+                        // the input ref index is based on a projection of non-computed columns
+                        schema.getColumns().stream()
+                                .filter(c -> !(c instanceof Column.ComputedColumn))
+                                .map(Column::getName)
+                                .collect(Collectors.toList()));
+        return visitor.visit(((Column.ComputedColumn) column).getExpression());
+    }
+
+    /**
+     * Find referenced column names that derive the watermark expression.
+     *
+     * @param schema the resolved schema that contains the watermark expression
+     * @return the referenced column names
+     */
+    public static Set<String> findWatermarkReferencedColumn(ResolvedSchema schema) {
+        ColumnReferenceVisitor visitor = new ColumnReferenceVisitor(schema.getColumnNames());
+        return schema.getWatermarkSpecs().stream()
+                .flatMap(spec -> visitor.visit(spec.getWatermarkExpression()).stream())
+                .collect(Collectors.toSet());
     }
 
-    private static class ColumnReferenceVisitor extends ExpressionDefaultVisitor<Void> {
+    private static class ColumnReferenceVisitor extends ExpressionDefaultVisitor<Set<String>> {
         private final List<String> tableColumns;
-        private final Set<String> referencedColumns;
 
         public ColumnReferenceVisitor(List<String> tableColumns) {
             this.tableColumns = tableColumns;
-            this.referencedColumns = new HashSet<>();
         }
 
         @Override
-        public Void visit(Expression expression) {
+        public Set<String> visit(Expression expression) {
             if (expression instanceof LocalReferenceExpression) {
                 return visit((LocalReferenceExpression) expression);
             } else if (expression instanceof FieldReferenceExpression) {
@@ -70,38 +108,34 @@ public class ColumnReferenceFinder {
         }
 
         @Override
-        public Void visit(FieldReferenceExpression fieldReference) {
-            referencedColumns.add(fieldReference.getName());
-            return null;
+        public Set<String> visit(FieldReferenceExpression fieldReference) {
+            return Collections.singleton(fieldReference.getName());
         }
 
-        public Void visit(LocalReferenceExpression localReference) {
-            referencedColumns.add(localReference.getName());
-            return null;
+        public Set<String> visit(LocalReferenceExpression localReference) {
+            return Collections.singleton(localReference.getName());
         }
 
-        public Void visit(RexNodeExpression rexNode) {
+        public Set<String> visit(RexNodeExpression rexNode) {
             // get the referenced column ref in table
             Set<RexInputRef> inputRefs = FlinkRexUtil.findAllInputRefs(rexNode.getRexNode());
             // get the referenced column name by index
-            inputRefs.forEach(
-                    inputRef -> {
-                        int index = inputRef.getIndex();
-                        referencedColumns.add(tableColumns.get(index));
-                    });
-            return null;
+            return inputRefs.stream()
+                    .map(inputRef -> tableColumns.get(inputRef.getIndex()))
+                    .collect(Collectors.toSet());
         }
 
         @Override
-        public Void visit(CallExpression call) {
+        public Set<String> visit(CallExpression call) {
+            Set<String> references = new HashSet<>();
             for (Expression expression : call.getChildren()) {
-                visit(expression);
+                references.addAll(visit(expression));
             }
-            return null;
+            return references;
         }
 
         @Override
-        protected Void defaultMethod(Expression expression) {
+        protected Set<String> defaultMethod(Expression expression) {
             throw new TableException("Unexpected expression: " + expression);
         }
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
index b00038ce20f..b0b5e65fdf1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
@@ -19,6 +19,10 @@
 package org.apache.flink.table.planner.operations;
 
 import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableDropColumn;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableDropPrimaryKey;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableDropWatermark;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableModify;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableRenameColumn;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
@@ -29,9 +33,8 @@ import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.SchemaResolver;
 import org.apache.flink.table.catalog.TableChange;
@@ -62,6 +65,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -93,6 +98,10 @@ public class AlterSchemaConverter {
         this.schemaResolver = schemaResolver;
     }
 
+    /**
+     * Convert ALTER TABLE ADD | MODIFY (&lt;schema_component&gt; [, &lt;schema_component&gt;, ...])
+     * to generate an updated Schema.
+     */
     public Schema applySchemaChange(
             SqlAlterTableSchema alterTableSchema,
             Schema originSchema,
@@ -101,7 +110,7 @@ public class AlterSchemaConverter {
         SchemaConverter converter =
                 strategy == AlterSchemaStrategy.ADD
                         ? new AddSchemaConverter(
-                                originalSchema,
+                                originSchema,
                                 (FlinkTypeFactory) sqlValidator.getTypeFactory(),
                                 sqlValidator,
                                 constraintValidator,
@@ -109,7 +118,7 @@ public class AlterSchemaConverter {
                                 schemaResolver,
                                 tableChangeCollector)
                         : new ModifySchemaConverter(
-                                originalSchema,
+                                originSchema,
                                 (FlinkTypeFactory) sqlValidator.getTypeFactory(),
                                 sqlValidator,
                                 constraintValidator,
@@ -122,78 +131,145 @@ public class AlterSchemaConverter {
         return converter.convert();
     }
 
+    /** Convert ALTER TABLE RENAME col_name to new_col_name to generate an updated Schema. */
     public Schema applySchemaChange(
-            SqlAlterTableRenameColumn renameColumn, ContextResolvedTable originalTable) {
-        String oldColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
+            SqlAlterTableRenameColumn renameColumn, ResolvedCatalogTable originTable) {
+        String originColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
         String newColumnName = getColumnName(renameColumn.getNewColumnIdentifier());
-        List<String> tableColumns =
-                originalTable.getResolvedSchema().getColumns().stream()
-                        .map(Column::getName)
-                        .collect(Collectors.toList());
-        // validate old column is exists or new column isn't duplicated or old column isn't
-        // referenced by computed column
+        // validate that the origin column exists, the new column name does not collide with
+        // existing column names, and the origin column isn't referenced by a computed column
         validateColumnName(
-                oldColumnName,
+                originColumnName,
                 newColumnName,
-                tableColumns,
-                originalTable.getResolvedSchema(),
-                ((CatalogTable) originalTable.getResolvedTable()).getPartitionKeys());
-
-        // validate old column isn't referenced by watermark
-        List<WatermarkSpec> watermarkSpecs = originalTable.getResolvedSchema().getWatermarkSpecs();
-        watermarkSpecs.forEach(
-                watermarkSpec -> {
-                    String rowtimeAttribute = watermarkSpec.getRowtimeAttribute();
-                    Set<String> referencedColumns =
-                            ColumnReferenceFinder.findReferencedColumn(
-                                    watermarkSpec.getWatermarkExpression(), tableColumns);
-                    if (oldColumnName.equals(rowtimeAttribute)
-                            || referencedColumns.contains(oldColumnName)) {
-                        throw new ValidationException(
-                                String.format(
-                                        "Old column %s is referred by watermark expression %s, "
-                                                + "currently doesn't allow to rename column which is "
-                                                + "referred by watermark expression.",
-                                        oldColumnName, watermarkSpec.asSummaryString()));
+                originTable.getResolvedSchema(),
+                originTable.getPartitionKeys());
+        validateWatermark(originTable, originColumnName);
+
+        // generate new schema
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> {
+                    if (column.getName().equals(originColumnName)) {
+                        buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                    } else {
+                        builder.fromColumns(Collections.singletonList(column));
                     }
                 });
+        buildUpdatedPrimaryKey(
+                schemaBuilder,
+                originTable,
+                (pk) -> pk.equals(originColumnName) ? newColumnName : pk);
+        buildUpdatedWatermark(schemaBuilder, originTable);
+        return schemaBuilder.build();
+    }
 
-        Schema.Builder builder = Schema.newBuilder();
-        // build column
-        Schema originSchema = originalTable.getTable().getUnresolvedSchema();
-        originSchema
-                .getColumns()
+    /** Convert ALTER TABLE DROP (col1 [, col2, ...]) to generate an updated Schema. */
+    public Schema applySchemaChange(
+            SqlAlterTableDropColumn dropColumn, ResolvedCatalogTable originTable) {
+        Set<String> columnsToDrop = new HashSet<>();
+        dropColumn
+                .getColumnList()
                 .forEach(
-                        column -> {
-                            if (oldColumnName.equals(column.getName())) {
-                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
-                            } else {
-                                builder.fromColumns(Collections.singletonList(column));
+                        identifier -> {
+                            String name = getColumnName((SqlIdentifier) identifier);
+                            if (!columnsToDrop.add(name)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "%sDuplicate column `%s`.", EX_MSG_PREFIX, name));
                             }
                         });
-        // build primary key
-        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
-        if (originPrimaryKey.isPresent()) {
-            List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames();
-            String constrainName = originPrimaryKey.get().getConstraintName();
-            List<String> newPrimaryKeyNames =
-                    originPrimaryKeyNames.stream()
-                            .map(pkName -> pkName.equals(oldColumnName) ? newColumnName : pkName)
-                            .collect(Collectors.toList());
-            builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
-        }
-
-        // build watermark
-        originSchema
-                .getWatermarkSpecs()
-                .forEach(
-                        watermarkSpec ->
-                                builder.watermark(
-                                        watermarkSpec.getColumnName(),
-                                        watermarkSpec.getWatermarkExpression()));
 
-        // generate new schema
-        return builder.build();
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        for (SqlNode columnIdentifier : dropColumn.getColumnList()) {
+            String columnToDrop = getColumnName((SqlIdentifier) columnIdentifier);
+            // validate that the column to drop exists, is neither a primary key nor a partition
+            // key, and is not referenced by a remaining computed column or by the watermark
+            validateColumnName(
+                    columnToDrop,
+                    originTable.getResolvedSchema(),
+                    originTable.getPartitionKeys(),
+                    columnsToDrop);
+            validateWatermark(originTable, columnToDrop);
+        }
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> {
+                    if (!columnsToDrop.contains(column.getName())) {
+                        builder.fromColumns(Collections.singletonList(column));
+                    }
+                });
+        buildUpdatedPrimaryKey(schemaBuilder, originTable, Function.identity());
+        buildUpdatedWatermark(schemaBuilder, originTable);
+        return schemaBuilder.build();
+    }
+
+    /** Convert ALTER TABLE DROP PRIMARY KEY to generate an updated {@link Schema}. */
+    public Schema applySchemaChange(
+            SqlAlterTableDropPrimaryKey dropPrimaryKey, ResolvedCatalogTable originTable) {
+        Optional<UniqueConstraint> pkConstraint = originTable.getResolvedSchema().getPrimaryKey();
+        if (!pkConstraint.isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "%sThe base table does not define any primary key.", EX_MSG_PREFIX));
+        }
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> builder.fromColumns(Collections.singletonList(column)));
+        buildUpdatedWatermark(schemaBuilder, originTable);
+        return schemaBuilder.build();
+    }
+
+    /**
+     * Convert ALTER TABLE DROP CONSTRAINT constraint_name to generate an updated {@link Schema}.
+     */
+    public Schema applySchemaChange(
+            SqlAlterTableDropConstraint dropConstraint, ResolvedCatalogTable originTable) {
+        Optional<UniqueConstraint> pkConstraint = originTable.getResolvedSchema().getPrimaryKey();
+        if (!pkConstraint.isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "%sThe base table does not define any primary key.", EX_MSG_PREFIX));
+        }
+        SqlIdentifier constraintIdentifier = dropConstraint.getConstraintName();
+        String constraintName = pkConstraint.get().getName();
+        if (constraintIdentifier != null
+                && !constraintIdentifier.getSimple().equals(constraintName)) {
+            throw new ValidationException(
+                    String.format(
+                            "%sThe base table does not define a primary key constraint named '%s'. "
+                                    + "Available constraint name: ['%s'].",
+                            EX_MSG_PREFIX, constraintIdentifier.getSimple(), constraintName));
+        }
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> builder.fromColumns(Collections.singletonList(column)));
+        buildUpdatedWatermark(schemaBuilder, originTable);
+        return schemaBuilder.build();
+    }
+
+    /** Convert ALTER TABLE DROP WATERMARK to generate an updated {@link Schema}. */
+    public Schema applySchemaChange(
+            SqlAlterTableDropWatermark dropWatermark, ResolvedCatalogTable originTable) {
+        if (originTable.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "%sThe base table does not define any watermark strategy.",
+                            EX_MSG_PREFIX));
+        }
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> builder.fromColumns(Collections.singletonList(column)));
+        buildUpdatedPrimaryKey(schemaBuilder, originTable, Function.identity());
+        return schemaBuilder.build();
     }
 
     // --------------------------------------------------------------------------------------------
@@ -216,7 +292,7 @@ public class AlterSchemaConverter {
         List<Function<ResolvedSchema, TableChange>> changeBuilders = new ArrayList<>();
 
         SchemaConverter(
-                Schema originalSchema,
+                Schema originSchema,
                 FlinkTypeFactory typeFactory,
                 SqlValidator sqlValidator,
                 Consumer<SqlTableConstraint> constraintValidator,
@@ -229,13 +305,13 @@ public class AlterSchemaConverter {
             this.escapeExpressions = escapeExpressions;
             this.schemaResolver = schemaResolver;
             this.changesCollector = changesCollector;
-            populateColumnsFromSourceTable(originalSchema);
-            populatePrimaryKeyFromSourceTable(originalSchema);
-            populateWatermarkFromSourceTable(originalSchema);
+            populateColumnsFromSourceTable(originSchema);
+            populatePrimaryKeyFromSourceTable(originSchema);
+            populateWatermarkFromSourceTable(originSchema);
         }
 
-        private void populateColumnsFromSourceTable(Schema originalSchema) {
-            originalSchema
+        private void populateColumnsFromSourceTable(Schema originSchema) {
+            originSchema
                     .getColumns()
                     .forEach(
                             column -> {
@@ -245,15 +321,15 @@ public class AlterSchemaConverter {
                             });
         }
 
-        private void populatePrimaryKeyFromSourceTable(Schema originalSchema) {
-            if (originalSchema.getPrimaryKey().isPresent()) {
-                primaryKey = originalSchema.getPrimaryKey().get();
+        private void populatePrimaryKeyFromSourceTable(Schema originSchema) {
+            if (originSchema.getPrimaryKey().isPresent()) {
+                primaryKey = originSchema.getPrimaryKey().get();
             }
         }
 
-        private void populateWatermarkFromSourceTable(Schema originalSchema) {
+        private void populateWatermarkFromSourceTable(Schema originSchema) {
             for (Schema.UnresolvedWatermarkSpec sourceWatermarkSpec :
-                    originalSchema.getWatermarkSpecs()) {
+                    originSchema.getWatermarkSpecs()) {
                 watermarkSpec = sourceWatermarkSpec;
             }
         }
@@ -303,13 +379,13 @@ public class AlterSchemaConverter {
         private void updatePrimaryKeyNullability(String columnName) {
             Schema.UnresolvedColumn column = columns.get(columnName);
             if (column instanceof Schema.UnresolvedPhysicalColumn) {
-                AbstractDataType<?> originalType =
+                AbstractDataType<?> originType =
                         ((Schema.UnresolvedPhysicalColumn) column).getDataType();
                 columns.put(
                         columnName,
                         new Schema.UnresolvedPhysicalColumn(
                                 columnName,
-                                originalType.notNull(),
+                                originType.notNull(),
                                 column.getComment().orElse(null)));
             }
         }
@@ -455,7 +531,7 @@ public class AlterSchemaConverter {
     private static class AddSchemaConverter extends SchemaConverter {
 
         AddSchemaConverter(
-                Schema originalSchema,
+                Schema originSchema,
                 FlinkTypeFactory typeFactory,
                 SqlValidator sqlValidator,
                 Consumer<SqlTableConstraint> constraintValidator,
@@ -463,7 +539,7 @@ public class AlterSchemaConverter {
                 SchemaResolver schemaResolver,
                 List<TableChange> changeCollector) {
             super(
-                    originalSchema,
+                    originSchema,
                     typeFactory,
                     sqlValidator,
                     constraintValidator,
@@ -536,7 +612,7 @@ public class AlterSchemaConverter {
     private static class ModifySchemaConverter extends SchemaConverter {
 
         ModifySchemaConverter(
-                Schema originalSchema,
+                Schema originSchema,
                 FlinkTypeFactory typeFactory,
                 SqlValidator sqlValidator,
                 Consumer<SqlTableConstraint> constraintValidator,
@@ -544,7 +620,7 @@ public class AlterSchemaConverter {
                 SchemaResolver schemaResolver,
                 List<TableChange> tableChangeCollector) {
             super(
-                    originalSchema,
+                    originSchema,
                     typeFactory,
                     sqlValidator,
                     constraintValidator,
@@ -611,52 +687,151 @@ public class AlterSchemaConverter {
     private void validateColumnName(
             String originColumnName,
             String newColumnName,
-            List<String> tableColumns,
-            ResolvedSchema originResolvedSchema,
+            ResolvedSchema originSchemas,
             List<String> partitionKeys) {
-        // validate old column
-        if (!tableColumns.contains(originColumnName)) {
+        validateColumnName(
+                originColumnName,
+                originSchemas,
+                partitionKeys,
+                // fail the rename operation if the column is referenced by any computed column
+                (referencedColumn, computedColumn) -> referencedColumn.contains(originColumnName));
+        // validate new column
+        if (originSchemas.getColumn(newColumnName).isPresent()) {
             throw new ValidationException(
                     String.format(
-                            "Old column %s not found in table schema for RENAME COLUMN",
-                            originColumnName));
+                            "%sThe column `%s` already existed in table schema.",
+                            EX_MSG_PREFIX, newColumnName));
         }
+    }
 
-        // validate new column
-        if (tableColumns.contains(newColumnName)) {
+    private void validateColumnName(
+            String columnToDrop,
+            ResolvedSchema originSchema,
+            List<String> partitionKeys,
+            Set<String> columnsToDrop) {
+        validateColumnName(
+                columnToDrop,
+                originSchema,
+                partitionKeys,
+                // fail the drop operation only if the column is referenced by a computed column
+                // that is not itself being dropped in the same statement
+                (referencedColumn, computedColumn) ->
+                        referencedColumn.contains(columnToDrop)
+                                && !columnsToDrop.contains(computedColumn.getName()));
+        originSchema
+                .getPrimaryKey()
+                .ifPresent(
+                        pk -> {
+                            if (pk.getColumns().contains(columnToDrop)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "%sThe column `%s` is used as the primary key.",
+                                                EX_MSG_PREFIX, columnToDrop));
+                            }
+                        });
+    }
+
+    private void validateColumnName(
+            String columnToAlter,
+            ResolvedSchema originSchema,
+            List<String> partitionKeys,
+            BiFunction<Set<String>, Column.ComputedColumn, Boolean> computedColumnChecker) {
+        // validate origin column
+        Set<String> tableColumns = new HashSet<>(originSchema.getColumnNames());
+        if (!tableColumns.contains(columnToAlter)) {
             throw new ValidationException(
                     String.format(
-                            "New column %s already existed in table schema for RENAME COLUMN",
-                            newColumnName));
+                            "%sThe column `%s` does not exist in the base table.",
+                            EX_MSG_PREFIX, columnToAlter));
         }
 
-        // validate old column name isn't referred by computed column case
-        originResolvedSchema.getColumns().stream()
+        // validate that the origin column isn't referenced by a computed column
+        originSchema.getColumns().stream()
                 .filter(column -> column instanceof Column.ComputedColumn)
                 .forEach(
                         column -> {
                             Column.ComputedColumn computedColumn = (Column.ComputedColumn) column;
                             Set<String> referencedColumn =
                                     ColumnReferenceFinder.findReferencedColumn(
-                                            computedColumn.getExpression(), tableColumns);
-                            if (referencedColumn.contains(originColumnName)) {
+                                            computedColumn.getName(), originSchema);
+                            if (computedColumnChecker.apply(referencedColumn, computedColumn)) {
                                 throw new ValidationException(
                                         String.format(
-                                                "Old column %s is referred by computed column %s, currently doesn't "
-                                                        + "allow to rename column which is referred by computed column.",
-                                                originColumnName,
+                                                "%sThe column `%s` is referenced by computed column %s.",
+                                                EX_MSG_PREFIX,
+                                                columnToAlter,
                                                 computedColumn.asSummaryString()));
                             }
                         });
-        // validate partition keys doesn't contain the old column
-        if (partitionKeys.contains(originColumnName)) {
+        // validate that the partition keys don't contain the origin column
+        if (partitionKeys.contains(columnToAlter)) {
+            throw new ValidationException(
+                    String.format(
+                            "%sThe column `%s` is used as the partition keys.",
+                            EX_MSG_PREFIX, columnToAlter));
+        }
+    }
+
+    private void validateWatermark(ResolvedCatalogTable originTable, String columnToAlter) {
+        // validate that the origin column isn't the rowtime attribute or used by the watermark
+        List<WatermarkSpec> watermarkSpecs = originTable.getResolvedSchema().getWatermarkSpecs();
+        Set<String> referencedColumns =
+                ColumnReferenceFinder.findWatermarkReferencedColumn(
+                        originTable.getResolvedSchema());
+        Set<String> rowtimeAttributes =
+                originTable.getResolvedSchema().getWatermarkSpecs().stream()
+                        .map(WatermarkSpec::getRowtimeAttribute)
+                        .collect(Collectors.toSet());
+        if (rowtimeAttributes.contains(columnToAlter)
+                || referencedColumns.contains(columnToAlter)) {
             throw new ValidationException(
                     String.format(
-                            "Can not rename column %s because it is used as the partition keys.",
-                            originColumnName));
+                            "%sThe column `%s` is referenced by watermark expression %s.",
+                            EX_MSG_PREFIX, columnToAlter, watermarkSpecs));
         }
     }
 
+    private void buildUpdatedColumn(
+            Schema.Builder builder,
+            ResolvedCatalogTable originTable,
+            BiConsumer<Schema.Builder, Schema.UnresolvedColumn> columnConsumer) {
+        // build column
+        originTable
+                .getUnresolvedSchema()
+                .getColumns()
+                .forEach(column -> columnConsumer.accept(builder, column));
+    }
+
+    private void buildUpdatedPrimaryKey(
+            Schema.Builder builder,
+            ResolvedCatalogTable originTable,
+            Function<String, String> columnRenamer) {
+        originTable
+                .getUnresolvedSchema()
+                .getPrimaryKey()
+                .ifPresent(
+                        pk -> {
+                            List<String> originPrimaryKeyNames = pk.getColumnNames();
+                            String constrainName = pk.getConstraintName();
+                            List<String> newPrimaryKeyNames =
+                                    originPrimaryKeyNames.stream()
+                                            .map(columnRenamer)
+                                            .collect(Collectors.toList());
+                            builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
+                        });
+    }
+
+    private void buildUpdatedWatermark(Schema.Builder builder, ResolvedCatalogTable originTable) {
+        originTable
+                .getUnresolvedSchema()
+                .getWatermarkSpecs()
+                .forEach(
+                        watermarkSpec ->
+                                builder.watermark(
+                                        watermarkSpec.getColumnName(),
+                                        watermarkSpec.getWatermarkExpression()));
+    }
+
     private void buildNewColumnFromOriginColumn(
             Schema.Builder builder, Schema.UnresolvedColumn originColumn, String columnName) {
         if (originColumn instanceof Schema.UnresolvedComputedColumn) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 3ae34126d33..acb46f60fc9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -25,7 +25,10 @@ import org.apache.flink.sql.parser.ddl.SqlAlterDatabase;
 import org.apache.flink.sql.parser.ddl.SqlAlterFunction;
 import org.apache.flink.sql.parser.ddl.SqlAlterTable;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableCompact;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableDropColumn;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableDropPrimaryKey;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableDropWatermark;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableOptions;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableRename;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableRenameColumn;
@@ -88,7 +91,6 @@ import org.apache.flink.sql.parser.dql.SqlUnloadModule;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -163,7 +165,6 @@ import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
 import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
-import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
 import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
 import org.apache.flink.table.operations.ddl.AlterViewAsOperation;
@@ -488,6 +489,7 @@ public class SqlToOperationConverter {
         if (baseTable instanceof CatalogView) {
             throw new ValidationException("ALTER TABLE for a view is not allowed");
         }
+        ResolvedCatalogTable resolvedCatalogTable = (ResolvedCatalogTable) baseTable;
         if (sqlAlterTable instanceof SqlAlterTableRename) {
             UnresolvedIdentifier newUnresolvedIdentifier =
                     UnresolvedIdentifier.of(
@@ -503,23 +505,45 @@ public class SqlToOperationConverter {
         } else if (sqlAlterTable instanceof SqlAlterTableReset) {
             return convertAlterTableReset(
                     tableIdentifier, (CatalogTable) baseTable, (SqlAlterTableReset) sqlAlterTable);
+        } else if (sqlAlterTable instanceof SqlAlterTableDropColumn) {
+            return new AlterTableSchemaOperation(
+                    tableIdentifier,
+                    CatalogTable.of(
+                            alterSchemaConverter.applySchemaChange(
+                                    (SqlAlterTableDropColumn) sqlAlterTable, resolvedCatalogTable),
+                            resolvedCatalogTable.getComment(),
+                            resolvedCatalogTable.getPartitionKeys(),
+                            resolvedCatalogTable.getOptions()));
+        } else if (sqlAlterTable instanceof SqlAlterTableDropPrimaryKey) {
+            return new AlterTableSchemaOperation(
+                    tableIdentifier,
+                    CatalogTable.of(
+                            alterSchemaConverter.applySchemaChange(
+                                    (SqlAlterTableDropPrimaryKey) sqlAlterTable,
+                                    resolvedCatalogTable),
+                            resolvedCatalogTable.getComment(),
+                            resolvedCatalogTable.getPartitionKeys(),
+                            resolvedCatalogTable.getOptions()));
         } else if (sqlAlterTable instanceof SqlAlterTableDropConstraint) {
-            SqlAlterTableDropConstraint dropConstraint =
-                    ((SqlAlterTableDropConstraint) sqlAlterTable);
-            String constraintName = dropConstraint.getConstraintName().getSimple();
-            TableSchema oriSchema =
-                    TableSchema.fromResolvedSchema(
-                            baseTable
-                                    .getUnresolvedSchema()
-                                    .resolve(catalogManager.getSchemaResolver()));
-            if (!oriSchema
-                    .getPrimaryKey()
-                    .filter(pk -> pk.getName().equals(constraintName))
-                    .isPresent()) {
-                throw new ValidationException(
-                        String.format("CONSTRAINT [%s] does not exist", constraintName));
-            }
-            return new AlterTableDropConstraintOperation(tableIdentifier, constraintName);
+            return new AlterTableSchemaOperation(
+                    tableIdentifier,
+                    CatalogTable.of(
+                            alterSchemaConverter.applySchemaChange(
+                                    (SqlAlterTableDropConstraint) sqlAlterTable,
+                                    resolvedCatalogTable),
+                            resolvedCatalogTable.getComment(),
+                            resolvedCatalogTable.getPartitionKeys(),
+                            resolvedCatalogTable.getOptions()));
+        } else if (sqlAlterTable instanceof SqlAlterTableDropWatermark) {
+            return new AlterTableSchemaOperation(
+                    tableIdentifier,
+                    CatalogTable.of(
+                            alterSchemaConverter.applySchemaChange(
+                                    (SqlAlterTableDropWatermark) sqlAlterTable,
+                                    resolvedCatalogTable),
+                            resolvedCatalogTable.getComment(),
+                            resolvedCatalogTable.getPartitionKeys(),
+                            resolvedCatalogTable.getOptions()));
         } else if (sqlAlterTable instanceof SqlAddReplaceColumns) {
             return OperationConverterUtils.convertAddReplaceColumns(
                     tableIdentifier,
@@ -537,15 +561,14 @@ public class SqlToOperationConverter {
                     (SqlAlterTableRenameColumn) sqlAlterTable;
             Schema newSchema =
                     alterSchemaConverter.applySchemaChange(
-                            sqlAlterTableRenameColumn, optionalCatalogTable.get());
-            CatalogTable baseCatalogTable = (CatalogTable) baseTable;
+                            sqlAlterTableRenameColumn, resolvedCatalogTable);
             return new AlterTableSchemaOperation(
                     tableIdentifier,
                     CatalogTable.of(
                             newSchema,
-                            baseCatalogTable.getComment(),
-                            baseCatalogTable.getPartitionKeys(),
-                            baseCatalogTable.getOptions()));
+                            resolvedCatalogTable.getComment(),
+                            resolvedCatalogTable.getPartitionKeys(),
+                            resolvedCatalogTable.getOptions()));
         } else if (sqlAlterTable instanceof SqlAddPartitions) {
             List<CatalogPartitionSpec> specs = new ArrayList<>();
             List<CatalogPartition> partitions = new ArrayList<>();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java
new file mode 100644
index 00000000000..0444afea947
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ColumnReferenceFinder}. */
+public class ColumnReferenceFinderTest extends TableTestBase {
+
+    private final StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
+    private ResolvedSchema resolvedSchema;
+
+    @BeforeEach
+    public void beforeEach() {
+        resolvedSchema =
+                util.testingTableEnv()
+                        .getCatalogManager()
+                        .getSchemaResolver()
+                        .resolve(
+                                Schema.newBuilder()
+                                        .columnByExpression("a", "b || '_001'")
+                                        .column("b", DataTypes.STRING())
+                                        .columnByExpression("c", "d * e + 2")
+                                        .column("d", DataTypes.DOUBLE())
+                                        .columnByMetadata("e", DataTypes.INT(), null, true)
+                                        .column(
+                                                "tuple",
+                                                DataTypes.ROW(
+                                                        DataTypes.TIMESTAMP(3), DataTypes.INT()))
+                                        .columnByExpression("ts", "tuple.f0")
+                                        .watermark("ts", "ts - interval '5' day")
+                                        .build());
+    }
+
+    @Test
+    public void testFindReferencedColumn() {
+        assertThat(ColumnReferenceFinder.findReferencedColumn("b", resolvedSchema))
+                .isEqualTo(Collections.emptySet());
+
+        assertThat(ColumnReferenceFinder.findReferencedColumn("a", resolvedSchema))
+                .containsExactlyInAnyOrder("b");
+
+        assertThat(ColumnReferenceFinder.findReferencedColumn("c", resolvedSchema))
+                .containsExactlyInAnyOrder("d", "e");
+
+        assertThat(ColumnReferenceFinder.findReferencedColumn("ts", resolvedSchema))
+                .containsExactlyInAnyOrder("tuple");
+
+        assertThat(ColumnReferenceFinder.findWatermarkReferencedColumn(resolvedSchema))
+                .containsExactlyInAnyOrder("ts");
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 4dedea89f21..5ad6bce141a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -81,7 +81,6 @@ import org.apache.flink.table.operations.command.SetOperation;
 import org.apache.flink.table.operations.command.ShowJarsOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
-import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
 import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
 import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
@@ -1275,70 +1274,50 @@ public class SqlToOperationConverterTest {
         // rename pk column c
         Operation operation = parse("alter table tb1 rename c to c1");
         assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
-        assertThat(((AlterTableSchemaOperation) operation).getCatalogTable().getUnresolvedSchema())
+        assertThat(operation.asSummaryString())
                 .isEqualTo(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.INT().notNull())
-                                .column("b", DataTypes.BIGINT().notNull())
-                                .column("c1", DataTypes.STRING().notNull())
-                                .withComment("column comment")
-                                .columnByExpression("d", "a*(b+2 + a*b)")
-                                .column(
-                                        "e",
-                                        DataTypes.ROW(
-                                                DataTypes.STRING(),
-                                                DataTypes.INT(),
-                                                DataTypes.ROW(
-                                                        DataTypes.DOUBLE(),
-                                                        DataTypes.ARRAY(DataTypes.FLOAT()))))
-                                .columnByExpression("f", "e.f1 + e.f2.f0")
-                                .columnByMetadata("g", DataTypes.STRING(), null, true)
-                                .column("ts", DataTypes.TIMESTAMP(3))
-                                .withComment("just a comment")
-                                .watermark("ts", "ts - interval '5' seconds")
-                                .primaryKeyNamed("ct1", "a", "b", "c1")
-                                .build());
+                        "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT NOT NULL,\n"
+                                + "  `c1` STRING NOT NULL COMMENT 'column comment',\n"
+                                + "  `d` AS [a*(b+2 + a*b)],\n"
+                                + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+                                + "  `f` AS [e.f1 + e.f2.f0],\n"
+                                + "  `g` METADATA VIRTUAL,\n"
+                                + "  `ts` TIMESTAMP(3) COMMENT 'just a comment',\n"
+                                + "  WATERMARK FOR `ts` AS [ts - interval '5' seconds],\n"
+                                + "  CONSTRAINT `ct1` PRIMARY KEY (`a`, `b`, `c1`) NOT ENFORCED\n"
+                                + ")");
 
         // rename computed column
         operation = parse("alter table tb1 rename f to f1");
         assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
-        assertThat(((AlterTableSchemaOperation) operation).getCatalogTable().getUnresolvedSchema())
+        assertThat(operation.asSummaryString())
                 .isEqualTo(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.INT().notNull())
-                                .column("b", DataTypes.BIGINT().notNull())
-                                .column("c", DataTypes.STRING().notNull())
-                                .withComment("column comment")
-                                .columnByExpression("d", "a*(b+2 + a*b)")
-                                .column(
-                                        "e",
-                                        DataTypes.ROW(
-                                                DataTypes.STRING(),
-                                                DataTypes.INT(),
-                                                DataTypes.ROW(
-                                                        DataTypes.DOUBLE(),
-                                                        DataTypes.ARRAY(DataTypes.FLOAT()))))
-                                .columnByExpression("f1", "e.f1 + e.f2.f0")
-                                .columnByMetadata("g", DataTypes.STRING(), null, true)
-                                .column("ts", DataTypes.TIMESTAMP(3))
-                                .withComment("just a comment")
-                                .watermark("ts", "ts - interval '5' seconds")
-                                .primaryKeyNamed("ct1", "a", "b", "c")
-                                .build());
+                        "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT NOT NULL,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
+                                + "  `d` AS [a*(b+2 + a*b)],\n"
+                                + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+                                + "  `f1` AS [e.f1 + e.f2.f0],\n"
+                                + "  `g` METADATA VIRTUAL,\n"
+                                + "  `ts` TIMESTAMP(3) COMMENT 'just a comment',\n"
+                                + "  WATERMARK FOR `ts` AS [ts - interval '5' seconds],\n"
+                                + "  CONSTRAINT `ct1` PRIMARY KEY (`a`, `b`, `c`) NOT ENFORCED\n"
+                                + ")");
 
         // rename column c that is used in a computed column
         assertThatThrownBy(() -> parse("alter table tb1 rename a to a1"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Old column a is referred by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b), "
-                                + "currently doesn't allow to rename column which is referred by computed column.");
+                        "The column `a` is referenced by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b).");
 
         // rename column used in the watermark expression
         assertThatThrownBy(() -> parse("alter table tb1 rename ts to ts1"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Old column ts is referred by watermark expression WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds, "
-                                + "currently doesn't allow to rename column which is referred by watermark expression.");
+                        "The column `ts` is referenced by watermark expression [WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds].");
 
         // rename nested column
         assertThatThrownBy(() -> parse("alter table tb1 rename e.f1 to e.f11"))
@@ -1348,8 +1327,7 @@ public class SqlToOperationConverterTest {
         // rename column with duplicate name
         assertThatThrownBy(() -> parse("alter table tb1 rename c to a"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "New column a already existed in table schema for RENAME COLUMN");
+                .hasMessageContaining("The column `a` already existed in table schema.");
 
         // rename column e test computed column expression is ApiExpression which doesn't implement
         // the equals method
@@ -1374,29 +1352,152 @@ public class SqlToOperationConverterTest {
         assertThatThrownBy(() -> parse("alter table `cat1`.`db1`.`tb2` rename e to e1"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Old column e is referred by computed column `j` STRING AS upper(e), currently doesn't "
-                                + "allow to rename column which is referred by computed column.");
+                        "Failed to execute ALTER TABLE statement.\nThe column `e` is referenced by computed column `j` STRING AS upper(e).");
 
         // rename column used as partition key
         assertThatThrownBy(() -> parse("alter table tb2 rename a to a1"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Can not rename column a because it is used as the partition keys");
+                        "Failed to execute ALTER TABLE statement.\nThe column `a` is used as the partition keys.");
+    }
+
+    @Test
+    public void testFailedToAlterTableDropColumn() throws Exception {
+        prepareTable("tb1", false, false, true, 3);
+
+        // drop a nonexistent column
+        assertThatThrownBy(() -> parse("alter table tb1 drop x"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("The column `x` does not exist in the base table.");
+
+        assertThatThrownBy(() -> parse("alter table tb1 drop (g, x)"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("The column `x` does not exist in the base table.");
+
+        // drop the same column more than once in a single statement
+        assertThatThrownBy(() -> parse("alter table tb1 drop (g, c, g)"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Duplicate column `g`.");
+
+        // drop a nested column
+        assertThatThrownBy(() -> parse("alter table tb1 drop e.f2"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Alter nested row type e.f2 is not supported yet.");
+
+        // drop a column which is referenced by a computed column
+        assertThatThrownBy(() -> parse("alter table tb1 drop a"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The column `a` is referenced by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b).");
+
+        // drop a column which is used as the primary key
+        assertThatThrownBy(() -> parse("alter table tb1 drop c"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("The column `c` is used as the primary key.");
+
+        // drop a column which is referenced by the watermark expression
+        assertThatThrownBy(() -> parse("alter table tb1 drop ts"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The column `ts` is referenced by watermark expression [WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds].");
+    }
+
+    @Test
+    public void testAlterTableDropColumn() throws Exception {
+        prepareNonManagedTable(false);
+        // drop a single column
+        Operation operation = parse("alter table tb1 drop c");
+        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT NOT NULL,\n"
+                                + "  `d` AS [a*(b+2 + a*b)],\n"
+                                + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+                                + "  `f` AS [e.f1 + e.f2.f0],\n"
+                                + "  `g` METADATA VIRTUAL,\n"
+                                + "  `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
+                                + ")");
+
+        // drop computed column and referenced columns together
+        operation = parse("alter table tb1 drop (f, e, b, d)");
+        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
+                                + "  `g` METADATA VIRTUAL,\n"
+                                + "  `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
+                                + ")");
+    }
+
+    @Test
+    public void testFailedToAlterTableDropConstraint() throws Exception {
+        prepareNonManagedTable("tb1", 0);
+        assertThatThrownBy(() -> parse("alter table tb1 drop primary key"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("The base table does not define any primary key.");
+        assertThatThrownBy(() -> parse("alter table tb1 drop constraint ct"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("The base table does not define any primary key.");
+        prepareNonManagedTable("tb2", 1);
+        assertThatThrownBy(() -> parse("alter table tb2 drop constraint ct2"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The base table does not define a primary key constraint named 'ct2'. Available constraint name: ['ct1'].");
     }
 
     @Test
     public void testAlterTableDropConstraint() throws Exception {
         prepareNonManagedTable(true);
-        // Test alter table add enforced
+        String expectedSummaryString =
+                "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+                        + "  `a` INT NOT NULL,\n"
+                        + "  `b` BIGINT NOT NULL,\n"
+                        + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
+                        + "  `d` AS [a*(b+2 + a*b)],\n"
+                        + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+                        + "  `f` AS [e.f1 + e.f2.f0],\n"
+                        + "  `g` METADATA VIRTUAL,\n"
+                        + "  `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
+                        + ")";
+
         Operation operation = parse("alter table tb1 drop constraint ct1");
-        assertThat(operation).isInstanceOf(AlterTableDropConstraintOperation.class);
-        AlterTableDropConstraintOperation dropConstraint =
-                (AlterTableDropConstraintOperation) operation;
-        assertThat(dropConstraint.asSummaryString())
-                .isEqualTo("ALTER TABLE cat1.db1.tb1 DROP CONSTRAINT ct1");
-        assertThatThrownBy(() -> parse("alter table tb1 drop constraint ct2"))
+        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
+
+        operation = parse("alter table tb1 drop primary key");
+        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
+    }
+
+    @Test
+    public void testFailedToAlterTableDropWatermark() throws Exception {
+        prepareNonManagedTable("tb1", false);
+        assertThatThrownBy(() -> parse("alter table tb1 drop watermark"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("CONSTRAINT [ct2] does not exist");
+                .hasMessageContaining("The base table does not define any watermark strategy.");
+    }
+
+    @Test
+    public void testAlterTableDropWatermark() throws Exception {
+        prepareNonManagedTable("tb1", true);
+        Operation operation = parse("alter table tb1 drop watermark");
+        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT NOT NULL,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
+                                + "  `d` AS [a*(b+2 + a*b)],\n"
+                                + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+                                + "  `f` AS [e.f1 + e.f2.f0],\n"
+                                + "  `g` METADATA VIRTUAL,\n"
+                                + "  `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
+                                + ")");
     }
 
     @Test
@@ -1464,50 +1565,38 @@ public class SqlToOperationConverterTest {
         // try to add a column with duplicated name
         assertThatThrownBy(() -> parse("alter table tb1 add a bigint"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Try to add a column `a` which already exists in the table.");
+                .hasMessageContaining("Try to add a column `a` which already exists in the table.");
 
         // try to add multiple columns with duplicated column name
         assertThatThrownBy(() -> parse("alter table tb1 add (x array<string>, x string)"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Encounter duplicate column `x`.");
+                .hasMessageContaining("Encounter duplicate column `x`.");
 
         // refer to a nonexistent column
         assertThatThrownBy(() -> parse("alter table tb1 add x bigint after y"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Referenced column `y` by 'AFTER' does not exist in the table.");
+                        "Referenced column `y` by 'AFTER' does not exist in the table.");
 
         // refer to a new added column that appears in the post position
         assertThatThrownBy(() -> parse("alter table tb1 add (x bigint after y, y string first)"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Referenced column `y` by 'AFTER' does not exist in the table.");
+                        "Referenced column `y` by 'AFTER' does not exist in the table.");
 
         // add a computed column based on nonexistent column
         assertThatThrownBy(() -> parse("alter table tb1 add m as n + 2"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Invalid expression for computed column 'm'.");
+                .hasMessageContaining("Invalid expression for computed column 'm'.");
 
         // add a computed column based on another computed column
         assertThatThrownBy(() -> parse("alter table tb1 add (m as b * 2, n as m + 2)"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Invalid expression for computed column 'n'.");
+                .hasMessageContaining("Invalid expression for computed column 'n'.");
         // invalid expression
         assertThatThrownBy(() -> parse("alter table tb1 add (m as 'hello' || b)"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Invalid expression for computed column 'm'.");
+                .hasMessageContaining("Invalid expression for computed column 'm'.");
 
         // add an inner field to a nested row
         assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string)"))
@@ -1517,9 +1606,7 @@ public class SqlToOperationConverterTest {
         // refer to a nested inner field
         assertThatThrownBy(() -> parse("alter table tb1 add (x string after e.f2)"))
                 .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Alter nested row type is not supported yet.");
+                .hasMessageContaining("Alter nested row type is not supported yet.");
 
         assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string after e.f1)"))
                 .isInstanceOf(UnsupportedOperationException.class)
@@ -1647,8 +1734,7 @@ public class SqlToOperationConverterTest {
         assertThatThrownBy(() -> parse("alter table tb1 add primary key(c) not enforced"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "The base table has already defined the primary key constraint [`a`]. "
+                        "The base table has already defined the primary key constraint [`a`]. "
                                 + "You might want to drop it before adding a new one.");
 
         assertThatThrownBy(
@@ -1657,8 +1743,7 @@ public class SqlToOperationConverterTest {
                                         "alter table tb1 add x string not null primary key not enforced"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "The base table has already defined the primary key constraint [`a`]. "
+                        "The base table has already defined the primary key constraint [`a`]. "
                                 + "You might want to drop it before adding a new one");
 
         // the original table has composite pk
@@ -1667,8 +1752,7 @@ public class SqlToOperationConverterTest {
         assertThatThrownBy(() -> parse("alter table tb2 add primary key(c) not enforced"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "The base table has already defined the primary key constraint [`a`, `b`]. "
+                        "The base table has already defined the primary key constraint [`a`, `b`]. "
                                 + "You might want to drop it before adding a new one");
 
         assertThatThrownBy(
@@ -1677,8 +1761,7 @@ public class SqlToOperationConverterTest {
                                         "alter table tb2 add x string not null primary key not enforced"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "The base table has already defined the primary key constraint [`a`, `b`]. "
+                        "The base table has already defined the primary key constraint [`a`, `b`]. "
                                 + "You might want to drop it before adding a new one");
 
         // the original table does not define pk
@@ -1687,8 +1770,7 @@ public class SqlToOperationConverterTest {
         // specify a nonexistent column as pk
         assertThatThrownBy(() -> parse("alter table tb3 add primary key (x) not enforced"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\nInvalid primary key 'PK_x'. Column 'x' does not exist.");
+                .hasMessageContaining("Invalid primary key 'PK_x'. Column 'x' does not exist.");
 
         // add unique constraint
         assertThatThrownBy(() -> parse("alter table tb3 add unique(b)"))
@@ -1710,15 +1792,13 @@ public class SqlToOperationConverterTest {
                                                 + "  primary key (d, x) not enforced)"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Invalid primary key 'PK_d_x'. Column 'd' is not a physical column.");
+                        "Invalid primary key 'PK_d_x'. Column 'd' is not a physical column.");
 
         // add a pk which is metadata column
         assertThatThrownBy(() -> parse("alter table tb3 add (primary key (g) not enforced)"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Invalid primary key 'PK_g'. Column 'g' is not a physical column.");
+                        "Invalid primary key 'PK_g'. Column 'g' is not a physical column.");
     }
 
     @Test
@@ -1799,16 +1879,14 @@ public class SqlToOperationConverterTest {
         assertThatThrownBy(() -> parse("alter table tb1 add watermark for x as x"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Invalid column name 'x' for rowtime attribute in watermark declaration. "
+                        "Invalid column name 'x' for rowtime attribute in watermark declaration. "
                                 + "Available columns are: [a, b, c, d, e, f, g, ts]");
 
         // add watermark with invalid type
         assertThatThrownBy(() -> parse("alter table tb1 add watermark for b as b"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Invalid data type of time field for watermark definition. "
+                        "Invalid data type of time field for watermark definition. "
                                 + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), "
                                 + "the supported precision 'p' is from 0 to 3, but the time field type is BIGINT NOT NULL");
 
@@ -1818,9 +1896,7 @@ public class SqlToOperationConverterTest {
                                 parse(
                                         "alter table tb1 add (x row<f0 string, f1 timestamp(3)>, watermark for x.f1 as x.f1)"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Watermark strategy on nested column is not supported yet.");
+                .hasMessageContaining("Watermark strategy on nested column is not supported yet.");
 
         // add watermark to the table which already has watermark defined
         prepareNonManagedTable("tb2", true);
@@ -1828,8 +1904,7 @@ public class SqlToOperationConverterTest {
         assertThatThrownBy(() -> parse("alter table tb2 add watermark for ts as ts"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "The base table has already defined the watermark strategy "
+                        "The base table has already defined the watermark strategy "
                                 + "`ts` AS ts - interval '5' seconds. "
                                 + "You might want to drop it before adding a new one.");
     }
@@ -1924,47 +1999,42 @@ public class SqlToOperationConverterTest {
         // modify duplicated column same
         assertThatThrownBy(() -> parse("alter table tb1 modify (b int, b array<int not null>)"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\nEncounter duplicate column `b`.");
+                .hasMessageContaining("Encounter duplicate column `b`.");
 
         // modify nonexistent column name
         assertThatThrownBy(() -> parse("alter table tb1 modify x bigint"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\nTry to modify a column `x` which does not exist in the table.");
+                        "Try to modify a column `x` which does not exist in the table.");
 
         // refer to nonexistent column name
         assertThatThrownBy(() -> parse("alter table tb1 modify a bigint after x"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\nReferenced column `x` by 'AFTER' does not exist in the table.");
+                        "Referenced column `x` by 'AFTER' does not exist in the table.");
 
         // modify physical columns which generates computed column
         assertThatThrownBy(() -> parse("alter table tb1 modify e array<int>"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\nInvalid expression for computed column 'f'.");
+                .hasMessageContaining("Invalid expression for computed column 'f'.");
 
         assertThatThrownBy(() -> parse("alter table tb1 modify a string"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\nInvalid expression for computed column 'd'.");
+                .hasMessageContaining("Invalid expression for computed column 'd'.");
 
         assertThatThrownBy(() -> parse("alter table tb1 modify b as a + 2"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\nInvalid expression for computed column 'd'.");
+                .hasMessageContaining("Invalid expression for computed column 'd'.");
 
         assertThatThrownBy(() -> parse("alter table tb1 modify (a timestamp(3), b multiset<int>)"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\nInvalid expression for computed column 'd'.");
+                .hasMessageContaining("Invalid expression for computed column 'd'.");
 
         // modify the rowtime field which defines watermark
         assertThatThrownBy(() -> parse("alter table tb1 modify ts int"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\nInvalid data type of time field for watermark definition. "
+                        "Invalid data type of time field for watermark definition. "
                                 + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), "
                                 + "the supported precision 'p' is from 0 to 3, but the time field type is INT");
 
@@ -1974,12 +2044,12 @@ public class SqlToOperationConverterTest {
         assertThatThrownBy(() -> parse("alter table tb2 modify (d int, a as b + 2)"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\nInvalid primary key 'ct1'. Column 'a' is not a physical column.");
+                        "Invalid primary key 'ct1'. Column 'a' is not a physical column.");
 
         assertThatThrownBy(() -> parse("alter table tb2 modify (d string, a int metadata virtual)"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\nInvalid primary key 'ct1'. Column 'a' is not a physical column.");
+                        "Invalid primary key 'ct1'. Column 'a' is not a physical column.");
 
         // modify an inner field to a nested row
         assertThatThrownBy(() -> parse("alter table tb2 modify (e.f0 string)"))
@@ -1989,9 +2059,7 @@ public class SqlToOperationConverterTest {
         // refer to a nested inner field
         assertThatThrownBy(() -> parse("alter table tb2 modify (g string after e.f2)"))
                 .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Alter nested row type is not supported yet.");
+                .hasMessageContaining("Alter nested row type is not supported yet.");
 
         assertThatThrownBy(() -> parse("alter table tb2 modify (e.f0 string after e.f1)"))
                 .isInstanceOf(UnsupportedOperationException.class)
@@ -2132,8 +2200,7 @@ public class SqlToOperationConverterTest {
                                         "alter table tb1 modify constraint ct primary key (b) not enforced"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "The base table does not define any primary key constraint. You might want to add a new one.");
+                        "The base table does not define any primary key constraint. You might want to add a new one.");
 
         prepareNonManagedTable("tb2", 1);
 
@@ -2143,9 +2210,7 @@ public class SqlToOperationConverterTest {
                                 parse(
                                         "alter table tb2 modify constraint ct primary key (x) not enforced"))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Invalid primary key 'ct'. Column 'x' does not exist.");
+                .hasMessageContaining("Invalid primary key 'ct'. Column 'x' does not exist.");
 
         // specify computed column as pk
         assertThatThrownBy(
@@ -2154,8 +2219,7 @@ public class SqlToOperationConverterTest {
                                         "alter table tb2 modify constraint ct primary key (d) not enforced"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Invalid primary key 'ct'. Column 'd' is not a physical column.");
+                        "Invalid primary key 'ct'. Column 'd' is not a physical column.");
 
         // specify metadata column as pk
         assertThatThrownBy(
@@ -2164,8 +2228,7 @@ public class SqlToOperationConverterTest {
                                         "alter table tb2 modify constraint ct primary key (g) not enforced"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Invalid primary key 'ct'. Column 'g' is not a physical column.");
+                        "Invalid primary key 'ct'. Column 'g' is not a physical column.");
     }
 
     @Test
@@ -2226,7 +2289,7 @@ public class SqlToOperationConverterTest {
                                         "alter table tb1 modify watermark for a as to_timestamp(a) - interval '1' minute"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\nThe base table does not define any watermark. You might want to add a new one.");
+                        "The base table does not define any watermark. You might want to add a new one.");
 
         prepareNonManagedTable("tb2", true);
 
@@ -2234,7 +2297,7 @@ public class SqlToOperationConverterTest {
         assertThatThrownBy(() -> parse("alter table tb2 modify watermark for a as a"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\nInvalid data type of time field for watermark definition. "
+                        "Invalid data type of time field for watermark definition. "
                                 + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, "
                                 + "but the time field type is INT NOT NULL");
 
@@ -2244,7 +2307,7 @@ public class SqlToOperationConverterTest {
                                         "alter table tb2 modify watermark for c as to_timestamp(c) - interval '1' day"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\nInvalid data type of time field for watermark definition. "
+                        "Invalid data type of time field for watermark definition. "
                                 + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, "
                                 + "but the time field type is STRING");
     }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index a78bb5a07cc..0b2c47ce5ec 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -818,6 +818,106 @@ class TableEnvironmentTest {
       tableResult.collect())
   }
 
+  @Test
+  def testAlterTableDropColumn(): Unit = {
+    // Source table with a watermark on d; the columns dropped below are e and a.
+    val createTableDdl =
+      """
+        |CREATE TABLE MyTable (
+        |  a BIGINT,
+        |  b INT,
+        |  d TIMESTAMP(3),
+        |  e ROW<e0 STRING, e1 TIMESTAMP(3)>,
+        |  WATERMARK FOR d AS d - INTERVAL '1' MINUTE
+        |) WITH (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(createTableDdl)
+
+    // Drop the row-typed column and a scalar column with a single statement.
+    tableEnv.executeSql("ALTER TABLE MyTable DROP (e, a)")
+
+    // Only b and d are expected to remain; the last cell of d still carries the
+    // watermark expression.
+    val watermarkExpr = "`d` - INTERVAL '1' MINUTE"
+    val rowB = Row.of("b", "INT", Boolean.box(true), null, null, null)
+    val rowD = Row.of("d", "TIMESTAMP(3) *ROWTIME*", Boolean.box(true), null, null, watermarkExpr)
+    val expectedRows =
+      util.Arrays.asList(rowB, rowD)
+
+    val describeResult = tableEnv.executeSql("DESCRIBE MyTable")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, describeResult.getResultKind)
+    checkData(expectedRows.iterator(), describeResult.collect())
+  }
+
+  @Test
+  def testAlterTableDropConstraint(): Unit = {
+    // The same DESCRIBE output is expected after each of the two drops below.
+    val expectedRows = util.Arrays.asList(
+      Row.of("a", "BIGINT", Boolean.box(false), null, null, null),
+      Row.of("b", "INT", Boolean.box(false), null, null, null),
+      Row.of("d", "TIMESTAMP(3)", Boolean.box(true), null, null, null),
+      Row.of("e", "ROW<`e0` STRING, `e1` TIMESTAMP(3)>", Boolean.box(true), null, null, null)
+    )
+    def verifyDescribe(): Unit = {
+      val describeResult = tableEnv.executeSql("DESCRIBE MyTable")
+      assertEquals(ResultKind.SUCCESS_WITH_CONTENT, describeResult.getResultKind)
+      checkData(expectedRows.iterator(), describeResult.collect())
+    }
+
+    tableEnv.executeSql(
+      """
+        |CREATE TABLE MyTable (
+        |  a BIGINT,
+        |  b INT,
+        |  d TIMESTAMP(3),
+        |  e ROW<e0 STRING, e1 TIMESTAMP(3)>,
+        |  CONSTRAINT ct PRIMARY KEY(a, b) NOT ENFORCED
+        |) WITH (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin)
+    tableEnv.executeSql("ALTER TABLE MyTable DROP CONSTRAINT ct")
+    verifyDescribe()
+
+    tableEnv.executeSql("ALTER TABLE MyTable ADD CONSTRAINT ct PRIMARY KEY(a) NOT ENFORCED")
+    tableEnv.executeSql("ALTER TABLE MyTable DROP PRIMARY KEY")
+    verifyDescribe()
+  }
+
+  @Test
+  def testAlterTableDropWatermark(): Unit = {
+    tableEnv.executeSql(
+      """
+        |CREATE TABLE MyTable (
+        |  a BIGINT,
+        |  b INT,
+        |  d TIMESTAMP(3),
+        |  e ROW<e0 STRING, e1 TIMESTAMP(3)>,
+        |  WATERMARK FOR d AS d - INTERVAL '1' MINUTE
+        |) WITH (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin)
+    tableEnv.executeSql("ALTER TABLE MyTable DROP WATERMARK")
+
+    // Every expected row has true in its third cell and null in the three cells after it.
+    def column(name: String, dataType: String): Row =
+      Row.of(name, dataType, Boolean.box(true), null, null, null)
+    val expectedRows = util.Arrays.asList(
+      column("a", "BIGINT"),
+      column("b", "INT"),
+      column("d", "TIMESTAMP(3)"),
+      column("e", "ROW<`e0` STRING, `e1` TIMESTAMP(3)>"))
+    val describeResult = tableEnv.executeSql("DESCRIBE MyTable")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, describeResult.getResultKind)
+    checkData(expectedRows.iterator(), describeResult.collect())
+  }
+
   @Test
   def testAlterTableCompactOnNonManagedTable(): Unit = {
     val statement =