You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/12/29 02:05:38 UTC

[flink] 01/01: [FLINK-22318][table] Support RENAME column name for ALTER TABLE statement, optimize the code implementation

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 94fba321d4e29f996c8234c9dcb3b7a93ad8a593
Author: fengli <ld...@163.com>
AuthorDate: Fri Apr 1 20:50:56 2022 +0800

    [FLINK-22318][table] Support RENAME column name for ALTER TABLE statement, optimize the code implementation
    
    This closes #19329
---
 .../src/test/resources/sql/table.q                 |   36 +-
 .../src/main/codegen/includes/parserImpls.ftl      |   11 +-
 .../sql/parser/ddl/SqlAlterTableRenameColumn.java  |   53 +-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |   32 +-
 .../operations/SqlToOperationConverter.java        |    0
 .../operations/SqlToOperationConverterTest.java    | 1548 --------------------
 .../planner/expressions/ColumnReferenceFinder.java |  108 ++
 .../planner/operations/AlterSchemaConverter.java   |  177 ++-
 .../operations/SqlToOperationConverter.java        |   15 +
 .../planner/utils/OperationConverterUtils.java     |  125 +-
 .../table/planner/plan/utils/FlinkRexUtil.scala    |    2 +-
 .../operations/SqlToOperationConverterTest.java    |  180 ++-
 .../flink/table/api/TableEnvironmentTest.scala     |   24 +-
 13 files changed, 554 insertions(+), 1757 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index 61d944f7b29..36809085117 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -395,6 +395,30 @@ alter table orders2 reset ();
 org.apache.flink.table.api.ValidationException: ALTER TABLE RESET does not support empty key
 !error
 
+# ==========================================================================
+# test alter table rename column
+# ==========================================================================
+
+alter table orders2 rename amount to amount1;
+[INFO] Execute statement succeed.
+!info
+
+# verify table options using SHOW CREATE TABLE
+show create table orders2;
+CREATE TABLE `default_catalog`.`default_database`.`orders2` (
+  `user` BIGINT NOT NULL,
+  `product` VARCHAR(32),
+  `amount1` INT,
+  `ts` TIMESTAMP(3),
+  `ptime` AS PROCTIME(),
+  WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
+  CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED
+) WITH (
+  'connector' = 'datagen'
+)
+
+!ok
+
 # ==========================================================================
 # test alter table add schema
 # ==========================================================================
@@ -424,7 +448,7 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2` (
   `user` BIGINT NOT NULL,
   `product_id` BIGINT NOT NULL,
   `product` VARCHAR(32),
-  `amount` INT,
+  `amount1` INT,
   `ts` TIMESTAMP(3),
   `ptime` AS PROCTIME(),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
@@ -449,7 +473,7 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2` (
   `product_id` BIGINT NOT NULL,
   `product` VARCHAR(32),
   `cleaned_product` AS COALESCE(`product`, 'missing_sku'),
-  `amount` INT,
+  `amount1` INT,
   `ts` TIMESTAMP(3),
   `ptime` AS PROCTIME(),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
@@ -477,7 +501,7 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2` (
   `product_id` BIGINT NOT NULL,
   `product` VARCHAR(32),
   `cleaned_product` AS COALESCE(`product`, 'missing_sku'),
-  `amount` INT,
+  `amount1` INT,
   `ts` TIMESTAMP(3),
   `ptime` AS PROCTIME(),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
@@ -503,7 +527,7 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2` (
   `product_id` BIGINT NOT NULL,
   `product` VARCHAR(32),
   `cleaned_product` AS COALESCE(`product`, 'missing_sku'),
-  `amount` INT,
+  `amount1` INT,
   `ptime` AS PROCTIME(),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE,
   CONSTRAINT `order_constraint` PRIMARY KEY (`trade_order_id`) NOT ENFORCED
@@ -530,7 +554,7 @@ describe orders2;
 |      product_id |                      BIGINT | FALSE |                     |                                       |                            |
 |         product |                 VARCHAR(32) |  TRUE |                     |                                       |                            |
 | cleaned_product |                 VARCHAR(32) | FALSE |                     | AS COALESCE(`product`, 'missing_sku') |                            |
-|          amount |                         INT |  TRUE |                     |                                       |                            |
+|         amount1 |                         INT |  TRUE |                     |                                       |                            |
 |           ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |                     |                         AS PROCTIME() |                            |
 +-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
 9 rows in set
@@ -548,7 +572,7 @@ desc orders2;
 |      product_id |                      BIGINT | FALSE |                     |                                       |                            |
 |         product |                 VARCHAR(32) |  TRUE |                     |                                       |                            |
 | cleaned_product |                 VARCHAR(32) | FALSE |                     | AS COALESCE(`product`, 'missing_sku') |                            |
-|          amount |                         INT |  TRUE |                     |                                       |                            |
+|         amount1 |                         INT |  TRUE |                     |                                       |                            |
 |           ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |                     |                         AS PROCTIME() |                            |
 +-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
 9 rows in set
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index d03b987b9b1..57424abc495 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -595,8 +595,8 @@ SqlAlterTable SqlAlterTable() :
     SqlNodeList partitionSpec = null;
     SqlIdentifier constraintName;
     SqlTableConstraint constraint;
-    SqlIdentifier originColumnName;
-    SqlIdentifier newColumnName;
+    SqlIdentifier originColumnIdentifier;
+    SqlIdentifier newColumnIdentifier;
     AlterTableContext ctx = new AlterTableContext();
 }
 {
@@ -614,14 +614,15 @@ SqlAlterTable SqlAlterTable() :
         }
     |
         <RENAME>
-            originColumnName = SimpleIdentifier()
+            originColumnIdentifier = CompoundIdentifier()
         <TO>
-            newColumnName = SimpleIdentifier()
+            newColumnIdentifier = CompoundIdentifier()
         {
             return new SqlAlterTableRenameColumn(
                     startPos.plus(getPos()),
                     tableIdentifier,
-                    originColumnName,newColumnName);
+                    originColumnIdentifier,
+                    newColumnIdentifier);
         }
     |
         <RESET>
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
index c75e2ae1280..28196183ff7 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.sql.parser.ddl;
 
 import org.apache.calcite.sql.SqlIdentifier;
@@ -9,34 +27,35 @@ import org.apache.calcite.util.ImmutableNullableList;
 import java.util.List;
 
 /**
- * ALTER TABLE [[catalogName.] dataBasesName].tableName
- * RENAME  originColumnName TO newColumnName
+ * ALTER TABLE [[catalogName.] dataBasesName].tableName RENAME originColumnName TO newColumnName.
  */
 public class SqlAlterTableRenameColumn extends SqlAlterTable {
 
-    private final SqlIdentifier originColumnNameIdentifier;
-    private final SqlIdentifier newColumnNameIdentifier;
+    private final SqlIdentifier originColumnIdentifier;
+    private final SqlIdentifier newColumnIdentifier;
 
-    public SqlAlterTableRenameColumn(SqlParserPos pos,
-                                     SqlIdentifier tableName,
-                                     SqlIdentifier originColumnName,
-                                     SqlIdentifier newColumnName) {
+    public SqlAlterTableRenameColumn(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlIdentifier originColumnIdentifier,
+            SqlIdentifier newColumnIdentifier) {
         super(pos, tableName, null);
-        this.originColumnNameIdentifier = originColumnName;
-        this.newColumnNameIdentifier = newColumnName;
+        this.originColumnIdentifier = originColumnIdentifier;
+        this.newColumnIdentifier = newColumnIdentifier;
     }
 
     @Override
     public List<SqlNode> getOperandList() {
-        return ImmutableNullableList.of(tableIdentifier, originColumnNameIdentifier, newColumnNameIdentifier);
+        return ImmutableNullableList.of(
+                tableIdentifier, originColumnIdentifier, newColumnIdentifier);
     }
 
-    public SqlIdentifier getOriginColumnNameIdentifier() {
-        return originColumnNameIdentifier;
+    public SqlIdentifier getOriginColumnIdentifier() {
+        return originColumnIdentifier;
     }
 
-    public SqlIdentifier getNewColumnNameIdentifier() {
-        return newColumnNameIdentifier;
+    public SqlIdentifier getNewColumnIdentifier() {
+        return newColumnIdentifier;
     }
 
     @Override
@@ -44,8 +63,8 @@ public class SqlAlterTableRenameColumn extends SqlAlterTable {
         writer.keyword("ALTER TABLE");
         tableIdentifier.unparse(writer, leftPrec, rightPrec);
         writer.keyword("RENAME");
-        originColumnNameIdentifier.unparse(writer, leftPrec, rightPrec);
+        originColumnIdentifier.unparse(writer, leftPrec, rightPrec);
         writer.keyword("TO");
-        newColumnNameIdentifier.unparse(writer, leftPrec, rightPrec);
+        newColumnIdentifier.unparse(writer, leftPrec, rightPrec);
     }
 }
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 065743bcbad..fea81a9fe2d 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -315,26 +315,18 @@ class FlinkSqlParserImplTest extends SqlParserTest {
     void testAlterTable() {
         sql("alter table t1 rename to t2").ok("ALTER TABLE `T1` RENAME TO `T2`");
         sql("alter table c1.d1.t1 rename to t2").ok("ALTER TABLE `C1`.`D1`.`T1` RENAME TO `T2`");
-        final String sql0 = "alter table t1 set ('key1'='value1')";
-        final String expected0 = "ALTER TABLE `T1` SET (\n" + "  'key1' = 'value1'\n" + ")";
-        sql(sql0).ok(expected0);
-        final String sql1 = "alter table t1 " + "add constraint ct1 primary key(a, b) not enforced";
-        final String expected1 =
-                "ALTER TABLE `T1` ADD (\n"
-                        + "  CONSTRAINT `CT1` PRIMARY KEY (`A`, `B`) NOT ENFORCED\n"
-                        + ")";
-        sql(sql1).ok(expected1);
-        final String sql2 = "alter table t1 " + "add unique(a, b)";
-        final String expected2 = "ALTER TABLE `T1` ADD (\n" + "  UNIQUE (`A`, `B`)\n" + ")";
-        sql(sql2).ok(expected2);
-        final String sql3 = "alter table t1 drop constraint ct1";
-        final String expected3 = "ALTER TABLE `T1` DROP CONSTRAINT `CT1`";
-        sql(sql3).ok(expected3);
-
-        final String sql4 = "alter table t1 rename a to b";
-        final String expected4 = "ALTER TABLE `T1` RENAME `A` TO `B`";
-        sql(sql4).ok(expected4);
-
+        sql("alter table t1 set ('key1'='value1')")
+                .ok("ALTER TABLE `T1` SET (\n" + "  'key1' = 'value1'\n" + ")");
+        sql("alter table t1 add constraint ct1 primary key(a, b) not enforced")
+                .ok(
+                        "ALTER TABLE `T1` ADD (\n"
+                                + "  CONSTRAINT `CT1` PRIMARY KEY (`A`, `B`) NOT ENFORCED\n"
+                                + ")");
+        sql("alter table t1 " + "add unique(a, b)")
+                .ok("ALTER TABLE `T1` ADD (\n" + "  UNIQUE (`A`, `B`)\n" + ")");
+        sql("alter table t1 drop constraint ct1").ok("ALTER TABLE `T1` DROP CONSTRAINT `CT1`");
+        sql("alter table t1 rename a to b").ok("ALTER TABLE `T1` RENAME `A` TO `B`");
+        sql("alter table t1 rename a.x to a.y").ok("ALTER TABLE `T1` RENAME `A`.`X` TO `A`.`Y`");
     }
 
     @Test
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
deleted file mode 100644
index e69de29bb2d..00000000000
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
deleted file mode 100644
index 1ccd855972b..00000000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ /dev/null
@@ -1,1548 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.operations;
-
-import org.apache.flink.sql.parser.ddl.SqlCreateTable;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.SqlDialect;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableColumn.ComputedColumn;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.config.TableConfigOptions;
-import org.apache.flink.table.api.constraints.UniqueConstraint;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogFunctionImpl;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.expressions.SqlCallExpression;
-import org.apache.flink.table.module.ModuleManager;
-import org.apache.flink.table.operations.BeginStatementSetOperation;
-import org.apache.flink.table.operations.CatalogSinkModifyOperation;
-import org.apache.flink.table.operations.EndStatementSetOperation;
-import org.apache.flink.table.operations.LoadModuleOperation;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.ShowFunctionsOperation;
-import org.apache.flink.table.operations.ShowFunctionsOperation.FunctionScope;
-import org.apache.flink.table.operations.ShowModulesOperation;
-import org.apache.flink.table.operations.UnloadModuleOperation;
-import org.apache.flink.table.operations.UseCatalogOperation;
-import org.apache.flink.table.operations.UseDatabaseOperation;
-import org.apache.flink.table.operations.UseModulesOperation;
-import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
-import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
-import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
-import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
-import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.operations.ddl.CreateViewOperation;
-import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
-import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
-import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
-import org.apache.flink.table.planner.delegation.ParserImpl;
-import org.apache.flink.table.planner.delegation.PlannerContext;
-import org.apache.flink.table.planner.expressions.utils.Func0$;
-import org.apache.flink.table.planner.expressions.utils.Func1$;
-import org.apache.flink.table.planner.expressions.utils.Func8$;
-import org.apache.flink.table.planner.parse.CalciteParser;
-import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.utils.CatalogManagerMocks;
-import org.apache.flink.table.utils.ExpressionResolverMocks;
-
-import org.apache.calcite.sql.SqlNode;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import javax.annotation.Nullable;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
-import static org.apache.flink.table.planner.utils.OperationMatchers.entry;
-import static org.apache.flink.table.planner.utils.OperationMatchers.isCreateTableOperation;
-import static org.apache.flink.table.planner.utils.OperationMatchers.partitionedBy;
-import static org.apache.flink.table.planner.utils.OperationMatchers.withOptions;
-import static org.apache.flink.table.planner.utils.OperationMatchers.withSchema;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-/** Test cases for {@link SqlToOperationConverter}. */
-public class SqlToOperationConverterTest {
-    private final boolean isStreamingMode = false;
-    private final TableConfig tableConfig = new TableConfig();
-    private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default");
-    private final CatalogManager catalogManager =
-            CatalogManagerMocks.preparedCatalogManager().defaultCatalog("builtin", catalog).build();
-    private final ModuleManager moduleManager = new ModuleManager();
-    private final FunctionCatalog functionCatalog =
-            new FunctionCatalog(tableConfig, catalogManager, moduleManager);
-    private final Supplier<FlinkPlannerImpl> plannerSupplier =
-            () ->
-                    getPlannerContext()
-                            .createFlinkPlanner(
-                                    catalogManager.getCurrentCatalog(),
-                                    catalogManager.getCurrentDatabase());
-    private final Parser parser =
-            new ParserImpl(
-                    catalogManager,
-                    plannerSupplier,
-                    () -> plannerSupplier.get().parser(),
-                    t ->
-                            getPlannerContext()
-                                    .createSqlExprToRexConverter(
-                                            getPlannerContext()
-                                                    .getTypeFactory()
-                                                    .buildRelNodeRowType(t)));
-    private final PlannerContext plannerContext =
-            new PlannerContext(
-                    tableConfig,
-                    functionCatalog,
-                    catalogManager,
-                    asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
-                    Collections.emptyList());
-
-    private PlannerContext getPlannerContext() {
-        return plannerContext;
-    }
-
-    @Rule public ExpectedException thrown = ExpectedException.none();
-
-    @Before
-    public void before() throws TableAlreadyExistException, DatabaseNotExistException {
-        catalogManager.initSchemaResolver(
-                isStreamingMode,
-                ExpressionResolverMocks.basicResolver(catalogManager, functionCatalog, parser));
-
-        final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
-        final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
-        final TableSchema tableSchema =
-                TableSchema.builder()
-                        .field("a", DataTypes.BIGINT())
-                        .field("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
-                        .field("c", DataTypes.INT())
-                        .field("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
-                        .build();
-        Map<String, String> options = new HashMap<>();
-        options.put("connector", "COLLECTION");
-        final CatalogTable catalogTable = new CatalogTableImpl(tableSchema, options, "");
-        catalog.createTable(path1, catalogTable, true);
-        catalog.createTable(path2, catalogTable, true);
-    }
-
-    @After
-    public void after() throws TableNotExistException {
-        final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
-        final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
-        catalog.dropTable(path1, true);
-        catalog.dropTable(path2, true);
-    }
-
-    @Test
-    public void testUseCatalog() {
-        final String sql = "USE CATALOG cat1";
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof UseCatalogOperation;
-        assertEquals("cat1", ((UseCatalogOperation) operation).getCatalogName());
-    }
-
-    @Test
-    public void testUseDatabase() {
-        final String sql1 = "USE db1";
-        Operation operation1 = parse(sql1, SqlDialect.DEFAULT);
-        assert operation1 instanceof UseDatabaseOperation;
-        assertEquals("builtin", ((UseDatabaseOperation) operation1).getCatalogName());
-        assertEquals("db1", ((UseDatabaseOperation) operation1).getDatabaseName());
-
-        final String sql2 = "USE cat1.db1";
-        Operation operation2 = parse(sql2, SqlDialect.DEFAULT);
-        assert operation2 instanceof UseDatabaseOperation;
-        assertEquals("cat1", ((UseDatabaseOperation) operation2).getCatalogName());
-        assertEquals("db1", ((UseDatabaseOperation) operation2).getDatabaseName());
-    }
-
-    @Test(expected = ValidationException.class)
-    public void testUseDatabaseWithException() {
-        final String sql = "USE cat1.db1.tbl1";
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-    }
-
-    @Test
-    public void testCreateDatabase() {
-        final String[] createDatabaseSqls =
-                new String[] {
-                    "create database db1",
-                    "create database if not exists cat1.db1",
-                    "create database cat1.db1 comment 'db1_comment'",
-                    "create database cat1.db1 comment 'db1_comment' with ('k1' = 'v1', 'K2' = 'V2')"
-                };
-        final String[] expectedCatalogs = new String[] {"builtin", "cat1", "cat1", "cat1"};
-        final String expectedDatabase = "db1";
-        final String[] expectedComments = new String[] {null, null, "db1_comment", "db1_comment"};
-        final boolean[] expectedIgnoreIfExists = new boolean[] {false, true, false, false};
-        Map<String, String> properties = new HashMap<>();
-        properties.put("k1", "v1");
-        properties.put("K2", "V2");
-        final Map[] expectedProperties =
-                new Map[] {
-                    new HashMap<String, String>(),
-                    new HashMap<String, String>(),
-                    new HashMap<String, String>(),
-                    new HashMap(properties)
-                };
-
-        for (int i = 0; i < createDatabaseSqls.length; i++) {
-            Operation operation = parse(createDatabaseSqls[i], SqlDialect.DEFAULT);
-            assert operation instanceof CreateDatabaseOperation;
-            final CreateDatabaseOperation createDatabaseOperation =
-                    (CreateDatabaseOperation) operation;
-            assertEquals(expectedCatalogs[i], createDatabaseOperation.getCatalogName());
-            assertEquals(expectedDatabase, createDatabaseOperation.getDatabaseName());
-            assertEquals(
-                    expectedComments[i], createDatabaseOperation.getCatalogDatabase().getComment());
-            assertEquals(expectedIgnoreIfExists[i], createDatabaseOperation.isIgnoreIfExists());
-            assertEquals(
-                    expectedProperties[i],
-                    createDatabaseOperation.getCatalogDatabase().getProperties());
-        }
-    }
-
-    @Test
-    public void testDropDatabase() {
-        final String[] dropDatabaseSqls =
-                new String[] {
-                    "drop database db1",
-                    "drop database if exists db1",
-                    "drop database if exists cat1.db1 CASCADE",
-                    "drop database if exists cat1.db1 RESTRICT"
-                };
-        final String[] expectedCatalogs = new String[] {"builtin", "builtin", "cat1", "cat1"};
-        final String expectedDatabase = "db1";
-        final boolean[] expectedIfExists = new boolean[] {false, true, true, true};
-        final boolean[] expectedIsCascades = new boolean[] {false, false, true, false};
-
-        for (int i = 0; i < dropDatabaseSqls.length; i++) {
-            Operation operation = parse(dropDatabaseSqls[i], SqlDialect.DEFAULT);
-            assert operation instanceof DropDatabaseOperation;
-            final DropDatabaseOperation dropDatabaseOperation = (DropDatabaseOperation) operation;
-            assertEquals(expectedCatalogs[i], dropDatabaseOperation.getCatalogName());
-            assertEquals(expectedDatabase, dropDatabaseOperation.getDatabaseName());
-            assertEquals(expectedIfExists[i], dropDatabaseOperation.isIfExists());
-            assertEquals(expectedIsCascades[i], dropDatabaseOperation.isCascade());
-        }
-    }
-
-    @Test
-    public void testAlterDatabase() throws Exception {
-        catalogManager.registerCatalog("cat1", new GenericInMemoryCatalog("default", "default"));
-        catalogManager
-                .getCatalog("cat1")
-                .get()
-                .createDatabase(
-                        "db1", new CatalogDatabaseImpl(new HashMap<>(), "db1_comment"), true);
-        final String sql = "alter database cat1.db1 set ('k1'='v1', 'K2'='V2')";
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof AlterDatabaseOperation;
-        Map<String, String> properties = new HashMap<>();
-        properties.put("k1", "v1");
-        properties.put("K2", "V2");
-        assertEquals("db1", ((AlterDatabaseOperation) operation).getDatabaseName());
-        assertEquals("cat1", ((AlterDatabaseOperation) operation).getCatalogName());
-        assertEquals(
-                "db1_comment",
-                ((AlterDatabaseOperation) operation).getCatalogDatabase().getComment());
-        assertEquals(
-                properties,
-                ((AlterDatabaseOperation) operation).getCatalogDatabase().getProperties());
-    }
-
-    @Test
-    public void testLoadModule() {
-        final String sql = "LOAD MODULE dummy WITH ('k1' = 'v1', 'k2' = 'v2')";
-        final String expectedModuleName = "dummy";
-        final Map<String, String> expectedProperties = new HashMap<>();
-        expectedProperties.put("k1", "v1");
-        expectedProperties.put("k2", "v2");
-
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof LoadModuleOperation;
-        final LoadModuleOperation loadModuleOperation = (LoadModuleOperation) operation;
-
-        assertEquals(expectedModuleName, loadModuleOperation.getModuleName());
-        assertEquals(expectedProperties, loadModuleOperation.getProperties());
-    }
-
-    @Test
-    public void testUnloadModule() {
-        final String sql = "UNLOAD MODULE dummy";
-        final String expectedModuleName = "dummy";
-
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof UnloadModuleOperation;
-        final UnloadModuleOperation unloadModuleOperation = (UnloadModuleOperation) operation;
-
-        assertEquals(expectedModuleName, unloadModuleOperation.getModuleName());
-    }
-
-    @Test
-    public void testUseOneModule() {
-        final String sql = "USE MODULES dummy";
-        final List<String> expectedModuleNames = Collections.singletonList("dummy");
-
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof UseModulesOperation;
-        final UseModulesOperation useModulesOperation = (UseModulesOperation) operation;
-
-        assertEquals(expectedModuleNames, useModulesOperation.getModuleNames());
-        assertEquals("USE MODULES: [dummy]", useModulesOperation.asSummaryString());
-    }
-
-    @Test
-    public void testUseMultipleModules() {
-        final String sql = "USE MODULES x, y, z";
-        final List<String> expectedModuleNames = Arrays.asList("x", "y", "z");
-
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof UseModulesOperation;
-        final UseModulesOperation useModulesOperation = (UseModulesOperation) operation;
-
-        assertEquals(expectedModuleNames, useModulesOperation.getModuleNames());
-        assertEquals("USE MODULES: [x, y, z]", useModulesOperation.asSummaryString());
-    }
-
-    @Test
-    public void testShowModules() {
-        final String sql = "SHOW MODULES";
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof ShowModulesOperation;
-        final ShowModulesOperation showModulesOperation = (ShowModulesOperation) operation;
-
-        assertFalse(showModulesOperation.requireFull());
-        assertEquals("SHOW MODULES", showModulesOperation.asSummaryString());
-    }
-
-    @Test
-    public void testShowFullModules() {
-        final String sql = "SHOW FULL MODULES";
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof ShowModulesOperation;
-        final ShowModulesOperation showModulesOperation = (ShowModulesOperation) operation;
-
-        assertTrue(showModulesOperation.requireFull());
-        assertEquals("SHOW FULL MODULES", showModulesOperation.asSummaryString());
-    }
-
-    @Test
-    public void testShowFunctions() {
-        final String sql1 = "SHOW FUNCTIONS";
-        assertShowFunctions(sql1, sql1, FunctionScope.ALL);
-
-        final String sql2 = "SHOW USER FUNCTIONS";
-        assertShowFunctions(sql2, sql2, FunctionScope.USER);
-    }
-
-    @Test
-    public void testCreateTable() {
-        final String sql =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a bigint,\n"
-                        + "  b varchar, \n"
-                        + "  c int, \n"
-                        + "  d varchar"
-                        + ")\n"
-                        + "  PARTITIONED BY (a, d)\n"
-                        + "  with (\n"
-                        + "    'connector' = 'kafka', \n"
-                        + "    'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assert operation instanceof CreateTableOperation;
-        CreateTableOperation op = (CreateTableOperation) operation;
-        CatalogTable catalogTable = op.getCatalogTable();
-        assertEquals(Arrays.asList("a", "d"), catalogTable.getPartitionKeys());
-        assertArrayEquals(
-                catalogTable.getSchema().getFieldNames(), new String[] {"a", "b", "c", "d"});
-        assertArrayEquals(
-                catalogTable.getSchema().getFieldDataTypes(),
-                new DataType[] {
-                    DataTypes.BIGINT(),
-                    DataTypes.VARCHAR(Integer.MAX_VALUE),
-                    DataTypes.INT(),
-                    DataTypes.VARCHAR(Integer.MAX_VALUE)
-                });
-    }
-
-    @Test
-    public void testCreateTableWithPrimaryKey() {
-        final String sql =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a bigint,\n"
-                        + "  b varchar, \n"
-                        + "  c int, \n"
-                        + "  d varchar, \n"
-                        + "  constraint ct1 primary key(a, b) not enforced\n"
-                        + ") with (\n"
-                        + "  'connector' = 'kafka', \n"
-                        + "  'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assert operation instanceof CreateTableOperation;
-        CreateTableOperation op = (CreateTableOperation) operation;
-        CatalogTable catalogTable = op.getCatalogTable();
-        TableSchema tableSchema = catalogTable.getSchema();
-        assertThat(
-                tableSchema
-                        .getPrimaryKey()
-                        .map(UniqueConstraint::asSummaryString)
-                        .orElse("fakeVal"),
-                is("CONSTRAINT ct1 PRIMARY KEY (a, b)"));
-        assertArrayEquals(new String[] {"a", "b", "c", "d"}, tableSchema.getFieldNames());
-        assertArrayEquals(
-                new DataType[] {
-                    DataTypes.BIGINT().notNull(),
-                    DataTypes.STRING().notNull(),
-                    DataTypes.INT(),
-                    DataTypes.STRING()
-                },
-                tableSchema.getFieldDataTypes());
-    }
-
-    @Test
-    public void testCreateTableWithPrimaryKeyEnforced() {
-        final String sql =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a bigint,\n"
-                        + "  b varchar, \n"
-                        + "  c int, \n"
-                        + "  d varchar, \n"
-                        +
-                        // Default is enforced.
-                        "  constraint ct1 primary key(a, b)\n"
-                        + ") with (\n"
-                        + "  'connector' = 'kafka', \n"
-                        + "  'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Flink doesn't support ENFORCED mode for PRIMARY KEY "
-                        + "constaint. ENFORCED/NOT ENFORCED  controls if the constraint "
-                        + "checks are performed on the incoming/outgoing data. "
-                        + "Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode");
-        parse(sql, planner, parser);
-    }
-
-    @Test
-    public void testCreateTableWithUniqueKey() {
-        final String sql =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a bigint,\n"
-                        + "  b varchar, \n"
-                        + "  c int, \n"
-                        + "  d varchar, \n"
-                        + "  constraint ct1 unique (a, b) not enforced\n"
-                        + ") with (\n"
-                        + "  'connector' = 'kafka', \n"
-                        + "  'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        thrown.expect(UnsupportedOperationException.class);
-        thrown.expectMessage("UNIQUE constraint is not supported yet");
-        parse(sql, planner, parser);
-    }
-
-    @Test
-    public void testPrimaryKeyOnGeneratedColumn() {
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Could not create a PRIMARY KEY with column 'c' at line 5, column 34.\n"
-                        + "A PRIMARY KEY constraint must be declared on physical columns.");
-        final String sql2 =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a bigint not null,\n"
-                        + "  b varchar not null,\n"
-                        + "  c as 2 * (a + 1),\n"
-                        + "  constraint ct1 primary key (b, c) not enforced"
-                        + ") with (\n"
-                        + "    'connector' = 'kafka',\n"
-                        + "    'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-        parseAndConvert(sql2);
-    }
-
-    @Test
-    public void testPrimaryKeyNonExistentColumn() {
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Primary key column 'd' is not defined in the schema at line 5, column 34");
-        final String sql2 =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a bigint not null,\n"
-                        + "  b varchar not null,\n"
-                        + "  c as 2 * (a + 1),\n"
-                        + "  constraint ct1 primary key (b, d) not enforced"
-                        + ") with (\n"
-                        + "    'connector' = 'kafka',\n"
-                        + "    'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-        parseAndConvert(sql2);
-    }
-
-    @Test
-    public void testCreateTableWithMinusInOptionKey() {
-        final String sql =
-                "create table source_table(\n"
-                        + "  a int,\n"
-                        + "  b bigint,\n"
-                        + "  c varchar\n"
-                        + ") with (\n"
-                        + "  'a-B-c-d124' = 'Ab',\n"
-                        + "  'a.b-c-d.e-f.g' = 'ada',\n"
-                        + "  'a.b-c-d.e-f1231.g' = 'ada',\n"
-                        + "  'a.b-c-d.*' = 'adad')\n";
-        final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        SqlNode node = parser.parse(sql);
-        assert node instanceof SqlCreateTable;
-        Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
-        assert operation instanceof CreateTableOperation;
-        CreateTableOperation op = (CreateTableOperation) operation;
-        CatalogTable catalogTable = op.getCatalogTable();
-        Map<String, String> options =
-                catalogTable.getOptions().entrySet().stream()
-                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-        Map<String, String> sortedProperties = new TreeMap<>(options);
-        final String expected =
-                "{a-B-c-d124=Ab, "
-                        + "a.b-c-d.*=adad, "
-                        + "a.b-c-d.e-f.g=ada, "
-                        + "a.b-c-d.e-f1231.g=ada}";
-        assertEquals(expected, sortedProperties.toString());
-    }
-
-    @Test
-    public void testCreateTableWithWatermark()
-            throws FunctionAlreadyExistException, DatabaseNotExistException {
-        CatalogFunction cf =
-                new CatalogFunctionImpl(JavaUserDefinedScalarFunctions.JavaFunc5.class.getName());
-        catalog.createFunction(ObjectPath.fromString("default.myfunc"), cf, true);
-
-        final String sql =
-                "create table source_table(\n"
-                        + "  a int,\n"
-                        + "  b bigint,\n"
-                        + "  c timestamp(3),\n"
-                        + "  watermark for `c` as myfunc(c, 1) - interval '5' second\n"
-                        + ") with (\n"
-                        + "  'connector.type' = 'kafka')\n";
-        final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        SqlNode node = parser.parse(sql);
-        assert node instanceof SqlCreateTable;
-        Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
-        assert operation instanceof CreateTableOperation;
-        CreateTableOperation op = (CreateTableOperation) operation;
-        CatalogTable catalogTable = op.getCatalogTable();
-        Map<String, String> properties = catalogTable.toProperties();
-        Map<String, String> expected = new HashMap<>();
-        expected.put("schema.0.name", "a");
-        expected.put("schema.0.data-type", "INT");
-        expected.put("schema.1.name", "b");
-        expected.put("schema.1.data-type", "BIGINT");
-        expected.put("schema.2.name", "c");
-        expected.put("schema.2.data-type", "TIMESTAMP(3)");
-        expected.put("schema.watermark.0.rowtime", "c");
-        expected.put(
-                "schema.watermark.0.strategy.expr",
-                "`builtin`.`default`.`myfunc`(`c`, 1) - INTERVAL '5' SECOND");
-        expected.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
-        expected.put("connector.type", "kafka");
-        assertEquals(expected, properties);
-    }
-
-    @Test
-    public void testBasicCreateTableLike() {
-        Map<String, String> sourceProperties = new HashMap<>();
-        sourceProperties.put("format.type", "json");
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("f0", DataTypes.INT().notNull())
-                                .column("f1", DataTypes.TIMESTAMP(3))
-                                .build(),
-                        null,
-                        Collections.emptyList(),
-                        sourceProperties);
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-
-        final String sql =
-                "create table derivedTable(\n"
-                        + "  a int,\n"
-                        + "  watermark for f1 as `f1` - interval '5' second\n"
-                        + ")\n"
-                        + "PARTITIONED BY (a, f0)\n"
-                        + "with (\n"
-                        + "  'connector.type' = 'kafka'"
-                        + ")\n"
-                        + "like sourceTable";
-        Operation operation = parseAndConvert(sql);
-
-        assertThat(
-                operation,
-                isCreateTableOperation(
-                        withSchema(
-                                Schema.newBuilder()
-                                        .column("f0", DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                        .column("a", DataTypes.INT())
-                                        .watermark("f1", "`f1` - INTERVAL '5' SECOND")
-                                        .build()),
-                        withOptions(entry("connector.type", "kafka"), entry("format.type", "json")),
-                        partitionedBy("a", "f0")));
-    }
-
-    @Test
-    public void testCreateTableLikeWithFullPath() {
-        Map<String, String> sourceProperties = new HashMap<>();
-        sourceProperties.put("connector.type", "kafka");
-        sourceProperties.put("format.type", "json");
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("f0", DataTypes.INT().notNull())
-                                .column("f1", DataTypes.TIMESTAMP(3))
-                                .build(),
-                        null,
-                        Collections.emptyList(),
-                        sourceProperties);
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-        final String sql = "create table mytable like `builtin`.`default`.sourceTable";
-        Operation operation = parseAndConvert(sql);
-
-        assertThat(
-                operation,
-                isCreateTableOperation(
-                        withSchema(
-                                Schema.newBuilder()
-                                        .column("f0", DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                        .build()),
-                        withOptions(
-                                entry("connector.type", "kafka"), entry("format.type", "json"))));
-    }
-
-    @Test
-    public void testMergingCreateTableLike() {
-        Map<String, String> sourceProperties = new HashMap<>();
-        sourceProperties.put("format.type", "json");
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("f0", DataTypes.INT().notNull())
-                                .column("f1", DataTypes.TIMESTAMP(3))
-                                .columnByExpression("f2", "`f0` + 12345")
-                                .watermark("f1", "`f1` - interval '1' second")
-                                .build(),
-                        null,
-                        Arrays.asList("f0", "f1"),
-                        sourceProperties);
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-
-        final String sql =
-                "create table derivedTable(\n"
-                        + "  a int,\n"
-                        + "  watermark for f1 as `f1` - interval '5' second\n"
-                        + ")\n"
-                        + "PARTITIONED BY (a, f0)\n"
-                        + "with (\n"
-                        + "  'connector.type' = 'kafka'"
-                        + ")\n"
-                        + "like sourceTable (\n"
-                        + "   EXCLUDING GENERATED\n"
-                        + "   EXCLUDING PARTITIONS\n"
-                        + "   OVERWRITING OPTIONS\n"
-                        + "   OVERWRITING WATERMARKS"
-                        + ")";
-        Operation operation = parseAndConvert(sql);
-
-        assertThat(
-                operation,
-                isCreateTableOperation(
-                        withSchema(
-                                Schema.newBuilder()
-                                        .column("f0", DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                        .column("a", DataTypes.INT())
-                                        .watermark("f1", "`f1` - INTERVAL '5' SECOND")
-                                        .build()),
-                        withOptions(entry("connector.type", "kafka"), entry("format.type", "json")),
-                        partitionedBy("a", "f0")));
-    }
-
-    @Test
-    public void testCreateTableInvalidPartition() {
-        final String sql =
-                "create table derivedTable(\n" + "  a int\n" + ")\n" + "PARTITIONED BY (f3)";
-
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Partition column 'f3' not defined in the table schema. Available columns: ['a']");
-        parseAndConvert(sql);
-    }
-
-    @Test
-    public void testCreateTableLikeInvalidPartition() {
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder().column("f0", DataTypes.INT().notNull()).build(),
-                        null,
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-
-        final String sql =
-                "create table derivedTable(\n"
-                        + "  a int\n"
-                        + ")\n"
-                        + "PARTITIONED BY (f3)\n"
-                        + "like sourceTable";
-
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Partition column 'f3' not defined in the table schema. Available columns: ['f0', 'a']");
-        parseAndConvert(sql);
-    }
-
-    @Test
-    public void testCreateTableInvalidWatermark() {
-        final String sql =
-                "create table derivedTable(\n"
-                        + "  a int,\n"
-                        + "  watermark for f1 as `f1` - interval '5' second\n"
-                        + ")";
-
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "The rowtime attribute field 'f1' is not defined in the table schema,"
-                        + " at line 3, column 17\n"
-                        + "Available fields: ['a']");
-        parseAndConvert(sql);
-    }
-
-    @Test
-    public void testCreateTableLikeInvalidWatermark() {
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder().column("f0", DataTypes.INT().notNull()).build(),
-                        null,
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-
-        final String sql =
-                "create table derivedTable(\n"
-                        + "  a int,\n"
-                        + "  watermark for f1 as `f1` - interval '5' second\n"
-                        + ")\n"
-                        + "like sourceTable";
-
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "The rowtime attribute field 'f1' is not defined in the table schema,"
-                        + " at line 3, column 17\n"
-                        + "Available fields: ['f0', 'a']");
-        parseAndConvert(sql);
-    }
-
-    @Test
-    public void testCreateTableLikeNestedWatermark() {
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("f0", DataTypes.INT().notNull())
-                                .column(
-                                        "f1",
-                                        DataTypes.ROW(
-                                                DataTypes.FIELD("tmstmp", DataTypes.TIMESTAMP(3))))
-                                .build(),
-                        null,
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-
-        final String sql =
-                "create table derivedTable(\n"
-                        + "  a int,\n"
-                        + "  watermark for f1.t as f1.t - interval '5' second\n"
-                        + ")\n"
-                        + "like sourceTable";
-
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "The rowtime attribute field 'f1.t' is not defined in the table schema,"
-                        + " at line 3, column 20\n"
-                        + "Nested field 't' was not found in a composite type:"
-                        + " ROW<`tmstmp` TIMESTAMP(3)>.");
-        parseAndConvert(sql);
-    }
-
-    @Test
-    public void testSqlInsertWithStaticPartition() {
-        final String sql = "insert into t1 partition(a=1) select b, c, d from t2";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assert operation instanceof CatalogSinkModifyOperation;
-        CatalogSinkModifyOperation sinkModifyOperation = (CatalogSinkModifyOperation) operation;
-        final Map<String, String> expectedStaticPartitions = new HashMap<>();
-        expectedStaticPartitions.put("a", "1");
-        assertEquals(expectedStaticPartitions, sinkModifyOperation.getStaticPartitions());
-    }
-
-    @Test
-    public void testSqlInsertWithDynamicTableOptions() {
-        final String sql =
-                "insert into t1 /*+ OPTIONS('k1'='v1', 'k2'='v2') */\n"
-                        + "select a, b, c, d from t2";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assert operation instanceof CatalogSinkModifyOperation;
-        CatalogSinkModifyOperation sinkModifyOperation = (CatalogSinkModifyOperation) operation;
-        Map<String, String> dynamicOptions = sinkModifyOperation.getDynamicOptions();
-        assertNotNull(dynamicOptions);
-        assertThat(dynamicOptions.size(), is(2));
-        assertThat(dynamicOptions.toString(), is("{k1=v1, k2=v2}"));
-    }
-
-    @Test
-    public void testDynamicTableWithInvalidOptions() {
-        final String sql = "select * from t1 /*+ OPTIONS('opt1', 'opt2') */";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        thrown.expect(AssertionError.class);
-        thrown.expectMessage("Hint [OPTIONS] only support " + "non empty key value options");
-        parse(sql, planner, parser);
-    }
-
-    @Test // TODO: tweak the tests when FLINK-13604 is fixed.
-    public void testCreateTableWithFullDataTypes() {
-        final List<TestItem> testItems =
-                Arrays.asList(
-                        createTestItem("CHAR", DataTypes.CHAR(1)),
-                        createTestItem("CHAR NOT NULL", DataTypes.CHAR(1).notNull()),
-                        createTestItem("CHAR NULL", DataTypes.CHAR(1)),
-                        createTestItem("CHAR(33)", DataTypes.CHAR(33)),
-                        createTestItem("VARCHAR", DataTypes.STRING()),
-                        createTestItem("VARCHAR(33)", DataTypes.VARCHAR(33)),
-                        createTestItem("STRING", DataTypes.STRING()),
-                        createTestItem("BOOLEAN", DataTypes.BOOLEAN()),
-                        createTestItem("BINARY", DataTypes.BINARY(1)),
-                        createTestItem("BINARY(33)", DataTypes.BINARY(33)),
-                        createTestItem("VARBINARY", DataTypes.BYTES()),
-                        createTestItem("VARBINARY(33)", DataTypes.VARBINARY(33)),
-                        createTestItem("BYTES", DataTypes.BYTES()),
-                        createTestItem("DECIMAL", DataTypes.DECIMAL(10, 0)),
-                        createTestItem("DEC", DataTypes.DECIMAL(10, 0)),
-                        createTestItem("NUMERIC", DataTypes.DECIMAL(10, 0)),
-                        createTestItem("DECIMAL(10)", DataTypes.DECIMAL(10, 0)),
-                        createTestItem("DEC(10)", DataTypes.DECIMAL(10, 0)),
-                        createTestItem("NUMERIC(10)", DataTypes.DECIMAL(10, 0)),
-                        createTestItem("DECIMAL(10, 3)", DataTypes.DECIMAL(10, 3)),
-                        createTestItem("DEC(10, 3)", DataTypes.DECIMAL(10, 3)),
-                        createTestItem("NUMERIC(10, 3)", DataTypes.DECIMAL(10, 3)),
-                        createTestItem("TINYINT", DataTypes.TINYINT()),
-                        createTestItem("SMALLINT", DataTypes.SMALLINT()),
-                        createTestItem("INTEGER", DataTypes.INT()),
-                        createTestItem("INT", DataTypes.INT()),
-                        createTestItem("BIGINT", DataTypes.BIGINT()),
-                        createTestItem("FLOAT", DataTypes.FLOAT()),
-                        createTestItem("DOUBLE", DataTypes.DOUBLE()),
-                        createTestItem("DOUBLE PRECISION", DataTypes.DOUBLE()),
-                        createTestItem("DATE", DataTypes.DATE()),
-                        createTestItem("TIME", DataTypes.TIME()),
-                        createTestItem("TIME WITHOUT TIME ZONE", DataTypes.TIME()),
-                        // Expect to be TIME(3).
-                        createTestItem("TIME(3)", DataTypes.TIME()),
-                        // Expect to be TIME(3).
-                        createTestItem("TIME(3) WITHOUT TIME ZONE", DataTypes.TIME()),
-                        createTestItem("TIMESTAMP", DataTypes.TIMESTAMP(6)),
-                        createTestItem("TIMESTAMP WITHOUT TIME ZONE", DataTypes.TIMESTAMP(6)),
-                        createTestItem("TIMESTAMP(3)", DataTypes.TIMESTAMP(3)),
-                        createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE", DataTypes.TIMESTAMP(3)),
-                        createTestItem(
-                                "TIMESTAMP WITH LOCAL TIME ZONE",
-                                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)),
-                        createTestItem(
-                                "TIMESTAMP(3) WITH LOCAL TIME ZONE",
-                                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
-                        createTestItem(
-                                "ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>",
-                                DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))),
-                        createTestItem(
-                                "ARRAY<INT NOT NULL>", DataTypes.ARRAY(DataTypes.INT().notNull())),
-                        createTestItem("INT ARRAY", DataTypes.ARRAY(DataTypes.INT())),
-                        createTestItem(
-                                "INT NOT NULL ARRAY", DataTypes.ARRAY(DataTypes.INT().notNull())),
-                        createTestItem(
-                                "INT ARRAY NOT NULL", DataTypes.ARRAY(DataTypes.INT()).notNull()),
-                        createTestItem(
-                                "MULTISET<INT NOT NULL>",
-                                DataTypes.MULTISET(DataTypes.INT().notNull())),
-                        createTestItem("INT MULTISET", DataTypes.MULTISET(DataTypes.INT())),
-                        createTestItem(
-                                "INT NOT NULL MULTISET",
-                                DataTypes.MULTISET(DataTypes.INT().notNull())),
-                        createTestItem(
-                                "INT MULTISET NOT NULL",
-                                DataTypes.MULTISET(DataTypes.INT()).notNull()),
-                        createTestItem(
-                                "MAP<BIGINT, BOOLEAN>",
-                                DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BOOLEAN())),
-                        // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
-                        createTestItem(
-                                "ROW<f0 INT NOT NULL, f1 BOOLEAN>",
-                                DataTypes.ROW(
-                                        DataTypes.FIELD("f0", DataTypes.INT()),
-                                        DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
-                        // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
-                        createTestItem(
-                                "ROW(f0 INT NOT NULL, f1 BOOLEAN)",
-                                DataTypes.ROW(
-                                        DataTypes.FIELD("f0", DataTypes.INT()),
-                                        DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
-                        createTestItem(
-                                "ROW<`f0` INT>",
-                                DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
-                        createTestItem(
-                                "ROW(`f0` INT)",
-                                DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
-                        createTestItem("ROW<>", DataTypes.ROW()),
-                        createTestItem("ROW()", DataTypes.ROW()),
-                        // Expect to be ROW<`f0` INT NOT NULL '...', `f1` BOOLEAN '...'>.
-                        createTestItem(
-                                "ROW<f0 INT NOT NULL 'This is a comment.',"
-                                        + " f1 BOOLEAN 'This as well.'>",
-                                DataTypes.ROW(
-                                        DataTypes.FIELD("f0", DataTypes.INT()),
-                                        DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
-                        createTestItem(
-                                "ARRAY<ROW<f0 INT, f1 BOOLEAN>>",
-                                DataTypes.ARRAY(
-                                        DataTypes.ROW(
-                                                DataTypes.FIELD("f0", DataTypes.INT()),
-                                                DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
-                        createTestItem(
-                                "ROW<f0 INT, f1 BOOLEAN> MULTISET",
-                                DataTypes.MULTISET(
-                                        DataTypes.ROW(
-                                                DataTypes.FIELD("f0", DataTypes.INT()),
-                                                DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
-                        createTestItem(
-                                "MULTISET<ROW<f0 INT, f1 BOOLEAN>>",
-                                DataTypes.MULTISET(
-                                        DataTypes.ROW(
-                                                DataTypes.FIELD("f0", DataTypes.INT()),
-                                                DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
-                        createTestItem(
-                                "ROW<f0 Row<f00 INT, f01 BOOLEAN>, "
-                                        + "f1 INT ARRAY, "
-                                        + "f2 BOOLEAN MULTISET>",
-                                DataTypes.ROW(
-                                        DataTypes.FIELD(
-                                                "f0",
-                                                DataTypes.ROW(
-                                                        DataTypes.FIELD("f00", DataTypes.INT()),
-                                                        DataTypes.FIELD(
-                                                                "f01", DataTypes.BOOLEAN()))),
-                                        DataTypes.FIELD("f1", DataTypes.ARRAY(DataTypes.INT())),
-                                        DataTypes.FIELD(
-                                                "f2", DataTypes.MULTISET(DataTypes.BOOLEAN())))));
-        StringBuilder buffer = new StringBuilder("create table t1(\n");
-        for (int i = 0; i < testItems.size(); i++) {
-            buffer.append("f").append(i).append(" ").append(testItems.get(i).testExpr);
-            if (i == testItems.size() - 1) {
-                buffer.append(")");
-            } else {
-                buffer.append(",\n");
-            }
-        }
-        final String sql = buffer.toString();
-        final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        SqlNode node = parser.parse(sql);
-        assert node instanceof SqlCreateTable;
-        Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
-        TableSchema schema = ((CreateTableOperation) operation).getCatalogTable().getSchema();
-        Object[] expectedDataTypes = testItems.stream().map(item -> item.expectedType).toArray();
-        assertArrayEquals(expectedDataTypes, schema.getFieldDataTypes());
-    }
-
-    @Test
-    public void testCreateTableWithComputedColumn() {
-        final String sql =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a int,\n"
-                        + "  b varchar, \n"
-                        + "  c as a - 1, \n"
-                        + "  d as b || '$$', \n"
-                        + "  e as my_udf1(a),"
-                        + "  f as `default`.my_udf2(a) + 1,"
-                        + "  g as builtin.`default`.my_udf3(a) || '##'\n"
-                        + ")\n"
-                        + "  with (\n"
-                        + "    'connector' = 'kafka', \n"
-                        + "    'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-        functionCatalog.registerTempCatalogScalarFunction(
-                ObjectIdentifier.of("builtin", "default", "my_udf1"), Func0$.MODULE$);
-        functionCatalog.registerTempCatalogScalarFunction(
-                ObjectIdentifier.of("builtin", "default", "my_udf2"), Func1$.MODULE$);
-        functionCatalog.registerTempCatalogScalarFunction(
-                ObjectIdentifier.of("builtin", "default", "my_udf3"), Func8$.MODULE$);
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, getParserBySqlDialect(SqlDialect.DEFAULT));
-        assert operation instanceof CreateTableOperation;
-        CreateTableOperation op = (CreateTableOperation) operation;
-        CatalogTable catalogTable = op.getCatalogTable();
-        assertArrayEquals(
-                new String[] {"a", "b", "c", "d", "e", "f", "g"},
-                catalogTable.getSchema().getFieldNames());
-        assertArrayEquals(
-                new DataType[] {
-                    DataTypes.INT(),
-                    DataTypes.STRING(),
-                    DataTypes.INT(),
-                    DataTypes.STRING(),
-                    DataTypes.INT().notNull(),
-                    DataTypes.INT(),
-                    DataTypes.STRING()
-                },
-                catalogTable.getSchema().getFieldDataTypes());
-        String[] columnExpressions =
-                catalogTable.getSchema().getTableColumns().stream()
-                        .filter(ComputedColumn.class::isInstance)
-                        .map(ComputedColumn.class::cast)
-                        .map(ComputedColumn::getExpression)
-                        .toArray(String[]::new);
-        String[] expected =
-                new String[] {
-                    "`a` - 1",
-                    "`b` || '$$'",
-                    "`builtin`.`default`.`my_udf1`(`a`)",
-                    "`builtin`.`default`.`my_udf2`(`a`) + 1",
-                    "`builtin`.`default`.`my_udf3`(`a`) || '##'"
-                };
-        assertArrayEquals(expected, columnExpressions);
-    }
-
-    @Test
-    public void testCreateTableWithMetadataColumn() {
-        final String sql =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a INT,\n"
-                        + "  b STRING,\n"
-                        + "  c INT METADATA,\n"
-                        + "  d INT METADATA FROM 'other.key',\n"
-                        + "  e INT METADATA VIRTUAL\n"
-                        + ")\n"
-                        + "  WITH (\n"
-                        + "    'connector' = 'kafka',\n"
-                        + "    'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-
-        final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final Operation operation = parse(sql, planner, getParserBySqlDialect(SqlDialect.DEFAULT));
-        assert operation instanceof CreateTableOperation;
-        final CreateTableOperation op = (CreateTableOperation) operation;
-        final TableSchema actualSchema = op.getCatalogTable().getSchema();
-
-        final TableSchema expectedSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("a", DataTypes.INT()))
-                        .add(TableColumn.physical("b", DataTypes.STRING()))
-                        .add(TableColumn.metadata("c", DataTypes.INT()))
-                        .add(TableColumn.metadata("d", DataTypes.INT(), "other.key"))
-                        .add(TableColumn.metadata("e", DataTypes.INT(), true))
-                        .build();
-
-        assertEquals(expectedSchema, actualSchema);
-    }
-
-    @Test
-    public void testAlterTable() throws Exception {
-        Catalog catalog = new GenericInMemoryCatalog("default", "default");
-        catalogManager.registerCatalog("cat1", catalog);
-        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder().column("a", DataTypes.STRING()).build(),
-                        "tb1",
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.setCurrentCatalog("cat1");
-        catalogManager.setCurrentDatabase("db1");
-        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
-        final String[] renameTableSqls =
-                new String[] {
-                    "alter table cat1.db1.tb1 rename to tb2",
-                    "alter table db1.tb1 rename to tb2",
-                    "alter table tb1 rename to cat1.db1.tb2",
-                };
-        final ObjectIdentifier expectedIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
-        final ObjectIdentifier expectedNewIdentifier = ObjectIdentifier.of("cat1", "db1", "tb2");
-        // test rename table converter
-        for (int i = 0; i < renameTableSqls.length; i++) {
-            Operation operation = parse(renameTableSqls[i], SqlDialect.DEFAULT);
-            assert operation instanceof AlterTableRenameOperation;
-            final AlterTableRenameOperation alterTableRenameOperation =
-                    (AlterTableRenameOperation) operation;
-            assertEquals(expectedIdentifier, alterTableRenameOperation.getTableIdentifier());
-            assertEquals(expectedNewIdentifier, alterTableRenameOperation.getNewTableIdentifier());
-        }
-        // test alter table options
-        Operation operation =
-                parse(
-                        "alter table cat1.db1.tb1 set ('k1' = 'v1', 'K2' = 'V2')",
-                        SqlDialect.DEFAULT);
-        assert operation instanceof AlterTableOptionsOperation;
-        final AlterTableOptionsOperation alterTableOptionsOperation =
-                (AlterTableOptionsOperation) operation;
-        assertEquals(expectedIdentifier, alterTableOptionsOperation.getTableIdentifier());
-        assertEquals(2, alterTableOptionsOperation.getCatalogTable().getOptions().size());
-        Map<String, String> options = new HashMap<>();
-        options.put("k1", "v1");
-        options.put("K2", "V2");
-        assertEquals(options, alterTableOptionsOperation.getCatalogTable().getOptions());
-    }
-
-    @Test
-    public void testAlterTableAddPkConstraint() throws Exception {
-        Catalog catalog = new GenericInMemoryCatalog("default", "default");
-        catalogManager.registerCatalog("cat1", catalog);
-        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.STRING().notNull())
-                                .column("b", DataTypes.BIGINT().notNull())
-                                .column("c", DataTypes.BIGINT())
-                                .build(),
-                        "tb1",
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.setCurrentCatalog("cat1");
-        catalogManager.setCurrentDatabase("db1");
-        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
-        // Test alter add table constraint.
-        Operation operation =
-                parse(
-                        "alter table tb1 add constraint ct1 primary key(a, b) not enforced",
-                        SqlDialect.DEFAULT);
-        assert operation instanceof AlterTableAddConstraintOperation;
-        AlterTableAddConstraintOperation addConstraintOperation =
-                (AlterTableAddConstraintOperation) operation;
-        assertThat(
-                addConstraintOperation.asSummaryString(),
-                is(
-                        "ALTER TABLE ADD CONSTRAINT: (identifier: [`cat1`.`db1`.`tb1`], "
-                                + "constraintName: [ct1], columns: [a, b])"));
-        // Test alter table add pk on nullable column
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage("Could not create a PRIMARY KEY 'ct1'. Column 'c' is nullable.");
-        parse("alter table tb1 add constraint ct1 primary key(c) not enforced", SqlDialect.DEFAULT);
-    }
-
-    @Test
-    public void testAlterTableAddPkConstraintEnforced() throws Exception {
-        Catalog catalog = new GenericInMemoryCatalog("default", "default");
-        catalogManager.registerCatalog("cat1", catalog);
-        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.STRING().notNull())
-                                .column("b", DataTypes.BIGINT().notNull())
-                                .column("c", DataTypes.BIGINT())
-                                .build(),
-                        "tb1",
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.setCurrentCatalog("cat1");
-        catalogManager.setCurrentDatabase("db1");
-        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
-        // Test alter table add enforced
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Flink doesn't support ENFORCED mode for PRIMARY KEY constaint. "
-                        + "ENFORCED/NOT ENFORCED  controls if the constraint checks are performed on the "
-                        + "incoming/outgoing data. Flink does not own the data therefore the "
-                        + "only supported mode is the NOT ENFORCED mode");
-        parse("alter table tb1 add constraint ct1 primary key(a, b)", SqlDialect.DEFAULT);
-    }
-
-    @Test
-    public void testAlterTableAddUniqueConstraint() throws Exception {
-        Catalog catalog = new GenericInMemoryCatalog("default", "default");
-        catalogManager.registerCatalog("cat1", catalog);
-        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.STRING().notNull())
-                                .column("b", DataTypes.BIGINT().notNull())
-                                .build(),
-                        "tb1",
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.setCurrentCatalog("cat1");
-        catalogManager.setCurrentDatabase("db1");
-        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
-        // Test alter add table constraint.
-        thrown.expect(UnsupportedOperationException.class);
-        thrown.expectMessage("UNIQUE constraint is not supported yet");
-        parse("alter table tb1 add constraint ct1 unique(a, b) not enforced", SqlDialect.DEFAULT);
-    }
-
-    @Test
-    public void testAlterTableRenameColumn() throws Exception {
-        Catalog catalog = new GenericInMemoryCatalog("default", "default");
-        catalogManager.registerCatalog("cat1", catalog);
-        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.STRING().notNull())
-                                .column("b", DataTypes.BIGINT().notNull())
-                                .column("c", DataTypes.TIMESTAMP(3).notNull())
-                                .watermark("c", "c - INTERVAL '5' SECOND")
-                                .primaryKey("b")
-                                .build(),
-                        "tb1",
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.setCurrentCatalog("cat1");
-        catalogManager.setCurrentDatabase("db1");
-        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
-        // Test alter table rename
-        Operation operation = parse("alter table tb1 rename c to c1", SqlDialect.DEFAULT);
-        assert operation instanceof AlterTableSchemaOperation;
-        Schema unresolvedSchema =
-                ((AlterTableSchemaOperation) operation).getCatalogTable().getUnresolvedSchema();
-        String[] newColumns =
-                unresolvedSchema.getColumns().stream()
-                        .map(Schema.UnresolvedColumn::getName)
-                        .toArray(String[]::new);
-        // test rename column name
-        assertArrayEquals(newColumns, new String[] {"a", "b", "c1"});
-        // test watermark expression
-        Schema.UnresolvedWatermarkSpec unresolvedWatermarkSpec =
-                unresolvedSchema.getWatermarkSpecs().get(0);
-        assertThat(unresolvedWatermarkSpec.getColumnName(), is("c1"));
-
-        String expression =
-                ((SqlCallExpression) unresolvedWatermarkSpec.getWatermarkExpression())
-                        .getSqlExpression();
-        assertThat(expression, is("c1 - INTERVAL '5' SECOND"));
-    }
-
-    @Test
-    public void testAlterTableAddUniqueConstraintEnforced() throws Exception {
-        Catalog catalog = new GenericInMemoryCatalog("default", "default");
-        catalogManager.registerCatalog("cat1", catalog);
-        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.STRING().notNull())
-                                .column("b", DataTypes.BIGINT().notNull())
-                                .column("c", DataTypes.BIGINT())
-                                .build(),
-                        "tb1",
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.setCurrentCatalog("cat1");
-        catalogManager.setCurrentDatabase("db1");
-        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
-        // Test alter table add enforced
-        thrown.expect(UnsupportedOperationException.class);
-        thrown.expectMessage("UNIQUE constraint is not supported yet");
-        parse("alter table tb1 add constraint ct1 unique(a, b)", SqlDialect.DEFAULT);
-    }
-
-    @Test
-    public void testAlterTableDropConstraint() throws Exception {
-        Catalog catalog = new GenericInMemoryCatalog("default", "default");
-        catalogManager.registerCatalog("cat1", catalog);
-        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.STRING().notNull())
-                                .column("b", DataTypes.BIGINT().notNull())
-                                .column("c", DataTypes.BIGINT())
-                                .primaryKeyNamed("ct1", "a", "b")
-                                .build(),
-                        "tb1",
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.setCurrentCatalog("cat1");
-        catalogManager.setCurrentDatabase("db1");
-        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
-        // Test alter table add enforced
-        Operation operation = parse("alter table tb1 drop constraint ct1", SqlDialect.DEFAULT);
-        assert operation instanceof AlterTableDropConstraintOperation;
-        AlterTableDropConstraintOperation dropConstraint =
-                (AlterTableDropConstraintOperation) operation;
-        assertThat(
-                dropConstraint.asSummaryString(),
-                is("ALTER TABLE `cat1`.`db1`.`tb1` DROP CONSTRAINT ct1"));
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage("CONSTRAINT [ct2] does not exist");
-        parse("alter table tb1 drop constraint ct2", SqlDialect.DEFAULT);
-    }
-
-    @Test
-    public void testCreateViewWithMatchRecognize() {
-        Map<String, String> prop = new HashMap<>();
-        prop.put("connector", "values");
-        prop.put("bounded", "true");
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("id", DataTypes.INT().notNull())
-                                .column("measurement", DataTypes.BIGINT().notNull())
-                                .column(
-                                        "ts",
-                                        DataTypes.ROW(
-                                                DataTypes.FIELD("tmstmp", DataTypes.TIMESTAMP(3))))
-                                .build(),
-                        null,
-                        Collections.emptyList(),
-                        prop);
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "events"), false);
-
-        final String sql =
-                ""
-                        + "CREATE TEMPORARY VIEW foo AS "
-                        + "SELECT * "
-                        + "FROM events MATCH_RECOGNIZE ("
-                        + "    PARTITION BY id "
-                        + "    ORDER BY ts ASC "
-                        + "    MEASURES "
-                        + "      next_step.measurement - this_step.measurement AS diff "
-                        + "    AFTER MATCH SKIP TO NEXT ROW "
-                        + "    PATTERN (this_step next_step)"
-                        + "    DEFINE "
-                        + "         this_step AS TRUE,"
-                        + "         next_step AS TRUE"
-                        + ")";
-
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assertThat(operation, instanceOf(CreateViewOperation.class));
-    }
-
-    @Test
-    public void testCreateViewWithDynamicTableOptions() {
-        tableConfig
-                .getConfiguration()
-                .setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
-        Map<String, String> prop = new HashMap<>();
-        prop.put("connector", "values");
-        prop.put("bounded", "true");
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("f0", DataTypes.INT())
-                                .column("f1", DataTypes.VARCHAR(20))
-                                .build(),
-                        null,
-                        Collections.emptyList(),
-                        prop);
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceA"), false);
-
-        final String sql =
-                ""
-                        + "create view test_view as\n"
-                        + "select *\n"
-                        + "from sourceA /*+ OPTIONS('changelog-mode'='I') */";
-
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assertThat(operation, instanceOf(CreateViewOperation.class));
-    }
-
-    @Test
-    public void testBeginStatementSet() {
-        final String sql = "BEGIN STATEMENT SET";
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof BeginStatementSetOperation;
-        final BeginStatementSetOperation beginStatementSetOperation =
-                (BeginStatementSetOperation) operation;
-
-        assertEquals("BEGIN STATEMENT SET", beginStatementSetOperation.asSummaryString());
-    }
-
-    @Test
-    public void testEnd() {
-        final String sql = "END";
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof EndStatementSetOperation;
-        final EndStatementSetOperation endStatementSetOperation =
-                (EndStatementSetOperation) operation;
-
-        assertEquals("END", endStatementSetOperation.asSummaryString());
-    }
-
-    // ~ Tool Methods ----------------------------------------------------------
-
-    private static TestItem createTestItem(Object... args) {
-        assert args.length == 2;
-        final String testExpr = (String) args[0];
-        TestItem testItem = TestItem.fromTestExpr(testExpr);
-        if (args[1] instanceof String) {
-            testItem.withExpectedError((String) args[1]);
-        } else {
-            testItem.withExpectedType(args[1]);
-        }
-        return testItem;
-    }
-
-    private void assertShowFunctions(
-            String sql, String expectedSummary, FunctionScope expectedScope) {
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof ShowFunctionsOperation;
-        final ShowFunctionsOperation showFunctionsOperation = (ShowFunctionsOperation) operation;
-
-        assertEquals(expectedScope, showFunctionsOperation.getFunctionScope());
-        assertEquals(expectedSummary, showFunctionsOperation.asSummaryString());
-    }
-
-    private Operation parse(String sql, FlinkPlannerImpl planner, CalciteParser parser) {
-        SqlNode node = parser.parse(sql);
-        return SqlToOperationConverter.convert(planner, catalogManager, node).get();
-    }
-
-    private Operation parse(String sql, SqlDialect sqlDialect) {
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(sqlDialect);
-        final CalciteParser parser = getParserBySqlDialect(sqlDialect);
-        SqlNode node = parser.parse(sql);
-        return SqlToOperationConverter.convert(planner, catalogManager, node).get();
-    }
-
-    private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
-        tableConfig.setSqlDialect(sqlDialect);
-        return plannerContext.createFlinkPlanner(
-                catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase());
-    }
-
-    private CalciteParser getParserBySqlDialect(SqlDialect sqlDialect) {
-        tableConfig.setSqlDialect(sqlDialect);
-        return plannerContext.createCalciteParser();
-    }
-
-    // ~ Inner Classes ----------------------------------------------------------
-
-    private static class TestItem {
-        private final String testExpr;
-        @Nullable private Object expectedType;
-        @Nullable private String expectedError;
-
-        private TestItem(String testExpr) {
-            this.testExpr = testExpr;
-        }
-
-        static TestItem fromTestExpr(String testExpr) {
-            return new TestItem(testExpr);
-        }
-
-        TestItem withExpectedType(Object expectedType) {
-            this.expectedType = expectedType;
-            return this;
-        }
-
-        TestItem withExpectedError(String expectedError) {
-            this.expectedError = expectedError;
-            return this;
-        }
-
-        @Override
-        public String toString() {
-            return this.testExpr;
-        }
-    }
-
-    private Operation parseAndConvert(String sql) {
-        final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-
-        SqlNode node = parser.parse(sql);
-        return SqlToOperationConverter.convert(planner, catalogManager, node).get();
-    }
-}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
new file mode 100644
index 00000000000..7b22246de8d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
@@ -0,0 +1,108 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+
+import org.apache.calcite.rex.RexInputRef;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** A finder used to look up referenced column name in a {@link ResolvedExpression}. */
+public class ColumnReferenceFinder {
+
+    private ColumnReferenceFinder() {}
+
+    public static Set<String> findReferencedColumn(
+            ResolvedExpression resolvedExpression, List<String> tableColumns) {
+        ColumnReferenceVisitor visitor = new ColumnReferenceVisitor(tableColumns);
+        visitor.visit(resolvedExpression);
+        return visitor.referencedColumns;
+    }
+
+    private static class ColumnReferenceVisitor extends ExpressionDefaultVisitor<Void> {
+        private final List<String> tableColumns;
+        private final Set<String> referencedColumns;
+
+        public ColumnReferenceVisitor(List<String> tableColumns) {
+            this.tableColumns = tableColumns;
+            this.referencedColumns = new HashSet<>();
+        }
+
+        @Override
+        public Void visit(Expression expression) {
+            if (expression instanceof LocalReferenceExpression) {
+                return visit((LocalReferenceExpression) expression);
+            } else if (expression instanceof FieldReferenceExpression) {
+                return visit((FieldReferenceExpression) expression);
+            } else if (expression instanceof RexNodeExpression) {
+                return visit((RexNodeExpression) expression);
+            } else if (expression instanceof CallExpression) {
+                return visit((CallExpression) expression);
+            } else {
+                return super.visit(expression);
+            }
+        }
+
+        @Override
+        public Void visit(FieldReferenceExpression fieldReference) {
+            referencedColumns.add(fieldReference.getName());
+            return null;
+        }
+
+        public Void visit(LocalReferenceExpression localReference) {
+            referencedColumns.add(localReference.getName());
+            return null;
+        }
+
+        public Void visit(RexNodeExpression rexNode) {
+            // get the referenced column ref in table
+            Set<RexInputRef> inputRefs = FlinkRexUtil.findAllInputRefs(rexNode.getRexNode());
+            // get the referenced column name by index
+            inputRefs.forEach(
+                    inputRef -> {
+                        int index = inputRef.getIndex();
+                        referencedColumns.add(tableColumns.get(index));
+                    });
+            return null;
+        }
+
+        @Override
+        public Void visit(CallExpression call) {
+            for (Expression expression : call.getChildren()) {
+                visit(expression);
+            }
+            return null;
+        }
+
+        @Override
+        protected Void defaultMethod(Expression expression) {
+            throw new TableException("Unexpected expression: " + expression);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
index 24acefd2930..db4cfc1e9cf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.operations;
 
 import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableModify;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableRenameColumn;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
 import org.apache.flink.sql.parser.ddl.SqlWatermark;
@@ -27,9 +28,15 @@ import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
 import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.SchemaResolver;
+import org.apache.flink.table.catalog.WatermarkSpec;
 import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.expressions.ColumnReferenceFinder;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
@@ -45,6 +52,7 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -64,6 +72,8 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDa
  */
 public class AlterSchemaConverter {
 
+    private static final String EX_MSG_PREFIX = "Failed to execute ALTER TABLE statement.\n";
+
     private final SqlValidator sqlValidator;
     private final Function<SqlNode, String> escapeExpression;
     private final Consumer<SqlTableConstraint> constraintValidator;
@@ -104,9 +114,83 @@ public class AlterSchemaConverter {
         return converter.convert();
     }
 
-    private abstract static class SchemaConverter {
+    public Schema applySchemaChange(
+            SqlAlterTableRenameColumn renameColumn, ContextResolvedTable originalTable) {
+        String oldColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
+        String newColumnName = getColumnName(renameColumn.getNewColumnIdentifier());
+        List<String> tableColumns =
+                originalTable.getResolvedSchema().getColumns().stream()
+                        .map(Column::getName)
+                        .collect(Collectors.toList());
+        // validate old column is exists or new column isn't duplicated or old column isn't
+        // referenced by computed column
+        validateColumnName(
+                oldColumnName,
+                newColumnName,
+                tableColumns,
+                originalTable.getResolvedSchema(),
+                ((CatalogTable) originalTable.getResolvedTable()).getPartitionKeys());
+
+        // validate old column isn't referenced by watermark
+        List<WatermarkSpec> watermarkSpecs = originalTable.getResolvedSchema().getWatermarkSpecs();
+        watermarkSpecs.forEach(
+                watermarkSpec -> {
+                    String rowtimeAttribute = watermarkSpec.getRowtimeAttribute();
+                    Set<String> referencedColumns =
+                            ColumnReferenceFinder.findReferencedColumn(
+                                    watermarkSpec.getWatermarkExpression(), tableColumns);
+                    if (oldColumnName.equals(rowtimeAttribute)
+                            || referencedColumns.contains(oldColumnName)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Old column %s is referred by watermark expression %s, "
+                                                + "currently doesn't allow to rename column which is "
+                                                + "referred by watermark expression.",
+                                        oldColumnName, watermarkSpec.asSummaryString()));
+                    }
+                });
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        Schema originSchema = originalTable.getTable().getUnresolvedSchema();
+        originSchema
+                .getColumns()
+                .forEach(
+                        column -> {
+                            if (oldColumnName.equals(column.getName())) {
+                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                            } else {
+                                builder.fromColumns(Collections.singletonList(column));
+                            }
+                        });
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames();
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> newPrimaryKeyNames =
+                    originPrimaryKeyNames.stream()
+                            .map(pkName -> pkName.equals(oldColumnName) ? newColumnName : pkName)
+                            .collect(Collectors.toList());
+            builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
+        }
+
+        // build watermark
+        originSchema
+                .getWatermarkSpecs()
+                .forEach(
+                        watermarkSpec ->
+                                builder.watermark(
+                                        watermarkSpec.getColumnName(),
+                                        watermarkSpec.getWatermarkExpression()));
+
+        // generate new schema
+        return builder.build();
+    }
+
+    // --------------------------------------------------------------------------------------------
 
-        static final String EX_MSG_PREFIX = "Failed to execute ALTER TABLE statement.\n";
+    private abstract static class SchemaConverter {
 
         List<String> sortedColumnNames = new ArrayList<>();
         Set<String> alterColNames = new HashSet<>();
@@ -278,13 +362,7 @@ public class AlterSchemaConverter {
             for (SqlNode alterColumn : alterColumns) {
                 SqlTableColumnPosition columnPosition = (SqlTableColumnPosition) alterColumn;
                 SqlTableColumn column = columnPosition.getColumn();
-                if (!column.getName().isSimple()) {
-                    throw new UnsupportedOperationException(
-                            String.format(
-                                    "%sAlter nested row type is not supported yet.",
-                                    EX_MSG_PREFIX));
-                }
-                String columnName = column.getName().getSimple();
+                String columnName = getColumnName(column.getName());
                 if (!alterColNames.add(columnName)) {
                     throw new ValidationException(
                             String.format(
@@ -493,6 +571,87 @@ public class AlterSchemaConverter {
         }
     }
 
+    // --------------------------------------------------------------------------------------------
+
+    private void validateColumnName(
+            String originColumnName,
+            String newColumnName,
+            List<String> tableColumns,
+            ResolvedSchema originResolvedSchema,
+            List<String> partitionKeys) {
+        // validate old column
+        if (!tableColumns.contains(originColumnName)) {
+            throw new ValidationException(
+                    String.format(
+                            "Old column %s not found in table schema for RENAME COLUMN",
+                            originColumnName));
+        }
+
+        // validate new column
+        if (tableColumns.contains(newColumnName)) {
+            throw new ValidationException(
+                    String.format(
+                            "New column %s already existed in table schema for RENAME COLUMN",
+                            newColumnName));
+        }
+
+        // validate old column name isn't referred by computed column case
+        originResolvedSchema.getColumns().stream()
+                .filter(column -> column instanceof Column.ComputedColumn)
+                .forEach(
+                        column -> {
+                            Column.ComputedColumn computedColumn = (Column.ComputedColumn) column;
+                            Set<String> referencedColumn =
+                                    ColumnReferenceFinder.findReferencedColumn(
+                                            computedColumn.getExpression(), tableColumns);
+                            if (referencedColumn.contains(originColumnName)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Old column %s is referred by computed column %s, currently doesn't "
+                                                        + "allow to rename column which is referred by computed column.",
+                                                originColumnName,
+                                                computedColumn.asSummaryString()));
+                            }
+                        });
+        // validate partition keys doesn't contain the old column
+        if (partitionKeys.contains(originColumnName)) {
+            throw new ValidationException(
+                    String.format(
+                            "Can not rename column %s because it is used as the partition keys.",
+                            originColumnName));
+        }
+    }
+
+    private void buildNewColumnFromOriginColumn(
+            Schema.Builder builder, Schema.UnresolvedColumn originColumn, String columnName) {
+        if (originColumn instanceof Schema.UnresolvedComputedColumn) {
+            builder.columnByExpression(
+                    columnName, ((Schema.UnresolvedComputedColumn) originColumn).getExpression());
+        } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
+            builder.column(
+                    columnName, ((Schema.UnresolvedPhysicalColumn) originColumn).getDataType());
+        } else if (originColumn instanceof Schema.UnresolvedMetadataColumn) {
+            Schema.UnresolvedMetadataColumn metadataColumn =
+                    (Schema.UnresolvedMetadataColumn) originColumn;
+            builder.columnByMetadata(
+                    columnName,
+                    metadataColumn.getDataType(),
+                    metadataColumn.getMetadataKey(),
+                    metadataColumn.isVirtual());
+        }
+        originColumn.getComment().ifPresent(builder::withComment);
+    }
+
+    private static String getColumnName(SqlIdentifier identifier) {
+        if (!identifier.isSimple()) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "%sAlter nested row type %s is not supported yet.",
+                            EX_MSG_PREFIX, identifier));
+        }
+        return identifier.getSimple();
+    }
+
     private AlterSchemaStrategy computeAlterSchemaStrategy(SqlAlterTableSchema alterTableSchema) {
         if (alterTableSchema instanceof SqlAlterTableAdd) {
             return AlterSchemaStrategy.ADD;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 237977c049c..3b4b196e991 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -28,6 +28,7 @@ import org.apache.flink.sql.parser.ddl.SqlAlterTableCompact;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableOptions;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableRename;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableRenameColumn;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableReset;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
 import org.apache.flink.sql.parser.ddl.SqlAlterView;
@@ -531,6 +532,20 @@ public class SqlToOperationConverter {
                     (SqlChangeColumn) sqlAlterTable,
                     (CatalogTable) baseTable,
                     flinkPlanner.getOrCreateSqlValidator());
+        } else if (sqlAlterTable instanceof SqlAlterTableRenameColumn) {
+            SqlAlterTableRenameColumn sqlAlterTableRenameColumn =
+                    (SqlAlterTableRenameColumn) sqlAlterTable;
+            Schema newSchema =
+                    alterSchemaConverter.applySchemaChange(
+                            sqlAlterTableRenameColumn, optionalCatalogTable.get());
+            CatalogTable baseCatalogTable = (CatalogTable) baseTable;
+            return new AlterTableSchemaOperation(
+                    tableIdentifier,
+                    CatalogTable.of(
+                            newSchema,
+                            baseCatalogTable.getComment(),
+                            baseCatalogTable.getPartitionKeys(),
+                            baseCatalogTable.getOptions()));
         } else if (sqlAlterTable instanceof SqlAddPartitions) {
             List<CatalogPartitionSpec> specs = new ArrayList<>();
             List<CatalogPartition> partitions = new ArrayList<>();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index 1f79acbb0b3..6b663e70ff9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -23,7 +23,6 @@ import org.apache.flink.sql.parser.ddl.SqlChangeColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableOption;
-import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
@@ -32,8 +31,6 @@ import org.apache.flink.table.api.WatermarkSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
@@ -41,12 +38,10 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.utils.TypeConversions;
 
-import com.google.common.collect.Lists;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.validate.SqlValidator;
-import org.apache.commons.lang3.StringUtils;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -67,12 +62,10 @@ public class OperationConverterUtils {
             CatalogTable catalogTable,
             SqlValidator sqlValidator) {
         // This is only used by the Hive dialect at the moment. In Hive, only non-partition columns
-        // can be
-        // added/replaced and users will only define non-partition columns in the new column list.
-        // Therefore, we require
-        // that partitions columns must appear last in the schema (which is inline with Hive).
-        // Otherwise, we won't be
-        // able to determine the column positions after the non-partition columns are replaced.
+        // can be added/replaced and users will only define non-partition columns in the new column
+        // list. Therefore, we require that partitions columns must appear last in the schema (which
+        // is inline with Hive). Otherwise, we won't be able to determine the column positions after
+        // the non-partition columns are replaced.
         TableSchema oldSchema = catalogTable.getSchema();
         int numPartCol = catalogTable.getPartitionKeys().size();
         Set<String> lastCols =
@@ -149,116 +142,6 @@ public class OperationConverterUtils {
         // TODO: handle watermark and constraints
     }
 
-    public static Operation convertRenameColumn(
-            ObjectIdentifier tableIdentifier,
-            String originColumnName,
-            String newColumnName,
-            CatalogTable catalogTable) {
-
-        Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();
-        validateColumnName(originColumnName, newColumnName, modifiedTableSchema);
-
-        Schema.Builder builder = Schema.newBuilder();
-        // build column
-        modifiedTableSchema.getColumns().stream()
-                .forEach(
-                        column -> {
-                            if (StringUtils.equals(column.getName(), originColumnName)) {
-                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
-                            } else {
-                                buildNewColumnFromOriginColumn(builder, column, column.getName());
-                            }
-                        });
-        // build primary key column
-        List<String> originPrimaryKeyNames =
-                modifiedTableSchema
-                        .getPrimaryKey()
-                        .map(Schema.UnresolvedPrimaryKey::getColumnNames)
-                        .orElseGet(Lists::newArrayList);
-
-        List<String> newPrimaryKeyNames =
-                originPrimaryKeyNames.stream()
-                        .map(
-                                pkName ->
-                                        StringUtils.equals(pkName, originColumnName)
-                                                ? newColumnName
-                                                : pkName)
-                        .collect(Collectors.toList());
-
-        if (newPrimaryKeyNames.size() > 0) {
-            builder.primaryKey(newPrimaryKeyNames);
-        }
-        // build watermark
-        modifiedTableSchema.getWatermarkSpecs().stream()
-                .forEach(
-                        watermarkSpec -> {
-                            String watermarkRefColumnName = watermarkSpec.getColumnName();
-                            Expression watermarkExpression = watermarkSpec.getWatermarkExpression();
-                            if (StringUtils.equals(watermarkRefColumnName, originColumnName)) {
-                                String newWatermarkExpression =
-                                        ((SqlCallExpression) watermarkExpression)
-                                                .getSqlExpression()
-                                                .replace(watermarkRefColumnName, newColumnName);
-                                builder.watermark(newColumnName, newWatermarkExpression);
-                            } else {
-                                builder.watermark(watermarkRefColumnName, watermarkExpression);
-                            }
-                        });
-        // build partition key
-        List<String> newPartitionKeys =
-                catalogTable.getPartitionKeys().stream()
-                        .map(
-                                name ->
-                                        StringUtils.equals(name, originColumnName)
-                                                ? newColumnName
-                                                : name)
-                        .collect(Collectors.toList());
-        // generate new schema
-        Schema newSchema = builder.build();
-
-        return new AlterTableSchemaOperation(
-                tableIdentifier,
-                CatalogTable.of(
-                        newSchema,
-                        catalogTable.getComment(),
-                        newPartitionKeys,
-                        catalogTable.getOptions()));
-    }
-
-    private static void validateColumnName(
-            String originColumnName, String newColumnName, Schema modifiedTableSchema) {
-        List<String> columnNames =
-                modifiedTableSchema.getColumns().stream()
-                        .map(Schema.UnresolvedColumn::getName)
-                        .collect(Collectors.toList());
-
-        int originColumnIndex = columnNames.indexOf(originColumnName);
-        if (originColumnIndex < 0) {
-            throw new ValidationException(
-                    String.format("Old column %s not found for RENAME COLUMN ", originColumnName));
-        }
-
-        int sameColumnNameIndex = columnNames.indexOf(newColumnName);
-        if (sameColumnNameIndex >= 0) {
-            throw new ValidationException(
-                    String.format("New column %s existed for RENAME COLUMN ", newColumnName));
-        }
-    }
-
-    private static void buildNewColumnFromOriginColumn(
-            Schema.Builder builder, Schema.UnresolvedColumn originColumn, String columnName) {
-        if (originColumn instanceof Schema.UnresolvedComputedColumn) {
-            builder.columnByExpression(
-                    columnName, ((Schema.UnresolvedComputedColumn) originColumn).getExpression());
-        } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
-            builder.column(
-                    columnName, ((Schema.UnresolvedPhysicalColumn) originColumn).getDataType());
-        } else {
-            builder.columnByMetadata(
-                    columnName, ((Schema.UnresolvedMetadataColumn) originColumn).getDataType());
-        }
-    }
-
     // change a column in the old table schema and return the updated table schema
     public static TableSchema changeColumn(
             TableSchema oldSchema,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
index fc342e39b3b..81819208b9d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
@@ -313,7 +313,7 @@ object FlinkRexUtil {
    * @return
    *   InputRef HashSet.
    */
-  private[flink] def findAllInputRefs(node: RexNode): util.HashSet[RexInputRef] = {
+  def findAllInputRefs(node: RexNode): util.HashSet[RexInputRef] = {
     val set = new util.HashSet[RexInputRef]
     node.accept(new RexVisitorImpl[Void](true) {
       override def visitInputRef(inputRef: RexInputRef): Void = {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index c7ba3a5214c..1bbeb29d920 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -130,6 +130,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
+import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.planner.utils.OperationMatchers.entry;
 import static org.apache.flink.table.planner.utils.OperationMatchers.isCreateTableOperation;
 import static org.apache.flink.table.planner.utils.OperationMatchers.partitionedBy;
@@ -1268,6 +1269,121 @@ public class SqlToOperationConverterTest {
                 .hasMessageContaining("ALTER TABLE RESET does not support empty key");
     }
 
+    @Test
+    public void testAlterTableRenameColumn() throws Exception {
+        prepareTable("tb1", false, false, true, 3);
+        // rename pk column c
+        Operation operation = parse("alter table tb1 rename c to c1");
+        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(((AlterTableSchemaOperation) operation).getCatalogTable().getUnresolvedSchema())
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT().notNull())
+                                .column("b", DataTypes.BIGINT().notNull())
+                                .column("c1", DataTypes.STRING().notNull())
+                                .withComment("column comment")
+                                .columnByExpression("d", "a*(b+2 + a*b)")
+                                .column(
+                                        "e",
+                                        DataTypes.ROW(
+                                                DataTypes.STRING(),
+                                                DataTypes.INT(),
+                                                DataTypes.ROW(
+                                                        DataTypes.DOUBLE(),
+                                                        DataTypes.ARRAY(DataTypes.FLOAT()))))
+                                .columnByExpression("f", "e.f1 + e.f2.f0")
+                                .columnByMetadata("g", DataTypes.STRING(), null, true)
+                                .column("ts", DataTypes.TIMESTAMP(3))
+                                .withComment("just a comment")
+                                .watermark("ts", "ts - interval '5' seconds")
+                                .primaryKeyNamed("ct1", "a", "b", "c1")
+                                .build());
+
+        // rename computed column
+        operation = parse("alter table tb1 rename f to f1");
+        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(((AlterTableSchemaOperation) operation).getCatalogTable().getUnresolvedSchema())
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT().notNull())
+                                .column("b", DataTypes.BIGINT().notNull())
+                                .column("c", DataTypes.STRING().notNull())
+                                .withComment("column comment")
+                                .columnByExpression("d", "a*(b+2 + a*b)")
+                                .column(
+                                        "e",
+                                        DataTypes.ROW(
+                                                DataTypes.STRING(),
+                                                DataTypes.INT(),
+                                                DataTypes.ROW(
+                                                        DataTypes.DOUBLE(),
+                                                        DataTypes.ARRAY(DataTypes.FLOAT()))))
+                                .columnByExpression("f1", "e.f1 + e.f2.f0")
+                                .columnByMetadata("g", DataTypes.STRING(), null, true)
+                                .column("ts", DataTypes.TIMESTAMP(3))
+                                .withComment("just a comment")
+                                .watermark("ts", "ts - interval '5' seconds")
+                                .primaryKeyNamed("ct1", "a", "b", "c")
+                                .build());
+
+        // rename column c that is used in a computed column
+        assertThatThrownBy(() -> parse("alter table tb1 rename a to a1"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Old column a is referred by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b), "
+                                + "currently doesn't allow to rename column which is referred by computed column.");
+
+        // rename column used in the watermark expression
+        assertThatThrownBy(() -> parse("alter table tb1 rename ts to ts1"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Old column ts is referred by watermark expression WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds, "
+                                + "currently doesn't allow to rename column which is referred by watermark expression.");
+
+        // rename nested column
+        assertThatThrownBy(() -> parse("alter table tb1 rename e.f1 to e.f11"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Alter nested row type e.f1 is not supported yet.");
+
+        // rename column with duplicate name
+        assertThatThrownBy(() -> parse("alter table tb1 rename c to a"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "New column a already existed in table schema for RENAME COLUMN");
+
+        // rename column e test computed column expression is ApiExpression which doesn't implement
+        // the equals method
+        CatalogTable catalogTable2 =
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.STRING().notNull())
+                                .column("b", DataTypes.INT().notNull())
+                                .column("e", DataTypes.STRING())
+                                .columnByExpression("j", $("e").upperCase())
+                                .columnByExpression("g", "TO_TIMESTAMP(e)")
+                                .primaryKey("a", "b")
+                                .build(),
+                        "tb2",
+                        Collections.singletonList("a"),
+                        Collections.emptyMap());
+        catalogManager
+                .getCatalog("cat1")
+                .get()
+                .createTable(new ObjectPath("db1", "tb2"), catalogTable2, true);
+
+        assertThatThrownBy(() -> parse("alter table `cat1`.`db1`.`tb2` rename e to e1"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Old column e is referred by computed column `j` STRING AS upper(e), currently doesn't "
+                                + "allow to rename column which is referred by computed column.");
+
+        // rename column used as partition key
+        assertThatThrownBy(() -> parse("alter table tb2 rename a to a1"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Can not rename column a because it is used as the partition keys");
+    }
+
     @Test
     public void testAlterTableDropConstraint() throws Exception {
         prepareNonManagedTable(true);
@@ -1396,9 +1512,7 @@ public class SqlToOperationConverterTest {
         // add an inner field to a nested row
         assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string)"))
                 .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Alter nested row type is not supported yet.");
+                .hasMessageContaining("Alter nested row type e.f3 is not supported yet.");
 
         // refer to a nested inner field
         assertThatThrownBy(() -> parse("alter table tb1 add (x string after e.f2)"))
@@ -1409,9 +1523,7 @@ public class SqlToOperationConverterTest {
 
         assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string after e.f1)"))
                 .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Alter nested row type is not supported yet.");
+                .hasMessageContaining("Alter nested row type e.f3 is not supported yet.");
     }
 
     @Test
@@ -1430,7 +1542,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1464,7 +1576,7 @@ public class SqlToOperationConverterTest {
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
                                 + "  `i` AS [`b` * 2],\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1483,7 +1595,8 @@ public class SqlToOperationConverterTest {
                         .column("a", DataTypes.INT().notNull())
                         .column("b", DataTypes.BIGINT().notNull())
                         .columnByExpression("i", new SqlCallExpression("`b` * 2"))
-                        .column("c", DataTypes.STRING())
+                        .column("c", DataTypes.STRING().notNull())
+                        .withComment("column comment")
                         .columnByExpression("d", "a*(b+2 + a*b)")
                         .column(
                                 "e",
@@ -1514,7 +1627,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1649,7 +1762,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1671,7 +1784,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1696,7 +1809,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1720,7 +1833,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1799,7 +1912,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1820,7 +1933,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1849,7 +1962,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1879,7 +1992,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1970,9 +2083,7 @@ public class SqlToOperationConverterTest {
         // modify an inner field to a nested row
         assertThatThrownBy(() -> parse("alter table tb2 modify (e.f0 string)"))
                 .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Alter nested row type is not supported yet.");
+                .hasMessageContaining("Alter nested row type e.f0 is not supported yet.");
 
         // refer to a nested inner field
         assertThatThrownBy(() -> parse("alter table tb2 modify (g string after e.f2)"))
@@ -1983,9 +2094,7 @@ public class SqlToOperationConverterTest {
 
         assertThatThrownBy(() -> parse("alter table tb2 modify (e.f0 string after e.f1)"))
                 .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Alter nested row type is not supported yet.");
+                .hasMessageContaining("Alter nested row type e.f0 is not supported yet.");
     }
 
     @Test
@@ -2005,7 +2114,8 @@ public class SqlToOperationConverterTest {
                         .column("b", DataTypes.BIGINT().notNull())
                         .withComment("move b to first and add comment")
                         .column("a", DataTypes.INT().notNull())
-                        .column("c", DataTypes.STRING())
+                        .column("c", DataTypes.STRING().notNull())
+                        .withComment("column comment")
                         .columnByExpression("d", "a*(b+2 + a*b)")
                         .column(
                                 "e",
@@ -2030,7 +2140,8 @@ public class SqlToOperationConverterTest {
                 Schema.newBuilder()
                         .column("a", DataTypes.INT().notNull())
                         .column("b", DataTypes.BIGINT().notNull())
-                        .column("c", DataTypes.STRING())
+                        .column("c", DataTypes.STRING().notNull())
+                        .withComment("column comment")
                         .columnByExpression("d", "a*(b+2 + a*b)")
                         .column(
                                 "e",
@@ -2063,6 +2174,7 @@ public class SqlToOperationConverterTest {
                 tableIdentifier,
                 Schema.newBuilder()
                         .column("c", DataTypes.BIGINT())
+                        .withComment("column comment")
                         .column("a", DataTypes.INT().notNull())
                         .column("b", DataTypes.BIGINT().notNull())
                         .columnByExpression("d", "`a` + 2")
@@ -2097,7 +2209,8 @@ public class SqlToOperationConverterTest {
                         .withComment("change g")
                         .column("a", DataTypes.INT().notNull())
                         .column("b", DataTypes.BIGINT().notNull())
-                        .column("c", DataTypes.STRING())
+                        .column("c", DataTypes.STRING().notNull())
+                        .withComment("column comment")
                         .columnByExpression("d", "a*(b+2 + a*b)")
                         .columnByMetadata("e", DataTypes.INT(), null, true)
                         .column("f", DataTypes.TIMESTAMP(3).notNull())
@@ -2183,6 +2296,7 @@ public class SqlToOperationConverterTest {
                         .column("a", DataTypes.INT().notNull())
                         .column("b", DataTypes.BIGINT().notNull())
                         .column("c", DataTypes.STRING().notNull())
+                        .withComment("column comment")
                         .columnByExpression("d", "a*(b+2 + a*b)")
                         .column(
                                 "e",
@@ -2670,7 +2784,8 @@ public class SqlToOperationConverterTest {
                 Schema.newBuilder()
                         .column("a", DataTypes.INT().notNull())
                         .column("b", DataTypes.BIGINT().notNull())
-                        .column("c", DataTypes.STRING())
+                        .column("c", DataTypes.STRING().notNull())
+                        .withComment("column comment")
                         .columnByExpression("d", "a*(b+2 + a*b)")
                         .column(
                                 "e",
@@ -2689,10 +2804,17 @@ public class SqlToOperationConverterTest {
         if (!managedTable) {
             options.put("connector", "dummy");
         }
-        if (numOfPkFields == 1) {
+        if (numOfPkFields == 0) {
+            // do nothing
+        } else if (numOfPkFields == 1) {
             builder.primaryKeyNamed("ct1", "a");
         } else if (numOfPkFields == 2) {
             builder.primaryKeyNamed("ct1", "a", "b");
+        } else if (numOfPkFields == 3) {
+            builder.primaryKeyNamed("ct1", "a", "b", "c");
+        } else {
+            throw new IllegalArgumentException(
+                    String.format("Don't support to set pk with %s fields.", numOfPkFields));
         }
 
         if (hasWatermark) {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index d9a7e4cdd27..a78bb5a07cc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -50,7 +50,7 @@ import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.rules.{ExpectedException, TemporaryFolder}
 
 import java.io.File
-import java.util.UUID
+import java.util.{Collections, UUID}
 
 import scala.annotation.meta.getter
 
@@ -796,6 +796,28 @@ class TableEnvironmentTest {
     checkData(expectedResult.iterator(), tableResult.collect())
   }
 
+  @Test
+  def testAlterRenameColumn(): Unit = {
+    tableEnv.executeSql("""
+                          |CREATE TABLE MyTable (
+                          | a bigint
+                          |) WITH (
+                          |  'connector' = 'COLLECTION',
+                          |  'is-bounded' = 'false'
+                          |)
+                          |""".stripMargin)
+    tableEnv.executeSql("""
+                          |ALTER TABLE MyTable RENAME a TO b
+                          |""".stripMargin)
+    val tableResult = tableEnv.executeSql("DESCRIBE MyTable")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+    checkData(
+      Collections
+        .singletonList(Row.of("b", "BIGINT", Boolean.box(true), null, null, null))
+        .iterator(),
+      tableResult.collect())
+  }
+
   @Test
   def testAlterTableCompactOnNonManagedTable(): Unit = {
     val statement =