You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/12/29 02:05:37 UTC

[flink] branch master updated (4919dcf5e9d -> 94fba321d4e)

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 4919dcf5e9d [FLINK-22316][table] Support MODIFY column/constraint/watermark for ALTER TABLE statement
     add 291fee0e1e1 [FLINK-22318][table] Support RENAME column name for ALTER TABLE statement.
     new 94fba321d4e [FLINK-22318][table] Support RENAME column name for ALTER TABLE statement, optimize the code implementation

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/test/resources/sql/table.q                 |  36 ++++-
 .../src/main/codegen/data/Parser.tdd               |   1 +
 .../src/main/codegen/includes/parserImpls.ftl      |  26 ++-
 ...Modules.java => SqlAlterTableRenameColumn.java} |  60 ++++---
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  27 ++--
 .../planner/expressions/ColumnReferenceFinder.java | 108 +++++++++++++
 .../planner/operations/AlterSchemaConverter.java   | 177 ++++++++++++++++++--
 .../operations/SqlToOperationConverter.java        |  15 ++
 .../planner/utils/OperationConverterUtils.java     |  10 +-
 .../table/planner/plan/utils/FlinkRexUtil.scala    |   2 +-
 .../operations/SqlToOperationConverterTest.java    | 180 +++++++++++++++++----
 .../flink/table/api/TableEnvironmentTest.scala     |  24 ++-
 12 files changed, 563 insertions(+), 103 deletions(-)
 copy flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/{SqlUseModules.java => SqlAlterTableRenameColumn.java} (51%)
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java


[flink] 01/01: [FLINK-22318][table] Support RENAME column name for ALTER TABLE statement, optimize the code implementation

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 94fba321d4e29f996c8234c9dcb3b7a93ad8a593
Author: fengli <ld...@163.com>
AuthorDate: Fri Apr 1 20:50:56 2022 +0800

    [FLINK-22318][table] Support RENAME column name for ALTER TABLE statement, optimize the code implementation
    
    This closes #19329
---
 .../src/test/resources/sql/table.q                 |   36 +-
 .../src/main/codegen/includes/parserImpls.ftl      |   11 +-
 .../sql/parser/ddl/SqlAlterTableRenameColumn.java  |   53 +-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |   32 +-
 .../operations/SqlToOperationConverter.java        |    0
 .../operations/SqlToOperationConverterTest.java    | 1548 --------------------
 .../planner/expressions/ColumnReferenceFinder.java |  108 ++
 .../planner/operations/AlterSchemaConverter.java   |  177 ++-
 .../operations/SqlToOperationConverter.java        |   15 +
 .../planner/utils/OperationConverterUtils.java     |  125 +-
 .../table/planner/plan/utils/FlinkRexUtil.scala    |    2 +-
 .../operations/SqlToOperationConverterTest.java    |  180 ++-
 .../flink/table/api/TableEnvironmentTest.scala     |   24 +-
 13 files changed, 554 insertions(+), 1757 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index 61d944f7b29..36809085117 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -395,6 +395,30 @@ alter table orders2 reset ();
 org.apache.flink.table.api.ValidationException: ALTER TABLE RESET does not support empty key
 !error
 
+# ==========================================================================
+# test alter table rename column
+# ==========================================================================
+
+alter table orders2 rename amount to amount1;
+[INFO] Execute statement succeed.
+!info
+
+# verify table options using SHOW CREATE TABLE
+show create table orders2;
+CREATE TABLE `default_catalog`.`default_database`.`orders2` (
+  `user` BIGINT NOT NULL,
+  `product` VARCHAR(32),
+  `amount1` INT,
+  `ts` TIMESTAMP(3),
+  `ptime` AS PROCTIME(),
+  WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
+  CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED
+) WITH (
+  'connector' = 'datagen'
+)
+
+!ok
+
 # ==========================================================================
 # test alter table add schema
 # ==========================================================================
@@ -424,7 +448,7 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2` (
   `user` BIGINT NOT NULL,
   `product_id` BIGINT NOT NULL,
   `product` VARCHAR(32),
-  `amount` INT,
+  `amount1` INT,
   `ts` TIMESTAMP(3),
   `ptime` AS PROCTIME(),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
@@ -449,7 +473,7 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2` (
   `product_id` BIGINT NOT NULL,
   `product` VARCHAR(32),
   `cleaned_product` AS COALESCE(`product`, 'missing_sku'),
-  `amount` INT,
+  `amount1` INT,
   `ts` TIMESTAMP(3),
   `ptime` AS PROCTIME(),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
@@ -477,7 +501,7 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2` (
   `product_id` BIGINT NOT NULL,
   `product` VARCHAR(32),
   `cleaned_product` AS COALESCE(`product`, 'missing_sku'),
-  `amount` INT,
+  `amount1` INT,
   `ts` TIMESTAMP(3),
   `ptime` AS PROCTIME(),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
@@ -503,7 +527,7 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2` (
   `product_id` BIGINT NOT NULL,
   `product` VARCHAR(32),
   `cleaned_product` AS COALESCE(`product`, 'missing_sku'),
-  `amount` INT,
+  `amount1` INT,
   `ptime` AS PROCTIME(),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE,
   CONSTRAINT `order_constraint` PRIMARY KEY (`trade_order_id`) NOT ENFORCED
@@ -530,7 +554,7 @@ describe orders2;
 |      product_id |                      BIGINT | FALSE |                     |                                       |                            |
 |         product |                 VARCHAR(32) |  TRUE |                     |                                       |                            |
 | cleaned_product |                 VARCHAR(32) | FALSE |                     | AS COALESCE(`product`, 'missing_sku') |                            |
-|          amount |                         INT |  TRUE |                     |                                       |                            |
+|         amount1 |                         INT |  TRUE |                     |                                       |                            |
 |           ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |                     |                         AS PROCTIME() |                            |
 +-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
 9 rows in set
@@ -548,7 +572,7 @@ desc orders2;
 |      product_id |                      BIGINT | FALSE |                     |                                       |                            |
 |         product |                 VARCHAR(32) |  TRUE |                     |                                       |                            |
 | cleaned_product |                 VARCHAR(32) | FALSE |                     | AS COALESCE(`product`, 'missing_sku') |                            |
-|          amount |                         INT |  TRUE |                     |                                       |                            |
+|         amount1 |                         INT |  TRUE |                     |                                       |                            |
 |           ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |                     |                         AS PROCTIME() |                            |
 +-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
 9 rows in set
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index d03b987b9b1..57424abc495 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -595,8 +595,8 @@ SqlAlterTable SqlAlterTable() :
     SqlNodeList partitionSpec = null;
     SqlIdentifier constraintName;
     SqlTableConstraint constraint;
-    SqlIdentifier originColumnName;
-    SqlIdentifier newColumnName;
+    SqlIdentifier originColumnIdentifier;
+    SqlIdentifier newColumnIdentifier;
     AlterTableContext ctx = new AlterTableContext();
 }
 {
@@ -614,14 +614,15 @@ SqlAlterTable SqlAlterTable() :
         }
     |
         <RENAME>
-            originColumnName = SimpleIdentifier()
+            originColumnIdentifier = CompoundIdentifier()
         <TO>
-            newColumnName = SimpleIdentifier()
+            newColumnIdentifier = CompoundIdentifier()
         {
             return new SqlAlterTableRenameColumn(
                     startPos.plus(getPos()),
                     tableIdentifier,
-                    originColumnName,newColumnName);
+                    originColumnIdentifier,
+                    newColumnIdentifier);
         }
     |
         <RESET>
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
index c75e2ae1280..28196183ff7 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.sql.parser.ddl;
 
 import org.apache.calcite.sql.SqlIdentifier;
@@ -9,34 +27,35 @@ import org.apache.calcite.util.ImmutableNullableList;
 import java.util.List;
 
 /**
- * ALTER TABLE [[catalogName.] dataBasesName].tableName
- * RENAME  originColumnName TO newColumnName
+ * ALTER TABLE [[catalogName.] dataBasesName].tableName RENAME originColumnName TO newColumnName.
  */
 public class SqlAlterTableRenameColumn extends SqlAlterTable {
 
-    private final SqlIdentifier originColumnNameIdentifier;
-    private final SqlIdentifier newColumnNameIdentifier;
+    private final SqlIdentifier originColumnIdentifier;
+    private final SqlIdentifier newColumnIdentifier;
 
-    public SqlAlterTableRenameColumn(SqlParserPos pos,
-                                     SqlIdentifier tableName,
-                                     SqlIdentifier originColumnName,
-                                     SqlIdentifier newColumnName) {
+    public SqlAlterTableRenameColumn(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlIdentifier originColumnIdentifier,
+            SqlIdentifier newColumnIdentifier) {
         super(pos, tableName, null);
-        this.originColumnNameIdentifier = originColumnName;
-        this.newColumnNameIdentifier = newColumnName;
+        this.originColumnIdentifier = originColumnIdentifier;
+        this.newColumnIdentifier = newColumnIdentifier;
     }
 
     @Override
     public List<SqlNode> getOperandList() {
-        return ImmutableNullableList.of(tableIdentifier, originColumnNameIdentifier, newColumnNameIdentifier);
+        return ImmutableNullableList.of(
+                tableIdentifier, originColumnIdentifier, newColumnIdentifier);
     }
 
-    public SqlIdentifier getOriginColumnNameIdentifier() {
-        return originColumnNameIdentifier;
+    public SqlIdentifier getOriginColumnIdentifier() {
+        return originColumnIdentifier;
     }
 
-    public SqlIdentifier getNewColumnNameIdentifier() {
-        return newColumnNameIdentifier;
+    public SqlIdentifier getNewColumnIdentifier() {
+        return newColumnIdentifier;
     }
 
     @Override
@@ -44,8 +63,8 @@ public class SqlAlterTableRenameColumn extends SqlAlterTable {
         writer.keyword("ALTER TABLE");
         tableIdentifier.unparse(writer, leftPrec, rightPrec);
         writer.keyword("RENAME");
-        originColumnNameIdentifier.unparse(writer, leftPrec, rightPrec);
+        originColumnIdentifier.unparse(writer, leftPrec, rightPrec);
         writer.keyword("TO");
-        newColumnNameIdentifier.unparse(writer, leftPrec, rightPrec);
+        newColumnIdentifier.unparse(writer, leftPrec, rightPrec);
     }
 }
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 065743bcbad..fea81a9fe2d 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -315,26 +315,18 @@ class FlinkSqlParserImplTest extends SqlParserTest {
     void testAlterTable() {
         sql("alter table t1 rename to t2").ok("ALTER TABLE `T1` RENAME TO `T2`");
         sql("alter table c1.d1.t1 rename to t2").ok("ALTER TABLE `C1`.`D1`.`T1` RENAME TO `T2`");
-        final String sql0 = "alter table t1 set ('key1'='value1')";
-        final String expected0 = "ALTER TABLE `T1` SET (\n" + "  'key1' = 'value1'\n" + ")";
-        sql(sql0).ok(expected0);
-        final String sql1 = "alter table t1 " + "add constraint ct1 primary key(a, b) not enforced";
-        final String expected1 =
-                "ALTER TABLE `T1` ADD (\n"
-                        + "  CONSTRAINT `CT1` PRIMARY KEY (`A`, `B`) NOT ENFORCED\n"
-                        + ")";
-        sql(sql1).ok(expected1);
-        final String sql2 = "alter table t1 " + "add unique(a, b)";
-        final String expected2 = "ALTER TABLE `T1` ADD (\n" + "  UNIQUE (`A`, `B`)\n" + ")";
-        sql(sql2).ok(expected2);
-        final String sql3 = "alter table t1 drop constraint ct1";
-        final String expected3 = "ALTER TABLE `T1` DROP CONSTRAINT `CT1`";
-        sql(sql3).ok(expected3);
-
-        final String sql4 = "alter table t1 rename a to b";
-        final String expected4 = "ALTER TABLE `T1` RENAME `A` TO `B`";
-        sql(sql4).ok(expected4);
-
+        sql("alter table t1 set ('key1'='value1')")
+                .ok("ALTER TABLE `T1` SET (\n" + "  'key1' = 'value1'\n" + ")");
+        sql("alter table t1 add constraint ct1 primary key(a, b) not enforced")
+                .ok(
+                        "ALTER TABLE `T1` ADD (\n"
+                                + "  CONSTRAINT `CT1` PRIMARY KEY (`A`, `B`) NOT ENFORCED\n"
+                                + ")");
+        sql("alter table t1 " + "add unique(a, b)")
+                .ok("ALTER TABLE `T1` ADD (\n" + "  UNIQUE (`A`, `B`)\n" + ")");
+        sql("alter table t1 drop constraint ct1").ok("ALTER TABLE `T1` DROP CONSTRAINT `CT1`");
+        sql("alter table t1 rename a to b").ok("ALTER TABLE `T1` RENAME `A` TO `B`");
+        sql("alter table t1 rename a.x to a.y").ok("ALTER TABLE `T1` RENAME `A`.`X` TO `A`.`Y`");
     }
 
     @Test
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
deleted file mode 100644
index e69de29bb2d..00000000000
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
deleted file mode 100644
index 1ccd855972b..00000000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ /dev/null
@@ -1,1548 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.operations;
-
-import org.apache.flink.sql.parser.ddl.SqlCreateTable;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.SqlDialect;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableColumn.ComputedColumn;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.config.TableConfigOptions;
-import org.apache.flink.table.api.constraints.UniqueConstraint;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogFunctionImpl;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.expressions.SqlCallExpression;
-import org.apache.flink.table.module.ModuleManager;
-import org.apache.flink.table.operations.BeginStatementSetOperation;
-import org.apache.flink.table.operations.CatalogSinkModifyOperation;
-import org.apache.flink.table.operations.EndStatementSetOperation;
-import org.apache.flink.table.operations.LoadModuleOperation;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.ShowFunctionsOperation;
-import org.apache.flink.table.operations.ShowFunctionsOperation.FunctionScope;
-import org.apache.flink.table.operations.ShowModulesOperation;
-import org.apache.flink.table.operations.UnloadModuleOperation;
-import org.apache.flink.table.operations.UseCatalogOperation;
-import org.apache.flink.table.operations.UseDatabaseOperation;
-import org.apache.flink.table.operations.UseModulesOperation;
-import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
-import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
-import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
-import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
-import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.operations.ddl.CreateViewOperation;
-import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
-import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
-import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
-import org.apache.flink.table.planner.delegation.ParserImpl;
-import org.apache.flink.table.planner.delegation.PlannerContext;
-import org.apache.flink.table.planner.expressions.utils.Func0$;
-import org.apache.flink.table.planner.expressions.utils.Func1$;
-import org.apache.flink.table.planner.expressions.utils.Func8$;
-import org.apache.flink.table.planner.parse.CalciteParser;
-import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.utils.CatalogManagerMocks;
-import org.apache.flink.table.utils.ExpressionResolverMocks;
-
-import org.apache.calcite.sql.SqlNode;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import javax.annotation.Nullable;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
-import static org.apache.flink.table.planner.utils.OperationMatchers.entry;
-import static org.apache.flink.table.planner.utils.OperationMatchers.isCreateTableOperation;
-import static org.apache.flink.table.planner.utils.OperationMatchers.partitionedBy;
-import static org.apache.flink.table.planner.utils.OperationMatchers.withOptions;
-import static org.apache.flink.table.planner.utils.OperationMatchers.withSchema;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-/** Test cases for {@link SqlToOperationConverter}. */
-public class SqlToOperationConverterTest {
-    private final boolean isStreamingMode = false;
-    private final TableConfig tableConfig = new TableConfig();
-    private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default");
-    private final CatalogManager catalogManager =
-            CatalogManagerMocks.preparedCatalogManager().defaultCatalog("builtin", catalog).build();
-    private final ModuleManager moduleManager = new ModuleManager();
-    private final FunctionCatalog functionCatalog =
-            new FunctionCatalog(tableConfig, catalogManager, moduleManager);
-    private final Supplier<FlinkPlannerImpl> plannerSupplier =
-            () ->
-                    getPlannerContext()
-                            .createFlinkPlanner(
-                                    catalogManager.getCurrentCatalog(),
-                                    catalogManager.getCurrentDatabase());
-    private final Parser parser =
-            new ParserImpl(
-                    catalogManager,
-                    plannerSupplier,
-                    () -> plannerSupplier.get().parser(),
-                    t ->
-                            getPlannerContext()
-                                    .createSqlExprToRexConverter(
-                                            getPlannerContext()
-                                                    .getTypeFactory()
-                                                    .buildRelNodeRowType(t)));
-    private final PlannerContext plannerContext =
-            new PlannerContext(
-                    tableConfig,
-                    functionCatalog,
-                    catalogManager,
-                    asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
-                    Collections.emptyList());
-
-    private PlannerContext getPlannerContext() {
-        return plannerContext;
-    }
-
-    @Rule public ExpectedException thrown = ExpectedException.none();
-
-    @Before
-    public void before() throws TableAlreadyExistException, DatabaseNotExistException {
-        catalogManager.initSchemaResolver(
-                isStreamingMode,
-                ExpressionResolverMocks.basicResolver(catalogManager, functionCatalog, parser));
-
-        final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
-        final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
-        final TableSchema tableSchema =
-                TableSchema.builder()
-                        .field("a", DataTypes.BIGINT())
-                        .field("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
-                        .field("c", DataTypes.INT())
-                        .field("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
-                        .build();
-        Map<String, String> options = new HashMap<>();
-        options.put("connector", "COLLECTION");
-        final CatalogTable catalogTable = new CatalogTableImpl(tableSchema, options, "");
-        catalog.createTable(path1, catalogTable, true);
-        catalog.createTable(path2, catalogTable, true);
-    }
-
-    @After
-    public void after() throws TableNotExistException {
-        final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
-        final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
-        catalog.dropTable(path1, true);
-        catalog.dropTable(path2, true);
-    }
-
-    @Test
-    public void testUseCatalog() {
-        final String sql = "USE CATALOG cat1";
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof UseCatalogOperation;
-        assertEquals("cat1", ((UseCatalogOperation) operation).getCatalogName());
-    }
-
-    @Test
-    public void testUseDatabase() {
-        final String sql1 = "USE db1";
-        Operation operation1 = parse(sql1, SqlDialect.DEFAULT);
-        assert operation1 instanceof UseDatabaseOperation;
-        assertEquals("builtin", ((UseDatabaseOperation) operation1).getCatalogName());
-        assertEquals("db1", ((UseDatabaseOperation) operation1).getDatabaseName());
-
-        final String sql2 = "USE cat1.db1";
-        Operation operation2 = parse(sql2, SqlDialect.DEFAULT);
-        assert operation2 instanceof UseDatabaseOperation;
-        assertEquals("cat1", ((UseDatabaseOperation) operation2).getCatalogName());
-        assertEquals("db1", ((UseDatabaseOperation) operation2).getDatabaseName());
-    }
-
-    @Test(expected = ValidationException.class)
-    public void testUseDatabaseWithException() {
-        final String sql = "USE cat1.db1.tbl1";
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-    }
-
-    @Test
-    public void testCreateDatabase() {
-        final String[] createDatabaseSqls =
-                new String[] {
-                    "create database db1",
-                    "create database if not exists cat1.db1",
-                    "create database cat1.db1 comment 'db1_comment'",
-                    "create database cat1.db1 comment 'db1_comment' with ('k1' = 'v1', 'K2' = 'V2')"
-                };
-        final String[] expectedCatalogs = new String[] {"builtin", "cat1", "cat1", "cat1"};
-        final String expectedDatabase = "db1";
-        final String[] expectedComments = new String[] {null, null, "db1_comment", "db1_comment"};
-        final boolean[] expectedIgnoreIfExists = new boolean[] {false, true, false, false};
-        Map<String, String> properties = new HashMap<>();
-        properties.put("k1", "v1");
-        properties.put("K2", "V2");
-        final Map[] expectedProperties =
-                new Map[] {
-                    new HashMap<String, String>(),
-                    new HashMap<String, String>(),
-                    new HashMap<String, String>(),
-                    new HashMap(properties)
-                };
-
-        for (int i = 0; i < createDatabaseSqls.length; i++) {
-            Operation operation = parse(createDatabaseSqls[i], SqlDialect.DEFAULT);
-            assert operation instanceof CreateDatabaseOperation;
-            final CreateDatabaseOperation createDatabaseOperation =
-                    (CreateDatabaseOperation) operation;
-            assertEquals(expectedCatalogs[i], createDatabaseOperation.getCatalogName());
-            assertEquals(expectedDatabase, createDatabaseOperation.getDatabaseName());
-            assertEquals(
-                    expectedComments[i], createDatabaseOperation.getCatalogDatabase().getComment());
-            assertEquals(expectedIgnoreIfExists[i], createDatabaseOperation.isIgnoreIfExists());
-            assertEquals(
-                    expectedProperties[i],
-                    createDatabaseOperation.getCatalogDatabase().getProperties());
-        }
-    }
-
-    @Test
-    public void testDropDatabase() {
-        final String[] dropDatabaseSqls =
-                new String[] {
-                    "drop database db1",
-                    "drop database if exists db1",
-                    "drop database if exists cat1.db1 CASCADE",
-                    "drop database if exists cat1.db1 RESTRICT"
-                };
-        final String[] expectedCatalogs = new String[] {"builtin", "builtin", "cat1", "cat1"};
-        final String expectedDatabase = "db1";
-        final boolean[] expectedIfExists = new boolean[] {false, true, true, true};
-        final boolean[] expectedIsCascades = new boolean[] {false, false, true, false};
-
-        for (int i = 0; i < dropDatabaseSqls.length; i++) {
-            Operation operation = parse(dropDatabaseSqls[i], SqlDialect.DEFAULT);
-            assert operation instanceof DropDatabaseOperation;
-            final DropDatabaseOperation dropDatabaseOperation = (DropDatabaseOperation) operation;
-            assertEquals(expectedCatalogs[i], dropDatabaseOperation.getCatalogName());
-            assertEquals(expectedDatabase, dropDatabaseOperation.getDatabaseName());
-            assertEquals(expectedIfExists[i], dropDatabaseOperation.isIfExists());
-            assertEquals(expectedIsCascades[i], dropDatabaseOperation.isCascade());
-        }
-    }
-
-    @Test
-    public void testAlterDatabase() throws Exception {
-        catalogManager.registerCatalog("cat1", new GenericInMemoryCatalog("default", "default"));
-        catalogManager
-                .getCatalog("cat1")
-                .get()
-                .createDatabase(
-                        "db1", new CatalogDatabaseImpl(new HashMap<>(), "db1_comment"), true);
-        final String sql = "alter database cat1.db1 set ('k1'='v1', 'K2'='V2')";
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof AlterDatabaseOperation;
-        Map<String, String> properties = new HashMap<>();
-        properties.put("k1", "v1");
-        properties.put("K2", "V2");
-        assertEquals("db1", ((AlterDatabaseOperation) operation).getDatabaseName());
-        assertEquals("cat1", ((AlterDatabaseOperation) operation).getCatalogName());
-        assertEquals(
-                "db1_comment",
-                ((AlterDatabaseOperation) operation).getCatalogDatabase().getComment());
-        assertEquals(
-                properties,
-                ((AlterDatabaseOperation) operation).getCatalogDatabase().getProperties());
-    }
-
-    @Test
-    public void testLoadModule() {
-        final String sql = "LOAD MODULE dummy WITH ('k1' = 'v1', 'k2' = 'v2')";
-        final String expectedModuleName = "dummy";
-        final Map<String, String> expectedProperties = new HashMap<>();
-        expectedProperties.put("k1", "v1");
-        expectedProperties.put("k2", "v2");
-
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof LoadModuleOperation;
-        final LoadModuleOperation loadModuleOperation = (LoadModuleOperation) operation;
-
-        assertEquals(expectedModuleName, loadModuleOperation.getModuleName());
-        assertEquals(expectedProperties, loadModuleOperation.getProperties());
-    }
-
-    @Test
-    public void testUnloadModule() {
-        final String sql = "UNLOAD MODULE dummy";
-        final String expectedModuleName = "dummy";
-
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof UnloadModuleOperation;
-        final UnloadModuleOperation unloadModuleOperation = (UnloadModuleOperation) operation;
-
-        assertEquals(expectedModuleName, unloadModuleOperation.getModuleName());
-    }
-
-    @Test
-    public void testUseOneModule() {
-        final String sql = "USE MODULES dummy";
-        final List<String> expectedModuleNames = Collections.singletonList("dummy");
-
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof UseModulesOperation;
-        final UseModulesOperation useModulesOperation = (UseModulesOperation) operation;
-
-        assertEquals(expectedModuleNames, useModulesOperation.getModuleNames());
-        assertEquals("USE MODULES: [dummy]", useModulesOperation.asSummaryString());
-    }
-
-    @Test
-    public void testUseMultipleModules() {
-        final String sql = "USE MODULES x, y, z";
-        final List<String> expectedModuleNames = Arrays.asList("x", "y", "z");
-
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof UseModulesOperation;
-        final UseModulesOperation useModulesOperation = (UseModulesOperation) operation;
-
-        assertEquals(expectedModuleNames, useModulesOperation.getModuleNames());
-        assertEquals("USE MODULES: [x, y, z]", useModulesOperation.asSummaryString());
-    }
-
-    @Test
-    public void testShowModules() {
-        final String sql = "SHOW MODULES";
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof ShowModulesOperation;
-        final ShowModulesOperation showModulesOperation = (ShowModulesOperation) operation;
-
-        assertFalse(showModulesOperation.requireFull());
-        assertEquals("SHOW MODULES", showModulesOperation.asSummaryString());
-    }
-
-    @Test
-    public void testShowFullModules() {
-        final String sql = "SHOW FULL MODULES";
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof ShowModulesOperation;
-        final ShowModulesOperation showModulesOperation = (ShowModulesOperation) operation;
-
-        assertTrue(showModulesOperation.requireFull());
-        assertEquals("SHOW FULL MODULES", showModulesOperation.asSummaryString());
-    }
-
-    @Test
-    public void testShowFunctions() {
-        final String sql1 = "SHOW FUNCTIONS";
-        assertShowFunctions(sql1, sql1, FunctionScope.ALL);
-
-        final String sql2 = "SHOW USER FUNCTIONS";
-        assertShowFunctions(sql2, sql2, FunctionScope.USER);
-    }
-
-    @Test
-    public void testCreateTable() {
-        final String sql =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a bigint,\n"
-                        + "  b varchar, \n"
-                        + "  c int, \n"
-                        + "  d varchar"
-                        + ")\n"
-                        + "  PARTITIONED BY (a, d)\n"
-                        + "  with (\n"
-                        + "    'connector' = 'kafka', \n"
-                        + "    'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assert operation instanceof CreateTableOperation;
-        CreateTableOperation op = (CreateTableOperation) operation;
-        CatalogTable catalogTable = op.getCatalogTable();
-        assertEquals(Arrays.asList("a", "d"), catalogTable.getPartitionKeys());
-        assertArrayEquals(
-                catalogTable.getSchema().getFieldNames(), new String[] {"a", "b", "c", "d"});
-        assertArrayEquals(
-                catalogTable.getSchema().getFieldDataTypes(),
-                new DataType[] {
-                    DataTypes.BIGINT(),
-                    DataTypes.VARCHAR(Integer.MAX_VALUE),
-                    DataTypes.INT(),
-                    DataTypes.VARCHAR(Integer.MAX_VALUE)
-                });
-    }
-
-    @Test
-    public void testCreateTableWithPrimaryKey() {
-        final String sql =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a bigint,\n"
-                        + "  b varchar, \n"
-                        + "  c int, \n"
-                        + "  d varchar, \n"
-                        + "  constraint ct1 primary key(a, b) not enforced\n"
-                        + ") with (\n"
-                        + "  'connector' = 'kafka', \n"
-                        + "  'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assert operation instanceof CreateTableOperation;
-        CreateTableOperation op = (CreateTableOperation) operation;
-        CatalogTable catalogTable = op.getCatalogTable();
-        TableSchema tableSchema = catalogTable.getSchema();
-        assertThat(
-                tableSchema
-                        .getPrimaryKey()
-                        .map(UniqueConstraint::asSummaryString)
-                        .orElse("fakeVal"),
-                is("CONSTRAINT ct1 PRIMARY KEY (a, b)"));
-        assertArrayEquals(new String[] {"a", "b", "c", "d"}, tableSchema.getFieldNames());
-        assertArrayEquals(
-                new DataType[] {
-                    DataTypes.BIGINT().notNull(),
-                    DataTypes.STRING().notNull(),
-                    DataTypes.INT(),
-                    DataTypes.STRING()
-                },
-                tableSchema.getFieldDataTypes());
-    }
-
-    @Test
-    public void testCreateTableWithPrimaryKeyEnforced() {
-        final String sql =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a bigint,\n"
-                        + "  b varchar, \n"
-                        + "  c int, \n"
-                        + "  d varchar, \n"
-                        +
-                        // Default is enforced.
-                        "  constraint ct1 primary key(a, b)\n"
-                        + ") with (\n"
-                        + "  'connector' = 'kafka', \n"
-                        + "  'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Flink doesn't support ENFORCED mode for PRIMARY KEY "
-                        + "constaint. ENFORCED/NOT ENFORCED  controls if the constraint "
-                        + "checks are performed on the incoming/outgoing data. "
-                        + "Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode");
-        parse(sql, planner, parser);
-    }
-
-    @Test
-    public void testCreateTableWithUniqueKey() {
-        final String sql =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a bigint,\n"
-                        + "  b varchar, \n"
-                        + "  c int, \n"
-                        + "  d varchar, \n"
-                        + "  constraint ct1 unique (a, b) not enforced\n"
-                        + ") with (\n"
-                        + "  'connector' = 'kafka', \n"
-                        + "  'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        thrown.expect(UnsupportedOperationException.class);
-        thrown.expectMessage("UNIQUE constraint is not supported yet");
-        parse(sql, planner, parser);
-    }
-
-    @Test
-    public void testPrimaryKeyOnGeneratedColumn() {
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Could not create a PRIMARY KEY with column 'c' at line 5, column 34.\n"
-                        + "A PRIMARY KEY constraint must be declared on physical columns.");
-        final String sql2 =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a bigint not null,\n"
-                        + "  b varchar not null,\n"
-                        + "  c as 2 * (a + 1),\n"
-                        + "  constraint ct1 primary key (b, c) not enforced"
-                        + ") with (\n"
-                        + "    'connector' = 'kafka',\n"
-                        + "    'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-        parseAndConvert(sql2);
-    }
-
-    @Test
-    public void testPrimaryKeyNonExistentColumn() {
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Primary key column 'd' is not defined in the schema at line 5, column 34");
-        final String sql2 =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a bigint not null,\n"
-                        + "  b varchar not null,\n"
-                        + "  c as 2 * (a + 1),\n"
-                        + "  constraint ct1 primary key (b, d) not enforced"
-                        + ") with (\n"
-                        + "    'connector' = 'kafka',\n"
-                        + "    'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-        parseAndConvert(sql2);
-    }
-
-    @Test
-    public void testCreateTableWithMinusInOptionKey() {
-        final String sql =
-                "create table source_table(\n"
-                        + "  a int,\n"
-                        + "  b bigint,\n"
-                        + "  c varchar\n"
-                        + ") with (\n"
-                        + "  'a-B-c-d124' = 'Ab',\n"
-                        + "  'a.b-c-d.e-f.g' = 'ada',\n"
-                        + "  'a.b-c-d.e-f1231.g' = 'ada',\n"
-                        + "  'a.b-c-d.*' = 'adad')\n";
-        final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        SqlNode node = parser.parse(sql);
-        assert node instanceof SqlCreateTable;
-        Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
-        assert operation instanceof CreateTableOperation;
-        CreateTableOperation op = (CreateTableOperation) operation;
-        CatalogTable catalogTable = op.getCatalogTable();
-        Map<String, String> options =
-                catalogTable.getOptions().entrySet().stream()
-                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-        Map<String, String> sortedProperties = new TreeMap<>(options);
-        final String expected =
-                "{a-B-c-d124=Ab, "
-                        + "a.b-c-d.*=adad, "
-                        + "a.b-c-d.e-f.g=ada, "
-                        + "a.b-c-d.e-f1231.g=ada}";
-        assertEquals(expected, sortedProperties.toString());
-    }
-
-    @Test
-    public void testCreateTableWithWatermark()
-            throws FunctionAlreadyExistException, DatabaseNotExistException {
-        CatalogFunction cf =
-                new CatalogFunctionImpl(JavaUserDefinedScalarFunctions.JavaFunc5.class.getName());
-        catalog.createFunction(ObjectPath.fromString("default.myfunc"), cf, true);
-
-        final String sql =
-                "create table source_table(\n"
-                        + "  a int,\n"
-                        + "  b bigint,\n"
-                        + "  c timestamp(3),\n"
-                        + "  watermark for `c` as myfunc(c, 1) - interval '5' second\n"
-                        + ") with (\n"
-                        + "  'connector.type' = 'kafka')\n";
-        final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        SqlNode node = parser.parse(sql);
-        assert node instanceof SqlCreateTable;
-        Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
-        assert operation instanceof CreateTableOperation;
-        CreateTableOperation op = (CreateTableOperation) operation;
-        CatalogTable catalogTable = op.getCatalogTable();
-        Map<String, String> properties = catalogTable.toProperties();
-        Map<String, String> expected = new HashMap<>();
-        expected.put("schema.0.name", "a");
-        expected.put("schema.0.data-type", "INT");
-        expected.put("schema.1.name", "b");
-        expected.put("schema.1.data-type", "BIGINT");
-        expected.put("schema.2.name", "c");
-        expected.put("schema.2.data-type", "TIMESTAMP(3)");
-        expected.put("schema.watermark.0.rowtime", "c");
-        expected.put(
-                "schema.watermark.0.strategy.expr",
-                "`builtin`.`default`.`myfunc`(`c`, 1) - INTERVAL '5' SECOND");
-        expected.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
-        expected.put("connector.type", "kafka");
-        assertEquals(expected, properties);
-    }
-
-    @Test
-    public void testBasicCreateTableLike() {
-        Map<String, String> sourceProperties = new HashMap<>();
-        sourceProperties.put("format.type", "json");
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("f0", DataTypes.INT().notNull())
-                                .column("f1", DataTypes.TIMESTAMP(3))
-                                .build(),
-                        null,
-                        Collections.emptyList(),
-                        sourceProperties);
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-
-        final String sql =
-                "create table derivedTable(\n"
-                        + "  a int,\n"
-                        + "  watermark for f1 as `f1` - interval '5' second\n"
-                        + ")\n"
-                        + "PARTITIONED BY (a, f0)\n"
-                        + "with (\n"
-                        + "  'connector.type' = 'kafka'"
-                        + ")\n"
-                        + "like sourceTable";
-        Operation operation = parseAndConvert(sql);
-
-        assertThat(
-                operation,
-                isCreateTableOperation(
-                        withSchema(
-                                Schema.newBuilder()
-                                        .column("f0", DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                        .column("a", DataTypes.INT())
-                                        .watermark("f1", "`f1` - INTERVAL '5' SECOND")
-                                        .build()),
-                        withOptions(entry("connector.type", "kafka"), entry("format.type", "json")),
-                        partitionedBy("a", "f0")));
-    }
-
-    @Test
-    public void testCreateTableLikeWithFullPath() {
-        Map<String, String> sourceProperties = new HashMap<>();
-        sourceProperties.put("connector.type", "kafka");
-        sourceProperties.put("format.type", "json");
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("f0", DataTypes.INT().notNull())
-                                .column("f1", DataTypes.TIMESTAMP(3))
-                                .build(),
-                        null,
-                        Collections.emptyList(),
-                        sourceProperties);
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-        final String sql = "create table mytable like `builtin`.`default`.sourceTable";
-        Operation operation = parseAndConvert(sql);
-
-        assertThat(
-                operation,
-                isCreateTableOperation(
-                        withSchema(
-                                Schema.newBuilder()
-                                        .column("f0", DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                        .build()),
-                        withOptions(
-                                entry("connector.type", "kafka"), entry("format.type", "json"))));
-    }
-
-    @Test
-    public void testMergingCreateTableLike() {
-        Map<String, String> sourceProperties = new HashMap<>();
-        sourceProperties.put("format.type", "json");
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("f0", DataTypes.INT().notNull())
-                                .column("f1", DataTypes.TIMESTAMP(3))
-                                .columnByExpression("f2", "`f0` + 12345")
-                                .watermark("f1", "`f1` - interval '1' second")
-                                .build(),
-                        null,
-                        Arrays.asList("f0", "f1"),
-                        sourceProperties);
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-
-        final String sql =
-                "create table derivedTable(\n"
-                        + "  a int,\n"
-                        + "  watermark for f1 as `f1` - interval '5' second\n"
-                        + ")\n"
-                        + "PARTITIONED BY (a, f0)\n"
-                        + "with (\n"
-                        + "  'connector.type' = 'kafka'"
-                        + ")\n"
-                        + "like sourceTable (\n"
-                        + "   EXCLUDING GENERATED\n"
-                        + "   EXCLUDING PARTITIONS\n"
-                        + "   OVERWRITING OPTIONS\n"
-                        + "   OVERWRITING WATERMARKS"
-                        + ")";
-        Operation operation = parseAndConvert(sql);
-
-        assertThat(
-                operation,
-                isCreateTableOperation(
-                        withSchema(
-                                Schema.newBuilder()
-                                        .column("f0", DataTypes.INT().notNull())
-                                        .column("f1", DataTypes.TIMESTAMP(3))
-                                        .column("a", DataTypes.INT())
-                                        .watermark("f1", "`f1` - INTERVAL '5' SECOND")
-                                        .build()),
-                        withOptions(entry("connector.type", "kafka"), entry("format.type", "json")),
-                        partitionedBy("a", "f0")));
-    }
-
-    @Test
-    public void testCreateTableInvalidPartition() {
-        final String sql =
-                "create table derivedTable(\n" + "  a int\n" + ")\n" + "PARTITIONED BY (f3)";
-
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Partition column 'f3' not defined in the table schema. Available columns: ['a']");
-        parseAndConvert(sql);
-    }
-
-    @Test
-    public void testCreateTableLikeInvalidPartition() {
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder().column("f0", DataTypes.INT().notNull()).build(),
-                        null,
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-
-        final String sql =
-                "create table derivedTable(\n"
-                        + "  a int\n"
-                        + ")\n"
-                        + "PARTITIONED BY (f3)\n"
-                        + "like sourceTable";
-
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Partition column 'f3' not defined in the table schema. Available columns: ['f0', 'a']");
-        parseAndConvert(sql);
-    }
-
-    @Test
-    public void testCreateTableInvalidWatermark() {
-        final String sql =
-                "create table derivedTable(\n"
-                        + "  a int,\n"
-                        + "  watermark for f1 as `f1` - interval '5' second\n"
-                        + ")";
-
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "The rowtime attribute field 'f1' is not defined in the table schema,"
-                        + " at line 3, column 17\n"
-                        + "Available fields: ['a']");
-        parseAndConvert(sql);
-    }
-
-    @Test
-    public void testCreateTableLikeInvalidWatermark() {
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder().column("f0", DataTypes.INT().notNull()).build(),
-                        null,
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-
-        final String sql =
-                "create table derivedTable(\n"
-                        + "  a int,\n"
-                        + "  watermark for f1 as `f1` - interval '5' second\n"
-                        + ")\n"
-                        + "like sourceTable";
-
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "The rowtime attribute field 'f1' is not defined in the table schema,"
-                        + " at line 3, column 17\n"
-                        + "Available fields: ['f0', 'a']");
-        parseAndConvert(sql);
-    }
-
-    @Test
-    public void testCreateTableLikeNestedWatermark() {
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("f0", DataTypes.INT().notNull())
-                                .column(
-                                        "f1",
-                                        DataTypes.ROW(
-                                                DataTypes.FIELD("tmstmp", DataTypes.TIMESTAMP(3))))
-                                .build(),
-                        null,
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-
-        final String sql =
-                "create table derivedTable(\n"
-                        + "  a int,\n"
-                        + "  watermark for f1.t as f1.t - interval '5' second\n"
-                        + ")\n"
-                        + "like sourceTable";
-
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "The rowtime attribute field 'f1.t' is not defined in the table schema,"
-                        + " at line 3, column 20\n"
-                        + "Nested field 't' was not found in a composite type:"
-                        + " ROW<`tmstmp` TIMESTAMP(3)>.");
-        parseAndConvert(sql);
-    }
-
-    @Test
-    public void testSqlInsertWithStaticPartition() {
-        final String sql = "insert into t1 partition(a=1) select b, c, d from t2";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assert operation instanceof CatalogSinkModifyOperation;
-        CatalogSinkModifyOperation sinkModifyOperation = (CatalogSinkModifyOperation) operation;
-        final Map<String, String> expectedStaticPartitions = new HashMap<>();
-        expectedStaticPartitions.put("a", "1");
-        assertEquals(expectedStaticPartitions, sinkModifyOperation.getStaticPartitions());
-    }
-
-    @Test
-    public void testSqlInsertWithDynamicTableOptions() {
-        final String sql =
-                "insert into t1 /*+ OPTIONS('k1'='v1', 'k2'='v2') */\n"
-                        + "select a, b, c, d from t2";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assert operation instanceof CatalogSinkModifyOperation;
-        CatalogSinkModifyOperation sinkModifyOperation = (CatalogSinkModifyOperation) operation;
-        Map<String, String> dynamicOptions = sinkModifyOperation.getDynamicOptions();
-        assertNotNull(dynamicOptions);
-        assertThat(dynamicOptions.size(), is(2));
-        assertThat(dynamicOptions.toString(), is("{k1=v1, k2=v2}"));
-    }
-
-    @Test
-    public void testDynamicTableWithInvalidOptions() {
-        final String sql = "select * from t1 /*+ OPTIONS('opt1', 'opt2') */";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        thrown.expect(AssertionError.class);
-        thrown.expectMessage("Hint [OPTIONS] only support " + "non empty key value options");
-        parse(sql, planner, parser);
-    }
-
-    @Test // TODO: tweak the tests when FLINK-13604 is fixed.
-    public void testCreateTableWithFullDataTypes() {
-        final List<TestItem> testItems =
-                Arrays.asList(
-                        createTestItem("CHAR", DataTypes.CHAR(1)),
-                        createTestItem("CHAR NOT NULL", DataTypes.CHAR(1).notNull()),
-                        createTestItem("CHAR NULL", DataTypes.CHAR(1)),
-                        createTestItem("CHAR(33)", DataTypes.CHAR(33)),
-                        createTestItem("VARCHAR", DataTypes.STRING()),
-                        createTestItem("VARCHAR(33)", DataTypes.VARCHAR(33)),
-                        createTestItem("STRING", DataTypes.STRING()),
-                        createTestItem("BOOLEAN", DataTypes.BOOLEAN()),
-                        createTestItem("BINARY", DataTypes.BINARY(1)),
-                        createTestItem("BINARY(33)", DataTypes.BINARY(33)),
-                        createTestItem("VARBINARY", DataTypes.BYTES()),
-                        createTestItem("VARBINARY(33)", DataTypes.VARBINARY(33)),
-                        createTestItem("BYTES", DataTypes.BYTES()),
-                        createTestItem("DECIMAL", DataTypes.DECIMAL(10, 0)),
-                        createTestItem("DEC", DataTypes.DECIMAL(10, 0)),
-                        createTestItem("NUMERIC", DataTypes.DECIMAL(10, 0)),
-                        createTestItem("DECIMAL(10)", DataTypes.DECIMAL(10, 0)),
-                        createTestItem("DEC(10)", DataTypes.DECIMAL(10, 0)),
-                        createTestItem("NUMERIC(10)", DataTypes.DECIMAL(10, 0)),
-                        createTestItem("DECIMAL(10, 3)", DataTypes.DECIMAL(10, 3)),
-                        createTestItem("DEC(10, 3)", DataTypes.DECIMAL(10, 3)),
-                        createTestItem("NUMERIC(10, 3)", DataTypes.DECIMAL(10, 3)),
-                        createTestItem("TINYINT", DataTypes.TINYINT()),
-                        createTestItem("SMALLINT", DataTypes.SMALLINT()),
-                        createTestItem("INTEGER", DataTypes.INT()),
-                        createTestItem("INT", DataTypes.INT()),
-                        createTestItem("BIGINT", DataTypes.BIGINT()),
-                        createTestItem("FLOAT", DataTypes.FLOAT()),
-                        createTestItem("DOUBLE", DataTypes.DOUBLE()),
-                        createTestItem("DOUBLE PRECISION", DataTypes.DOUBLE()),
-                        createTestItem("DATE", DataTypes.DATE()),
-                        createTestItem("TIME", DataTypes.TIME()),
-                        createTestItem("TIME WITHOUT TIME ZONE", DataTypes.TIME()),
-                        // Expect to be TIME(3).
-                        createTestItem("TIME(3)", DataTypes.TIME()),
-                        // Expect to be TIME(3).
-                        createTestItem("TIME(3) WITHOUT TIME ZONE", DataTypes.TIME()),
-                        createTestItem("TIMESTAMP", DataTypes.TIMESTAMP(6)),
-                        createTestItem("TIMESTAMP WITHOUT TIME ZONE", DataTypes.TIMESTAMP(6)),
-                        createTestItem("TIMESTAMP(3)", DataTypes.TIMESTAMP(3)),
-                        createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE", DataTypes.TIMESTAMP(3)),
-                        createTestItem(
-                                "TIMESTAMP WITH LOCAL TIME ZONE",
-                                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)),
-                        createTestItem(
-                                "TIMESTAMP(3) WITH LOCAL TIME ZONE",
-                                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
-                        createTestItem(
-                                "ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>",
-                                DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))),
-                        createTestItem(
-                                "ARRAY<INT NOT NULL>", DataTypes.ARRAY(DataTypes.INT().notNull())),
-                        createTestItem("INT ARRAY", DataTypes.ARRAY(DataTypes.INT())),
-                        createTestItem(
-                                "INT NOT NULL ARRAY", DataTypes.ARRAY(DataTypes.INT().notNull())),
-                        createTestItem(
-                                "INT ARRAY NOT NULL", DataTypes.ARRAY(DataTypes.INT()).notNull()),
-                        createTestItem(
-                                "MULTISET<INT NOT NULL>",
-                                DataTypes.MULTISET(DataTypes.INT().notNull())),
-                        createTestItem("INT MULTISET", DataTypes.MULTISET(DataTypes.INT())),
-                        createTestItem(
-                                "INT NOT NULL MULTISET",
-                                DataTypes.MULTISET(DataTypes.INT().notNull())),
-                        createTestItem(
-                                "INT MULTISET NOT NULL",
-                                DataTypes.MULTISET(DataTypes.INT()).notNull()),
-                        createTestItem(
-                                "MAP<BIGINT, BOOLEAN>",
-                                DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BOOLEAN())),
-                        // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
-                        createTestItem(
-                                "ROW<f0 INT NOT NULL, f1 BOOLEAN>",
-                                DataTypes.ROW(
-                                        DataTypes.FIELD("f0", DataTypes.INT()),
-                                        DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
-                        // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
-                        createTestItem(
-                                "ROW(f0 INT NOT NULL, f1 BOOLEAN)",
-                                DataTypes.ROW(
-                                        DataTypes.FIELD("f0", DataTypes.INT()),
-                                        DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
-                        createTestItem(
-                                "ROW<`f0` INT>",
-                                DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
-                        createTestItem(
-                                "ROW(`f0` INT)",
-                                DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
-                        createTestItem("ROW<>", DataTypes.ROW()),
-                        createTestItem("ROW()", DataTypes.ROW()),
-                        // Expect to be ROW<`f0` INT NOT NULL '...', `f1` BOOLEAN '...'>.
-                        createTestItem(
-                                "ROW<f0 INT NOT NULL 'This is a comment.',"
-                                        + " f1 BOOLEAN 'This as well.'>",
-                                DataTypes.ROW(
-                                        DataTypes.FIELD("f0", DataTypes.INT()),
-                                        DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
-                        createTestItem(
-                                "ARRAY<ROW<f0 INT, f1 BOOLEAN>>",
-                                DataTypes.ARRAY(
-                                        DataTypes.ROW(
-                                                DataTypes.FIELD("f0", DataTypes.INT()),
-                                                DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
-                        createTestItem(
-                                "ROW<f0 INT, f1 BOOLEAN> MULTISET",
-                                DataTypes.MULTISET(
-                                        DataTypes.ROW(
-                                                DataTypes.FIELD("f0", DataTypes.INT()),
-                                                DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
-                        createTestItem(
-                                "MULTISET<ROW<f0 INT, f1 BOOLEAN>>",
-                                DataTypes.MULTISET(
-                                        DataTypes.ROW(
-                                                DataTypes.FIELD("f0", DataTypes.INT()),
-                                                DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
-                        createTestItem(
-                                "ROW<f0 Row<f00 INT, f01 BOOLEAN>, "
-                                        + "f1 INT ARRAY, "
-                                        + "f2 BOOLEAN MULTISET>",
-                                DataTypes.ROW(
-                                        DataTypes.FIELD(
-                                                "f0",
-                                                DataTypes.ROW(
-                                                        DataTypes.FIELD("f00", DataTypes.INT()),
-                                                        DataTypes.FIELD(
-                                                                "f01", DataTypes.BOOLEAN()))),
-                                        DataTypes.FIELD("f1", DataTypes.ARRAY(DataTypes.INT())),
-                                        DataTypes.FIELD(
-                                                "f2", DataTypes.MULTISET(DataTypes.BOOLEAN())))));
-        StringBuilder buffer = new StringBuilder("create table t1(\n");
-        for (int i = 0; i < testItems.size(); i++) {
-            buffer.append("f").append(i).append(" ").append(testItems.get(i).testExpr);
-            if (i == testItems.size() - 1) {
-                buffer.append(")");
-            } else {
-                buffer.append(",\n");
-            }
-        }
-        final String sql = buffer.toString();
-        final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        SqlNode node = parser.parse(sql);
-        assert node instanceof SqlCreateTable;
-        Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
-        TableSchema schema = ((CreateTableOperation) operation).getCatalogTable().getSchema();
-        Object[] expectedDataTypes = testItems.stream().map(item -> item.expectedType).toArray();
-        assertArrayEquals(expectedDataTypes, schema.getFieldDataTypes());
-    }
-
-    @Test
-    public void testCreateTableWithComputedColumn() {
-        final String sql =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a int,\n"
-                        + "  b varchar, \n"
-                        + "  c as a - 1, \n"
-                        + "  d as b || '$$', \n"
-                        + "  e as my_udf1(a),"
-                        + "  f as `default`.my_udf2(a) + 1,"
-                        + "  g as builtin.`default`.my_udf3(a) || '##'\n"
-                        + ")\n"
-                        + "  with (\n"
-                        + "    'connector' = 'kafka', \n"
-                        + "    'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-        functionCatalog.registerTempCatalogScalarFunction(
-                ObjectIdentifier.of("builtin", "default", "my_udf1"), Func0$.MODULE$);
-        functionCatalog.registerTempCatalogScalarFunction(
-                ObjectIdentifier.of("builtin", "default", "my_udf2"), Func1$.MODULE$);
-        functionCatalog.registerTempCatalogScalarFunction(
-                ObjectIdentifier.of("builtin", "default", "my_udf3"), Func8$.MODULE$);
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, getParserBySqlDialect(SqlDialect.DEFAULT));
-        assert operation instanceof CreateTableOperation;
-        CreateTableOperation op = (CreateTableOperation) operation;
-        CatalogTable catalogTable = op.getCatalogTable();
-        assertArrayEquals(
-                new String[] {"a", "b", "c", "d", "e", "f", "g"},
-                catalogTable.getSchema().getFieldNames());
-        assertArrayEquals(
-                new DataType[] {
-                    DataTypes.INT(),
-                    DataTypes.STRING(),
-                    DataTypes.INT(),
-                    DataTypes.STRING(),
-                    DataTypes.INT().notNull(),
-                    DataTypes.INT(),
-                    DataTypes.STRING()
-                },
-                catalogTable.getSchema().getFieldDataTypes());
-        String[] columnExpressions =
-                catalogTable.getSchema().getTableColumns().stream()
-                        .filter(ComputedColumn.class::isInstance)
-                        .map(ComputedColumn.class::cast)
-                        .map(ComputedColumn::getExpression)
-                        .toArray(String[]::new);
-        String[] expected =
-                new String[] {
-                    "`a` - 1",
-                    "`b` || '$$'",
-                    "`builtin`.`default`.`my_udf1`(`a`)",
-                    "`builtin`.`default`.`my_udf2`(`a`) + 1",
-                    "`builtin`.`default`.`my_udf3`(`a`) || '##'"
-                };
-        assertArrayEquals(expected, columnExpressions);
-    }
-
-    @Test
-    public void testCreateTableWithMetadataColumn() {
-        final String sql =
-                "CREATE TABLE tbl1 (\n"
-                        + "  a INT,\n"
-                        + "  b STRING,\n"
-                        + "  c INT METADATA,\n"
-                        + "  d INT METADATA FROM 'other.key',\n"
-                        + "  e INT METADATA VIRTUAL\n"
-                        + ")\n"
-                        + "  WITH (\n"
-                        + "    'connector' = 'kafka',\n"
-                        + "    'kafka.topic' = 'log.test'\n"
-                        + ")\n";
-
-        final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final Operation operation = parse(sql, planner, getParserBySqlDialect(SqlDialect.DEFAULT));
-        assert operation instanceof CreateTableOperation;
-        final CreateTableOperation op = (CreateTableOperation) operation;
-        final TableSchema actualSchema = op.getCatalogTable().getSchema();
-
-        final TableSchema expectedSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("a", DataTypes.INT()))
-                        .add(TableColumn.physical("b", DataTypes.STRING()))
-                        .add(TableColumn.metadata("c", DataTypes.INT()))
-                        .add(TableColumn.metadata("d", DataTypes.INT(), "other.key"))
-                        .add(TableColumn.metadata("e", DataTypes.INT(), true))
-                        .build();
-
-        assertEquals(expectedSchema, actualSchema);
-    }
-
-    @Test
-    public void testAlterTable() throws Exception {
-        Catalog catalog = new GenericInMemoryCatalog("default", "default");
-        catalogManager.registerCatalog("cat1", catalog);
-        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder().column("a", DataTypes.STRING()).build(),
-                        "tb1",
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.setCurrentCatalog("cat1");
-        catalogManager.setCurrentDatabase("db1");
-        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
-        final String[] renameTableSqls =
-                new String[] {
-                    "alter table cat1.db1.tb1 rename to tb2",
-                    "alter table db1.tb1 rename to tb2",
-                    "alter table tb1 rename to cat1.db1.tb2",
-                };
-        final ObjectIdentifier expectedIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
-        final ObjectIdentifier expectedNewIdentifier = ObjectIdentifier.of("cat1", "db1", "tb2");
-        // test rename table converter
-        for (int i = 0; i < renameTableSqls.length; i++) {
-            Operation operation = parse(renameTableSqls[i], SqlDialect.DEFAULT);
-            assert operation instanceof AlterTableRenameOperation;
-            final AlterTableRenameOperation alterTableRenameOperation =
-                    (AlterTableRenameOperation) operation;
-            assertEquals(expectedIdentifier, alterTableRenameOperation.getTableIdentifier());
-            assertEquals(expectedNewIdentifier, alterTableRenameOperation.getNewTableIdentifier());
-        }
-        // test alter table options
-        Operation operation =
-                parse(
-                        "alter table cat1.db1.tb1 set ('k1' = 'v1', 'K2' = 'V2')",
-                        SqlDialect.DEFAULT);
-        assert operation instanceof AlterTableOptionsOperation;
-        final AlterTableOptionsOperation alterTableOptionsOperation =
-                (AlterTableOptionsOperation) operation;
-        assertEquals(expectedIdentifier, alterTableOptionsOperation.getTableIdentifier());
-        assertEquals(2, alterTableOptionsOperation.getCatalogTable().getOptions().size());
-        Map<String, String> options = new HashMap<>();
-        options.put("k1", "v1");
-        options.put("K2", "V2");
-        assertEquals(options, alterTableOptionsOperation.getCatalogTable().getOptions());
-    }
-
-    @Test
-    public void testAlterTableAddPkConstraint() throws Exception {
-        Catalog catalog = new GenericInMemoryCatalog("default", "default");
-        catalogManager.registerCatalog("cat1", catalog);
-        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.STRING().notNull())
-                                .column("b", DataTypes.BIGINT().notNull())
-                                .column("c", DataTypes.BIGINT())
-                                .build(),
-                        "tb1",
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.setCurrentCatalog("cat1");
-        catalogManager.setCurrentDatabase("db1");
-        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
-        // Test alter add table constraint.
-        Operation operation =
-                parse(
-                        "alter table tb1 add constraint ct1 primary key(a, b) not enforced",
-                        SqlDialect.DEFAULT);
-        assert operation instanceof AlterTableAddConstraintOperation;
-        AlterTableAddConstraintOperation addConstraintOperation =
-                (AlterTableAddConstraintOperation) operation;
-        assertThat(
-                addConstraintOperation.asSummaryString(),
-                is(
-                        "ALTER TABLE ADD CONSTRAINT: (identifier: [`cat1`.`db1`.`tb1`], "
-                                + "constraintName: [ct1], columns: [a, b])"));
-        // Test alter table add pk on nullable column
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage("Could not create a PRIMARY KEY 'ct1'. Column 'c' is nullable.");
-        parse("alter table tb1 add constraint ct1 primary key(c) not enforced", SqlDialect.DEFAULT);
-    }
-
-    @Test
-    public void testAlterTableAddPkConstraintEnforced() throws Exception {
-        Catalog catalog = new GenericInMemoryCatalog("default", "default");
-        catalogManager.registerCatalog("cat1", catalog);
-        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.STRING().notNull())
-                                .column("b", DataTypes.BIGINT().notNull())
-                                .column("c", DataTypes.BIGINT())
-                                .build(),
-                        "tb1",
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.setCurrentCatalog("cat1");
-        catalogManager.setCurrentDatabase("db1");
-        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
-        // Test alter table add enforced
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage(
-                "Flink doesn't support ENFORCED mode for PRIMARY KEY constaint. "
-                        + "ENFORCED/NOT ENFORCED  controls if the constraint checks are performed on the "
-                        + "incoming/outgoing data. Flink does not own the data therefore the "
-                        + "only supported mode is the NOT ENFORCED mode");
-        parse("alter table tb1 add constraint ct1 primary key(a, b)", SqlDialect.DEFAULT);
-    }
-
-    @Test
-    public void testAlterTableAddUniqueConstraint() throws Exception {
-        Catalog catalog = new GenericInMemoryCatalog("default", "default");
-        catalogManager.registerCatalog("cat1", catalog);
-        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.STRING().notNull())
-                                .column("b", DataTypes.BIGINT().notNull())
-                                .build(),
-                        "tb1",
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.setCurrentCatalog("cat1");
-        catalogManager.setCurrentDatabase("db1");
-        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
-        // Test alter add table constraint.
-        thrown.expect(UnsupportedOperationException.class);
-        thrown.expectMessage("UNIQUE constraint is not supported yet");
-        parse("alter table tb1 add constraint ct1 unique(a, b) not enforced", SqlDialect.DEFAULT);
-    }
-
-    @Test
-    public void testAlterTableRenameColumn() throws Exception {
-        Catalog catalog = new GenericInMemoryCatalog("default", "default");
-        catalogManager.registerCatalog("cat1", catalog);
-        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.STRING().notNull())
-                                .column("b", DataTypes.BIGINT().notNull())
-                                .column("c", DataTypes.TIMESTAMP(3).notNull())
-                                .watermark("c", "c - INTERVAL '5' SECOND")
-                                .primaryKey("b")
-                                .build(),
-                        "tb1",
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.setCurrentCatalog("cat1");
-        catalogManager.setCurrentDatabase("db1");
-        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
-        // Test alter table rename
-        Operation operation = parse("alter table tb1 rename c to c1", SqlDialect.DEFAULT);
-        assert operation instanceof AlterTableSchemaOperation;
-        Schema unresolvedSchema =
-                ((AlterTableSchemaOperation) operation).getCatalogTable().getUnresolvedSchema();
-        String[] newColumns =
-                unresolvedSchema.getColumns().stream()
-                        .map(Schema.UnresolvedColumn::getName)
-                        .toArray(String[]::new);
-        // test rename column name
-        assertArrayEquals(newColumns, new String[] {"a", "b", "c1"});
-        // test watermark expression
-        Schema.UnresolvedWatermarkSpec unresolvedWatermarkSpec =
-                unresolvedSchema.getWatermarkSpecs().get(0);
-        assertThat(unresolvedWatermarkSpec.getColumnName(), is("c1"));
-
-        String expression =
-                ((SqlCallExpression) unresolvedWatermarkSpec.getWatermarkExpression())
-                        .getSqlExpression();
-        assertThat(expression, is("c1 - INTERVAL '5' SECOND"));
-    }
-
-    @Test
-    public void testAlterTableAddUniqueConstraintEnforced() throws Exception {
-        Catalog catalog = new GenericInMemoryCatalog("default", "default");
-        catalogManager.registerCatalog("cat1", catalog);
-        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.STRING().notNull())
-                                .column("b", DataTypes.BIGINT().notNull())
-                                .column("c", DataTypes.BIGINT())
-                                .build(),
-                        "tb1",
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.setCurrentCatalog("cat1");
-        catalogManager.setCurrentDatabase("db1");
-        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
-        // Test alter table add enforced
-        thrown.expect(UnsupportedOperationException.class);
-        thrown.expectMessage("UNIQUE constraint is not supported yet");
-        parse("alter table tb1 add constraint ct1 unique(a, b)", SqlDialect.DEFAULT);
-    }
-
-    @Test
-    public void testAlterTableDropConstraint() throws Exception {
-        Catalog catalog = new GenericInMemoryCatalog("default", "default");
-        catalogManager.registerCatalog("cat1", catalog);
-        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.STRING().notNull())
-                                .column("b", DataTypes.BIGINT().notNull())
-                                .column("c", DataTypes.BIGINT())
-                                .primaryKeyNamed("ct1", "a", "b")
-                                .build(),
-                        "tb1",
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-        catalogManager.setCurrentCatalog("cat1");
-        catalogManager.setCurrentDatabase("db1");
-        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
-        // Test alter table add enforced
-        Operation operation = parse("alter table tb1 drop constraint ct1", SqlDialect.DEFAULT);
-        assert operation instanceof AlterTableDropConstraintOperation;
-        AlterTableDropConstraintOperation dropConstraint =
-                (AlterTableDropConstraintOperation) operation;
-        assertThat(
-                dropConstraint.asSummaryString(),
-                is("ALTER TABLE `cat1`.`db1`.`tb1` DROP CONSTRAINT ct1"));
-        thrown.expect(ValidationException.class);
-        thrown.expectMessage("CONSTRAINT [ct2] does not exist");
-        parse("alter table tb1 drop constraint ct2", SqlDialect.DEFAULT);
-    }
-
-    @Test
-    public void testCreateViewWithMatchRecognize() {
-        Map<String, String> prop = new HashMap<>();
-        prop.put("connector", "values");
-        prop.put("bounded", "true");
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("id", DataTypes.INT().notNull())
-                                .column("measurement", DataTypes.BIGINT().notNull())
-                                .column(
-                                        "ts",
-                                        DataTypes.ROW(
-                                                DataTypes.FIELD("tmstmp", DataTypes.TIMESTAMP(3))))
-                                .build(),
-                        null,
-                        Collections.emptyList(),
-                        prop);
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "events"), false);
-
-        final String sql =
-                ""
-                        + "CREATE TEMPORARY VIEW foo AS "
-                        + "SELECT * "
-                        + "FROM events MATCH_RECOGNIZE ("
-                        + "    PARTITION BY id "
-                        + "    ORDER BY ts ASC "
-                        + "    MEASURES "
-                        + "      next_step.measurement - this_step.measurement AS diff "
-                        + "    AFTER MATCH SKIP TO NEXT ROW "
-                        + "    PATTERN (this_step next_step)"
-                        + "    DEFINE "
-                        + "         this_step AS TRUE,"
-                        + "         next_step AS TRUE"
-                        + ")";
-
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assertThat(operation, instanceOf(CreateViewOperation.class));
-    }
-
-    @Test
-    public void testCreateViewWithDynamicTableOptions() {
-        tableConfig
-                .getConfiguration()
-                .setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
-        Map<String, String> prop = new HashMap<>();
-        prop.put("connector", "values");
-        prop.put("bounded", "true");
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("f0", DataTypes.INT())
-                                .column("f1", DataTypes.VARCHAR(20))
-                                .build(),
-                        null,
-                        Collections.emptyList(),
-                        prop);
-
-        catalogManager.createTable(
-                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceA"), false);
-
-        final String sql =
-                ""
-                        + "create view test_view as\n"
-                        + "select *\n"
-                        + "from sourceA /*+ OPTIONS('changelog-mode'='I') */";
-
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assertThat(operation, instanceOf(CreateViewOperation.class));
-    }
-
-    @Test
-    public void testBeginStatementSet() {
-        final String sql = "BEGIN STATEMENT SET";
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof BeginStatementSetOperation;
-        final BeginStatementSetOperation beginStatementSetOperation =
-                (BeginStatementSetOperation) operation;
-
-        assertEquals("BEGIN STATEMENT SET", beginStatementSetOperation.asSummaryString());
-    }
-
-    @Test
-    public void testEnd() {
-        final String sql = "END";
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof EndStatementSetOperation;
-        final EndStatementSetOperation endStatementSetOperation =
-                (EndStatementSetOperation) operation;
-
-        assertEquals("END", endStatementSetOperation.asSummaryString());
-    }
-
-    // ~ Tool Methods ----------------------------------------------------------
-
-    private static TestItem createTestItem(Object... args) {
-        assert args.length == 2;
-        final String testExpr = (String) args[0];
-        TestItem testItem = TestItem.fromTestExpr(testExpr);
-        if (args[1] instanceof String) {
-            testItem.withExpectedError((String) args[1]);
-        } else {
-            testItem.withExpectedType(args[1]);
-        }
-        return testItem;
-    }
-
-    private void assertShowFunctions(
-            String sql, String expectedSummary, FunctionScope expectedScope) {
-        Operation operation = parse(sql, SqlDialect.DEFAULT);
-        assert operation instanceof ShowFunctionsOperation;
-        final ShowFunctionsOperation showFunctionsOperation = (ShowFunctionsOperation) operation;
-
-        assertEquals(expectedScope, showFunctionsOperation.getFunctionScope());
-        assertEquals(expectedSummary, showFunctionsOperation.asSummaryString());
-    }
-
-    private Operation parse(String sql, FlinkPlannerImpl planner, CalciteParser parser) {
-        SqlNode node = parser.parse(sql);
-        return SqlToOperationConverter.convert(planner, catalogManager, node).get();
-    }
-
-    private Operation parse(String sql, SqlDialect sqlDialect) {
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(sqlDialect);
-        final CalciteParser parser = getParserBySqlDialect(sqlDialect);
-        SqlNode node = parser.parse(sql);
-        return SqlToOperationConverter.convert(planner, catalogManager, node).get();
-    }
-
-    private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
-        tableConfig.setSqlDialect(sqlDialect);
-        return plannerContext.createFlinkPlanner(
-                catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase());
-    }
-
-    private CalciteParser getParserBySqlDialect(SqlDialect sqlDialect) {
-        tableConfig.setSqlDialect(sqlDialect);
-        return plannerContext.createCalciteParser();
-    }
-
-    // ~ Inner Classes ----------------------------------------------------------
-
-    private static class TestItem {
-        private final String testExpr;
-        @Nullable private Object expectedType;
-        @Nullable private String expectedError;
-
-        private TestItem(String testExpr) {
-            this.testExpr = testExpr;
-        }
-
-        static TestItem fromTestExpr(String testExpr) {
-            return new TestItem(testExpr);
-        }
-
-        TestItem withExpectedType(Object expectedType) {
-            this.expectedType = expectedType;
-            return this;
-        }
-
-        TestItem withExpectedError(String expectedError) {
-            this.expectedError = expectedError;
-            return this;
-        }
-
-        @Override
-        public String toString() {
-            return this.testExpr;
-        }
-    }
-
-    private Operation parseAndConvert(String sql) {
-        final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-
-        SqlNode node = parser.parse(sql);
-        return SqlToOperationConverter.convert(planner, catalogManager, node).get();
-    }
-}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
new file mode 100644
index 00000000000..7b22246de8d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
@@ -0,0 +1,108 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+
+import org.apache.calcite.rex.RexInputRef;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** A finder used to look up the column names referenced in a {@link ResolvedExpression}. */
+public class ColumnReferenceFinder {
+
+    private ColumnReferenceFinder() {}
+
+    public static Set<String> findReferencedColumn(
+            ResolvedExpression resolvedExpression, List<String> tableColumns) {
+        ColumnReferenceVisitor visitor = new ColumnReferenceVisitor(tableColumns);
+        visitor.visit(resolvedExpression);
+        return visitor.referencedColumns;
+    }
+
+    private static class ColumnReferenceVisitor extends ExpressionDefaultVisitor<Void> {
+        private final List<String> tableColumns;
+        private final Set<String> referencedColumns;
+
+        public ColumnReferenceVisitor(List<String> tableColumns) {
+            this.tableColumns = tableColumns;
+            this.referencedColumns = new HashSet<>();
+        }
+
+        @Override
+        public Void visit(Expression expression) {
+            if (expression instanceof LocalReferenceExpression) {
+                return visit((LocalReferenceExpression) expression);
+            } else if (expression instanceof FieldReferenceExpression) {
+                return visit((FieldReferenceExpression) expression);
+            } else if (expression instanceof RexNodeExpression) {
+                return visit((RexNodeExpression) expression);
+            } else if (expression instanceof CallExpression) {
+                return visit((CallExpression) expression);
+            } else {
+                return super.visit(expression);
+            }
+        }
+
+        @Override
+        public Void visit(FieldReferenceExpression fieldReference) {
+            referencedColumns.add(fieldReference.getName());
+            return null;
+        }
+
+        public Void visit(LocalReferenceExpression localReference) {
+            referencedColumns.add(localReference.getName());
+            return null;
+        }
+
+        public Void visit(RexNodeExpression rexNode) {
+            // collect every input reference to a table column used by the expression
+            Set<RexInputRef> inputRefs = FlinkRexUtil.findAllInputRefs(rexNode.getRexNode());
+            // get the referenced column name by index
+            inputRefs.forEach(
+                    inputRef -> {
+                        int index = inputRef.getIndex();
+                        referencedColumns.add(tableColumns.get(index));
+                    });
+            return null;
+        }
+
+        @Override
+        public Void visit(CallExpression call) {
+            for (Expression expression : call.getChildren()) {
+                visit(expression);
+            }
+            return null;
+        }
+
+        @Override
+        protected Void defaultMethod(Expression expression) {
+            throw new TableException("Unexpected expression: " + expression);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
index 24acefd2930..db4cfc1e9cf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.operations;
 
 import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableModify;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableRenameColumn;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
 import org.apache.flink.sql.parser.ddl.SqlWatermark;
@@ -27,9 +28,15 @@ import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
 import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.SchemaResolver;
+import org.apache.flink.table.catalog.WatermarkSpec;
 import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.expressions.ColumnReferenceFinder;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
@@ -45,6 +52,7 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -64,6 +72,8 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDa
  */
 public class AlterSchemaConverter {
 
+    private static final String EX_MSG_PREFIX = "Failed to execute ALTER TABLE statement.\n";
+
     private final SqlValidator sqlValidator;
     private final Function<SqlNode, String> escapeExpression;
     private final Consumer<SqlTableConstraint> constraintValidator;
@@ -104,9 +114,83 @@ public class AlterSchemaConverter {
         return converter.convert();
     }
 
-    private abstract static class SchemaConverter {
+    public Schema applySchemaChange(
+            SqlAlterTableRenameColumn renameColumn, ContextResolvedTable originalTable) {
+        String oldColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
+        String newColumnName = getColumnName(renameColumn.getNewColumnIdentifier());
+        List<String> tableColumns =
+                originalTable.getResolvedSchema().getColumns().stream()
+                        .map(Column::getName)
+                        .collect(Collectors.toList());
+        // validate that the old column exists, the new column name is not a duplicate, and the
+        // old column is referenced by neither a computed column nor a partition key
+        validateColumnName(
+                oldColumnName,
+                newColumnName,
+                tableColumns,
+                originalTable.getResolvedSchema(),
+                ((CatalogTable) originalTable.getResolvedTable()).getPartitionKeys());
+
+        // validate old column isn't referenced by watermark
+        List<WatermarkSpec> watermarkSpecs = originalTable.getResolvedSchema().getWatermarkSpecs();
+        watermarkSpecs.forEach(
+                watermarkSpec -> {
+                    String rowtimeAttribute = watermarkSpec.getRowtimeAttribute();
+                    Set<String> referencedColumns =
+                            ColumnReferenceFinder.findReferencedColumn(
+                                    watermarkSpec.getWatermarkExpression(), tableColumns);
+                    if (oldColumnName.equals(rowtimeAttribute)
+                            || referencedColumns.contains(oldColumnName)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Old column %s is referred by watermark expression %s, "
+                                                + "currently doesn't allow to rename column which is "
+                                                + "referred by watermark expression.",
+                                        oldColumnName, watermarkSpec.asSummaryString()));
+                    }
+                });
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        Schema originSchema = originalTable.getTable().getUnresolvedSchema();
+        originSchema
+                .getColumns()
+                .forEach(
+                        column -> {
+                            if (oldColumnName.equals(column.getName())) {
+                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                            } else {
+                                builder.fromColumns(Collections.singletonList(column));
+                            }
+                        });
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames();
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> newPrimaryKeyNames =
+                    originPrimaryKeyNames.stream()
+                            .map(pkName -> pkName.equals(oldColumnName) ? newColumnName : pkName)
+                            .collect(Collectors.toList());
+            builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
+        }
+
+        // build watermark
+        originSchema
+                .getWatermarkSpecs()
+                .forEach(
+                        watermarkSpec ->
+                                builder.watermark(
+                                        watermarkSpec.getColumnName(),
+                                        watermarkSpec.getWatermarkExpression()));
+
+        // generate new schema
+        return builder.build();
+    }
+
+    // --------------------------------------------------------------------------------------------
 
-        static final String EX_MSG_PREFIX = "Failed to execute ALTER TABLE statement.\n";
+    private abstract static class SchemaConverter {
 
         List<String> sortedColumnNames = new ArrayList<>();
         Set<String> alterColNames = new HashSet<>();
@@ -278,13 +362,7 @@ public class AlterSchemaConverter {
             for (SqlNode alterColumn : alterColumns) {
                 SqlTableColumnPosition columnPosition = (SqlTableColumnPosition) alterColumn;
                 SqlTableColumn column = columnPosition.getColumn();
-                if (!column.getName().isSimple()) {
-                    throw new UnsupportedOperationException(
-                            String.format(
-                                    "%sAlter nested row type is not supported yet.",
-                                    EX_MSG_PREFIX));
-                }
-                String columnName = column.getName().getSimple();
+                String columnName = getColumnName(column.getName());
                 if (!alterColNames.add(columnName)) {
                     throw new ValidationException(
                             String.format(
@@ -493,6 +571,87 @@ public class AlterSchemaConverter {
         }
     }
 
+    // --------------------------------------------------------------------------------------------
+
+    private void validateColumnName(
+            String originColumnName,
+            String newColumnName,
+            List<String> tableColumns,
+            ResolvedSchema originResolvedSchema,
+            List<String> partitionKeys) {
+        // validate old column
+        if (!tableColumns.contains(originColumnName)) {
+            throw new ValidationException(
+                    String.format(
+                            "Old column %s not found in table schema for RENAME COLUMN",
+                            originColumnName));
+        }
+
+        // validate new column
+        if (tableColumns.contains(newColumnName)) {
+            throw new ValidationException(
+                    String.format(
+                            "New column %s already existed in table schema for RENAME COLUMN",
+                            newColumnName));
+        }
+
+        // validate that the old column isn't referenced by any computed column
+        originResolvedSchema.getColumns().stream()
+                .filter(column -> column instanceof Column.ComputedColumn)
+                .forEach(
+                        column -> {
+                            Column.ComputedColumn computedColumn = (Column.ComputedColumn) column;
+                            Set<String> referencedColumn =
+                                    ColumnReferenceFinder.findReferencedColumn(
+                                            computedColumn.getExpression(), tableColumns);
+                            if (referencedColumn.contains(originColumnName)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Old column %s is referred by computed column %s, currently doesn't "
+                                                        + "allow to rename column which is referred by computed column.",
+                                                originColumnName,
+                                                computedColumn.asSummaryString()));
+                            }
+                        });
+        // validate that the partition keys don't contain the old column
+        if (partitionKeys.contains(originColumnName)) {
+            throw new ValidationException(
+                    String.format(
+                            "Can not rename column %s because it is used as the partition keys.",
+                            originColumnName));
+        }
+    }
+
+    private void buildNewColumnFromOriginColumn(
+            Schema.Builder builder, Schema.UnresolvedColumn originColumn, String columnName) {
+        if (originColumn instanceof Schema.UnresolvedComputedColumn) {
+            builder.columnByExpression(
+                    columnName, ((Schema.UnresolvedComputedColumn) originColumn).getExpression());
+        } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
+            builder.column(
+                    columnName, ((Schema.UnresolvedPhysicalColumn) originColumn).getDataType());
+        } else if (originColumn instanceof Schema.UnresolvedMetadataColumn) {
+            Schema.UnresolvedMetadataColumn metadataColumn =
+                    (Schema.UnresolvedMetadataColumn) originColumn;
+            builder.columnByMetadata(
+                    columnName,
+                    metadataColumn.getDataType(),
+                    metadataColumn.getMetadataKey(),
+                    metadataColumn.isVirtual());
+        }
+        originColumn.getComment().ifPresent(builder::withComment);
+    }
+
+    private static String getColumnName(SqlIdentifier identifier) {
+        if (!identifier.isSimple()) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "%sAlter nested row type %s is not supported yet.",
+                            EX_MSG_PREFIX, identifier));
+        }
+        return identifier.getSimple();
+    }
+
     private AlterSchemaStrategy computeAlterSchemaStrategy(SqlAlterTableSchema alterTableSchema) {
         if (alterTableSchema instanceof SqlAlterTableAdd) {
             return AlterSchemaStrategy.ADD;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 237977c049c..3b4b196e991 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -28,6 +28,7 @@ import org.apache.flink.sql.parser.ddl.SqlAlterTableCompact;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableOptions;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableRename;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableRenameColumn;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableReset;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
 import org.apache.flink.sql.parser.ddl.SqlAlterView;
@@ -531,6 +532,20 @@ public class SqlToOperationConverter {
                     (SqlChangeColumn) sqlAlterTable,
                     (CatalogTable) baseTable,
                     flinkPlanner.getOrCreateSqlValidator());
+        } else if (sqlAlterTable instanceof SqlAlterTableRenameColumn) {
+            SqlAlterTableRenameColumn sqlAlterTableRenameColumn =
+                    (SqlAlterTableRenameColumn) sqlAlterTable;
+            Schema newSchema =
+                    alterSchemaConverter.applySchemaChange(
+                            sqlAlterTableRenameColumn, optionalCatalogTable.get());
+            CatalogTable baseCatalogTable = (CatalogTable) baseTable;
+            return new AlterTableSchemaOperation(
+                    tableIdentifier,
+                    CatalogTable.of(
+                            newSchema,
+                            baseCatalogTable.getComment(),
+                            baseCatalogTable.getPartitionKeys(),
+                            baseCatalogTable.getOptions()));
         } else if (sqlAlterTable instanceof SqlAddPartitions) {
             List<CatalogPartitionSpec> specs = new ArrayList<>();
             List<CatalogPartition> partitions = new ArrayList<>();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index 1f79acbb0b3..6b663e70ff9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -23,7 +23,6 @@ import org.apache.flink.sql.parser.ddl.SqlChangeColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableOption;
-import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
@@ -32,8 +31,6 @@ import org.apache.flink.table.api.WatermarkSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
@@ -41,12 +38,10 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.utils.TypeConversions;
 
-import com.google.common.collect.Lists;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.validate.SqlValidator;
-import org.apache.commons.lang3.StringUtils;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -67,12 +62,10 @@ public class OperationConverterUtils {
             CatalogTable catalogTable,
             SqlValidator sqlValidator) {
         // This is only used by the Hive dialect at the moment. In Hive, only non-partition columns
-        // can be
-        // added/replaced and users will only define non-partition columns in the new column list.
-        // Therefore, we require
-        // that partitions columns must appear last in the schema (which is inline with Hive).
-        // Otherwise, we won't be
-        // able to determine the column positions after the non-partition columns are replaced.
+        // can be added/replaced and users will only define non-partition columns in the new column
+        // list. Therefore, we require that partition columns must appear last in the schema (which
+        // is in line with Hive). Otherwise, we won't be able to determine the column positions
+        // after the non-partition columns are replaced.
         TableSchema oldSchema = catalogTable.getSchema();
         int numPartCol = catalogTable.getPartitionKeys().size();
         Set<String> lastCols =
@@ -149,116 +142,6 @@ public class OperationConverterUtils {
         // TODO: handle watermark and constraints
     }
 
-    public static Operation convertRenameColumn(
-            ObjectIdentifier tableIdentifier,
-            String originColumnName,
-            String newColumnName,
-            CatalogTable catalogTable) {
-
-        Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();
-        validateColumnName(originColumnName, newColumnName, modifiedTableSchema);
-
-        Schema.Builder builder = Schema.newBuilder();
-        // build column
-        modifiedTableSchema.getColumns().stream()
-                .forEach(
-                        column -> {
-                            if (StringUtils.equals(column.getName(), originColumnName)) {
-                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
-                            } else {
-                                buildNewColumnFromOriginColumn(builder, column, column.getName());
-                            }
-                        });
-        // build primary key column
-        List<String> originPrimaryKeyNames =
-                modifiedTableSchema
-                        .getPrimaryKey()
-                        .map(Schema.UnresolvedPrimaryKey::getColumnNames)
-                        .orElseGet(Lists::newArrayList);
-
-        List<String> newPrimaryKeyNames =
-                originPrimaryKeyNames.stream()
-                        .map(
-                                pkName ->
-                                        StringUtils.equals(pkName, originColumnName)
-                                                ? newColumnName
-                                                : pkName)
-                        .collect(Collectors.toList());
-
-        if (newPrimaryKeyNames.size() > 0) {
-            builder.primaryKey(newPrimaryKeyNames);
-        }
-        // build watermark
-        modifiedTableSchema.getWatermarkSpecs().stream()
-                .forEach(
-                        watermarkSpec -> {
-                            String watermarkRefColumnName = watermarkSpec.getColumnName();
-                            Expression watermarkExpression = watermarkSpec.getWatermarkExpression();
-                            if (StringUtils.equals(watermarkRefColumnName, originColumnName)) {
-                                String newWatermarkExpression =
-                                        ((SqlCallExpression) watermarkExpression)
-                                                .getSqlExpression()
-                                                .replace(watermarkRefColumnName, newColumnName);
-                                builder.watermark(newColumnName, newWatermarkExpression);
-                            } else {
-                                builder.watermark(watermarkRefColumnName, watermarkExpression);
-                            }
-                        });
-        // build partition key
-        List<String> newPartitionKeys =
-                catalogTable.getPartitionKeys().stream()
-                        .map(
-                                name ->
-                                        StringUtils.equals(name, originColumnName)
-                                                ? newColumnName
-                                                : name)
-                        .collect(Collectors.toList());
-        // generate new schema
-        Schema newSchema = builder.build();
-
-        return new AlterTableSchemaOperation(
-                tableIdentifier,
-                CatalogTable.of(
-                        newSchema,
-                        catalogTable.getComment(),
-                        newPartitionKeys,
-                        catalogTable.getOptions()));
-    }
-
-    private static void validateColumnName(
-            String originColumnName, String newColumnName, Schema modifiedTableSchema) {
-        List<String> columnNames =
-                modifiedTableSchema.getColumns().stream()
-                        .map(Schema.UnresolvedColumn::getName)
-                        .collect(Collectors.toList());
-
-        int originColumnIndex = columnNames.indexOf(originColumnName);
-        if (originColumnIndex < 0) {
-            throw new ValidationException(
-                    String.format("Old column %s not found for RENAME COLUMN ", originColumnName));
-        }
-
-        int sameColumnNameIndex = columnNames.indexOf(newColumnName);
-        if (sameColumnNameIndex >= 0) {
-            throw new ValidationException(
-                    String.format("New column %s existed for RENAME COLUMN ", newColumnName));
-        }
-    }
-
-    private static void buildNewColumnFromOriginColumn(
-            Schema.Builder builder, Schema.UnresolvedColumn originColumn, String columnName) {
-        if (originColumn instanceof Schema.UnresolvedComputedColumn) {
-            builder.columnByExpression(
-                    columnName, ((Schema.UnresolvedComputedColumn) originColumn).getExpression());
-        } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
-            builder.column(
-                    columnName, ((Schema.UnresolvedPhysicalColumn) originColumn).getDataType());
-        } else {
-            builder.columnByMetadata(
-                    columnName, ((Schema.UnresolvedMetadataColumn) originColumn).getDataType());
-        }
-    }
-
     // change a column in the old table schema and return the updated table schema
     public static TableSchema changeColumn(
             TableSchema oldSchema,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
index fc342e39b3b..81819208b9d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
@@ -313,7 +313,7 @@ object FlinkRexUtil {
    * @return
    *   InputRef HashSet.
    */
-  private[flink] def findAllInputRefs(node: RexNode): util.HashSet[RexInputRef] = {
+  def findAllInputRefs(node: RexNode): util.HashSet[RexInputRef] = {
     val set = new util.HashSet[RexInputRef]
     node.accept(new RexVisitorImpl[Void](true) {
       override def visitInputRef(inputRef: RexInputRef): Void = {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index c7ba3a5214c..1bbeb29d920 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -130,6 +130,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
+import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.planner.utils.OperationMatchers.entry;
 import static org.apache.flink.table.planner.utils.OperationMatchers.isCreateTableOperation;
 import static org.apache.flink.table.planner.utils.OperationMatchers.partitionedBy;
@@ -1268,6 +1269,121 @@ public class SqlToOperationConverterTest {
                 .hasMessageContaining("ALTER TABLE RESET does not support empty key");
     }
 
+    @Test
+    public void testAlterTableRenameColumn() throws Exception {
+        prepareTable("tb1", false, false, true, 3);
+        // rename pk column c
+        Operation operation = parse("alter table tb1 rename c to c1");
+        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(((AlterTableSchemaOperation) operation).getCatalogTable().getUnresolvedSchema())
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT().notNull())
+                                .column("b", DataTypes.BIGINT().notNull())
+                                .column("c1", DataTypes.STRING().notNull())
+                                .withComment("column comment")
+                                .columnByExpression("d", "a*(b+2 + a*b)")
+                                .column(
+                                        "e",
+                                        DataTypes.ROW(
+                                                DataTypes.STRING(),
+                                                DataTypes.INT(),
+                                                DataTypes.ROW(
+                                                        DataTypes.DOUBLE(),
+                                                        DataTypes.ARRAY(DataTypes.FLOAT()))))
+                                .columnByExpression("f", "e.f1 + e.f2.f0")
+                                .columnByMetadata("g", DataTypes.STRING(), null, true)
+                                .column("ts", DataTypes.TIMESTAMP(3))
+                                .withComment("just a comment")
+                                .watermark("ts", "ts - interval '5' seconds")
+                                .primaryKeyNamed("ct1", "a", "b", "c1")
+                                .build());
+
+        // rename computed column
+        operation = parse("alter table tb1 rename f to f1");
+        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(((AlterTableSchemaOperation) operation).getCatalogTable().getUnresolvedSchema())
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT().notNull())
+                                .column("b", DataTypes.BIGINT().notNull())
+                                .column("c", DataTypes.STRING().notNull())
+                                .withComment("column comment")
+                                .columnByExpression("d", "a*(b+2 + a*b)")
+                                .column(
+                                        "e",
+                                        DataTypes.ROW(
+                                                DataTypes.STRING(),
+                                                DataTypes.INT(),
+                                                DataTypes.ROW(
+                                                        DataTypes.DOUBLE(),
+                                                        DataTypes.ARRAY(DataTypes.FLOAT()))))
+                                .columnByExpression("f1", "e.f1 + e.f2.f0")
+                                .columnByMetadata("g", DataTypes.STRING(), null, true)
+                                .column("ts", DataTypes.TIMESTAMP(3))
+                                .withComment("just a comment")
+                                .watermark("ts", "ts - interval '5' seconds")
+                                .primaryKeyNamed("ct1", "a", "b", "c")
+                                .build());
+
+        // rename column a that is used in a computed column
+        assertThatThrownBy(() -> parse("alter table tb1 rename a to a1"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Old column a is referred by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b), "
+                                + "currently doesn't allow to rename column which is referred by computed column.");
+
+        // rename column used in the watermark expression
+        assertThatThrownBy(() -> parse("alter table tb1 rename ts to ts1"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Old column ts is referred by watermark expression WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds, "
+                                + "currently doesn't allow to rename column which is referred by watermark expression.");
+
+        // rename nested column
+        assertThatThrownBy(() -> parse("alter table tb1 rename e.f1 to e.f11"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Alter nested row type e.f1 is not supported yet.");
+
+        // rename column with duplicate name
+        assertThatThrownBy(() -> parse("alter table tb1 rename c to a"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "New column a already existed in table schema for RENAME COLUMN");
+
+        // rename column e; the computed column expression is an ApiExpression, which doesn't
+        // implement the equals method
+        CatalogTable catalogTable2 =
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.STRING().notNull())
+                                .column("b", DataTypes.INT().notNull())
+                                .column("e", DataTypes.STRING())
+                                .columnByExpression("j", $("e").upperCase())
+                                .columnByExpression("g", "TO_TIMESTAMP(e)")
+                                .primaryKey("a", "b")
+                                .build(),
+                        "tb2",
+                        Collections.singletonList("a"),
+                        Collections.emptyMap());
+        catalogManager
+                .getCatalog("cat1")
+                .get()
+                .createTable(new ObjectPath("db1", "tb2"), catalogTable2, true);
+
+        assertThatThrownBy(() -> parse("alter table `cat1`.`db1`.`tb2` rename e to e1"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Old column e is referred by computed column `j` STRING AS upper(e), currently doesn't "
+                                + "allow to rename column which is referred by computed column.");
+
+        // rename column used as partition key
+        assertThatThrownBy(() -> parse("alter table tb2 rename a to a1"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Can not rename column a because it is used as the partition keys");
+    }
+
     @Test
     public void testAlterTableDropConstraint() throws Exception {
         prepareNonManagedTable(true);
@@ -1396,9 +1512,7 @@ public class SqlToOperationConverterTest {
         // add an inner field to a nested row
         assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string)"))
                 .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Alter nested row type is not supported yet.");
+                .hasMessageContaining("Alter nested row type e.f3 is not supported yet.");
 
         // refer to a nested inner field
         assertThatThrownBy(() -> parse("alter table tb1 add (x string after e.f2)"))
@@ -1409,9 +1523,7 @@ public class SqlToOperationConverterTest {
 
         assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string after e.f1)"))
                 .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Alter nested row type is not supported yet.");
+                .hasMessageContaining("Alter nested row type e.f3 is not supported yet.");
     }
 
     @Test
@@ -1430,7 +1542,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1464,7 +1576,7 @@ public class SqlToOperationConverterTest {
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
                                 + "  `i` AS [`b` * 2],\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1483,7 +1595,8 @@ public class SqlToOperationConverterTest {
                         .column("a", DataTypes.INT().notNull())
                         .column("b", DataTypes.BIGINT().notNull())
                         .columnByExpression("i", new SqlCallExpression("`b` * 2"))
-                        .column("c", DataTypes.STRING())
+                        .column("c", DataTypes.STRING().notNull())
+                        .withComment("column comment")
                         .columnByExpression("d", "a*(b+2 + a*b)")
                         .column(
                                 "e",
@@ -1514,7 +1627,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1649,7 +1762,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1671,7 +1784,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1696,7 +1809,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1720,7 +1833,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1799,7 +1912,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1820,7 +1933,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1849,7 +1962,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1879,7 +1992,7 @@ public class SqlToOperationConverterTest {
                         "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
                                 + "  `a` INT NOT NULL,\n"
                                 + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING,\n"
+                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
                                 + "  `d` AS [a*(b+2 + a*b)],\n"
                                 + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
                                 + "  `f` AS [e.f1 + e.f2.f0],\n"
@@ -1970,9 +2083,7 @@ public class SqlToOperationConverterTest {
         // modify an inner field to a nested row
         assertThatThrownBy(() -> parse("alter table tb2 modify (e.f0 string)"))
                 .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Alter nested row type is not supported yet.");
+                .hasMessageContaining("Alter nested row type e.f0 is not supported yet.");
 
         // refer to a nested inner field
         assertThatThrownBy(() -> parse("alter table tb2 modify (g string after e.f2)"))
@@ -1983,9 +2094,7 @@ public class SqlToOperationConverterTest {
 
         assertThatThrownBy(() -> parse("alter table tb2 modify (e.f0 string after e.f1)"))
                 .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining(
-                        "Failed to execute ALTER TABLE statement.\n"
-                                + "Alter nested row type is not supported yet.");
+                .hasMessageContaining("Alter nested row type e.f0 is not supported yet.");
     }
 
     @Test
@@ -2005,7 +2114,8 @@ public class SqlToOperationConverterTest {
                         .column("b", DataTypes.BIGINT().notNull())
                         .withComment("move b to first and add comment")
                         .column("a", DataTypes.INT().notNull())
-                        .column("c", DataTypes.STRING())
+                        .column("c", DataTypes.STRING().notNull())
+                        .withComment("column comment")
                         .columnByExpression("d", "a*(b+2 + a*b)")
                         .column(
                                 "e",
@@ -2030,7 +2140,8 @@ public class SqlToOperationConverterTest {
                 Schema.newBuilder()
                         .column("a", DataTypes.INT().notNull())
                         .column("b", DataTypes.BIGINT().notNull())
-                        .column("c", DataTypes.STRING())
+                        .column("c", DataTypes.STRING().notNull())
+                        .withComment("column comment")
                         .columnByExpression("d", "a*(b+2 + a*b)")
                         .column(
                                 "e",
@@ -2063,6 +2174,7 @@ public class SqlToOperationConverterTest {
                 tableIdentifier,
                 Schema.newBuilder()
                         .column("c", DataTypes.BIGINT())
+                        .withComment("column comment")
                         .column("a", DataTypes.INT().notNull())
                         .column("b", DataTypes.BIGINT().notNull())
                         .columnByExpression("d", "`a` + 2")
@@ -2097,7 +2209,8 @@ public class SqlToOperationConverterTest {
                         .withComment("change g")
                         .column("a", DataTypes.INT().notNull())
                         .column("b", DataTypes.BIGINT().notNull())
-                        .column("c", DataTypes.STRING())
+                        .column("c", DataTypes.STRING().notNull())
+                        .withComment("column comment")
                         .columnByExpression("d", "a*(b+2 + a*b)")
                         .columnByMetadata("e", DataTypes.INT(), null, true)
                         .column("f", DataTypes.TIMESTAMP(3).notNull())
@@ -2183,6 +2296,7 @@ public class SqlToOperationConverterTest {
                         .column("a", DataTypes.INT().notNull())
                         .column("b", DataTypes.BIGINT().notNull())
                         .column("c", DataTypes.STRING().notNull())
+                        .withComment("column comment")
                         .columnByExpression("d", "a*(b+2 + a*b)")
                         .column(
                                 "e",
@@ -2670,7 +2784,8 @@ public class SqlToOperationConverterTest {
                 Schema.newBuilder()
                         .column("a", DataTypes.INT().notNull())
                         .column("b", DataTypes.BIGINT().notNull())
-                        .column("c", DataTypes.STRING())
+                        .column("c", DataTypes.STRING().notNull())
+                        .withComment("column comment")
                         .columnByExpression("d", "a*(b+2 + a*b)")
                         .column(
                                 "e",
@@ -2689,10 +2804,17 @@ public class SqlToOperationConverterTest {
         if (!managedTable) {
             options.put("connector", "dummy");
         }
-        if (numOfPkFields == 1) {
+        if (numOfPkFields == 0) {
+            // do nothing
+        } else if (numOfPkFields == 1) {
             builder.primaryKeyNamed("ct1", "a");
         } else if (numOfPkFields == 2) {
             builder.primaryKeyNamed("ct1", "a", "b");
+        } else if (numOfPkFields == 3) {
+            builder.primaryKeyNamed("ct1", "a", "b", "c");
+        } else {
+            throw new IllegalArgumentException(
+                    String.format("Don't support to set pk with %s fields.", numOfPkFields));
         }
 
         if (hasWatermark) {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index d9a7e4cdd27..a78bb5a07cc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -50,7 +50,7 @@ import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.rules.{ExpectedException, TemporaryFolder}
 
 import java.io.File
-import java.util.UUID
+import java.util.{Collections, UUID}
 
 import scala.annotation.meta.getter
 
@@ -796,6 +796,28 @@ class TableEnvironmentTest {
     checkData(expectedResult.iterator(), tableResult.collect())
   }
 
+  @Test
+  def testAlterRenameColumn(): Unit = {
+    tableEnv.executeSql("""
+                          |CREATE TABLE MyTable (
+                          | a bigint
+                          |) WITH (
+                          |  'connector' = 'COLLECTION',
+                          |  'is-bounded' = 'false'
+                          |)
+                          |""".stripMargin)
+    tableEnv.executeSql("""
+                          |ALTER TABLE MyTable RENAME a TO b
+                          |""".stripMargin)
+    val tableResult = tableEnv.executeSql("DESCRIBE MyTable")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+    checkData(
+      Collections
+        .singletonList(Row.of("b", "BIGINT", Boolean.box(true), null, null, null))
+        .iterator(),
+      tableResult.collect())
+  }
+
   @Test
   def testAlterTableCompactOnNonManagedTable(): Unit = {
     val statement =