You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/26 09:38:54 UTC

[GitHub] [flink] fsk119 commented on a diff in pull request #20652: [FLINK-22315][table] Support add/modify column/constraint/watermark for ALTER TABLE statement

fsk119 commented on code in PR #20652:
URL: https://github.com/apache/flink/pull/20652#discussion_r1005314845


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -408,6 +408,336 @@ class TableEnvironmentTest {
       tableEnv.executeSql("explain plan for select * from MyTable where a > 10").getResultKind)
   }
 
+  @Test
+  def testAlterTableAddColumn(): Unit = {
+    val statement =
+      """
+        |CREATE TABLE MyTable (
+        |  a BIGINT,
+        |  b INT PRIMARY KEY NOT ENFORCED,
+        |  c STRING METADATA VIRTUAL,
+        |  d TIMESTAMP(3)
+        |) WITH (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(statement)
+
+    assertThatThrownBy(() => tableEnv.executeSql("""
+                                                   |ALTER TABLE MyTable ADD (
+                                                   |  PRIMARY KEY (a) NOT ENFORCED
+                                                   |)
+                                                   |""".stripMargin))
+      .hasMessageContaining("The base table already has a primary key [b]. " +
+        "You might want to drop it before adding a new one.")
+      .isInstanceOf(classOf[ValidationException])

Review Comment:
   isInstanceOf[ValidationException]



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java:
##########
@@ -1466,6 +1476,441 @@ public void testAlterTableCompactOnManagedPartitionedTable() throws Exception {
                 parse("alter table tb1 compact", SqlDialect.DEFAULT), staticPartitions);
     }
 
+    /**
+     * Checks how {@code ALTER TABLE ... ADD} column statements are converted.
+     *
+     * <p>The first half asserts the validation errors for a column name that already exists
+     * (either in the table or earlier in the same statement) and for an {@code AFTER} clause
+     * that references an unknown column. The second half asserts the schema produced for a
+     * single added column, for several columns with explicit positions plus an inline primary
+     * key, and for nested (row / map / array) types referenced by computed columns.
+     */
+    @Test
+    public void testAlterTableAddColumn() throws Exception {
+        // NOTE(review): the flag presumably controls whether tb1 has a primary key - confirm
+        prepareNonManagedTable(false);
+
+        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
+        Schema originalSchema =
+                catalogManager.getTable(tableIdentifier).get().getTable().getUnresolvedSchema();
+
+        // test duplicated column name
+        assertThatThrownBy(() -> parse("alter table tb1 add a bigint", SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Try to add a column 'a' which already exists in the table.");
+
+        // the duplicate may also come from the same statement
+        assertThatThrownBy(
+                        () -> parse("alter table tb1 add (d bigint, d string)", SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Try to add a column 'd' which already exists in the table.");
+
+        // test reference nonexistent column name
+        assertThatThrownBy(() -> parse("alter table tb1 add x bigint after y", SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Referenced column 'y' by 'AFTER' does not exist in the table.");
+
+        // 'y' is declared later in the same statement, which is still rejected
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add (x bigint after y, y string)",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Referenced column 'y' by 'AFTER' does not exist in the table.");
+
+        // test add a single column
+        Operation operation =
+                parse(
+                        "alter table tb1 add d double not null comment 'd is double not null'",
+                        SqlDialect.DEFAULT);
+        assertAlterTableSchema(
+                operation,
+                tableIdentifier,
+                Schema.newBuilder()
+                        .fromSchema(originalSchema)
+                        .column("d", DataTypes.DOUBLE().notNull())
+                        .withComment("d is double not null")
+                        .build());
+
+        // test add multiple columns with pk
+        operation =
+                parse(
+                        "alter table tb1 add (\n"
+                                + " e as upper(a) first,\n"
+                                + " f as b*2 after e,\n"
+                                + " g int metadata from 'mk1' virtual comment 'comment_metadata' first,\n"
+                                + " h string primary key not enforced after a)",
+                        SqlDialect.DEFAULT);
+
+        // expected column order after applying FIRST/AFTER: g, e, f, a, h, b, c
+        assertAlterTableSchema(
+                operation,
+                tableIdentifier,
+                Schema.newBuilder()
+                        .columnByMetadata("g", DataTypes.INT(), "mk1", true)
+                        .withComment("comment_metadata")
+                        .columnByExpression("e", new SqlCallExpression("UPPER(`a`)"))
+                        .columnByExpression("f", new SqlCallExpression("`b` * 2"))
+                        .column("a", DataTypes.STRING().notNull())
+                        .column("h", DataTypes.STRING().notNull())
+                        .column("b", DataTypes.BIGINT().notNull())
+                        .column("c", DataTypes.BIGINT())
+                        .primaryKey("h")
+                        .build());
+
+        // test add nested type
+        operation =
+                parse(
+                        "alter table tb1 add (\n"
+                                + " r row<r1 bigint, r2 string, r3 array<double> not null> not null comment 'add composite type',\n"
+                                + " m map<string not null, int not null>,\n"
+                                + " g as r.r1 * 2 after r,\n"
+                                + " ts as to_timestamp(r.r2) comment 'rowtime' after g,\n"
+                                + " na as r.r3 after ts)",
+                        SqlDialect.DEFAULT);
+        // expected order of the added columns: r, g, ts, na, m
+        assertAlterTableSchema(
+                operation,
+                tableIdentifier,
+                Schema.newBuilder()
+                        .fromSchema(originalSchema)
+                        .column(
+                                "r",
+                                DataTypes.ROW(
+                                                DataTypes.FIELD("r1", DataTypes.BIGINT()),
+                                                DataTypes.FIELD("r2", DataTypes.STRING()),
+                                                DataTypes.FIELD(
+                                                        "r3",
+                                                        DataTypes.ARRAY(DataTypes.DOUBLE())
+                                                                .notNull()))
+                                        .notNull())
+                        .withComment("add composite type")
+                        .columnByExpression("g", "`r`.`r1` * 2")
+                        .columnByExpression("ts", "TO_TIMESTAMP(`r`.`r2`)")
+                        .withComment("rowtime")
+                        .columnByExpression("na", "`r`.`r3`")
+                        .column(
+                                "m",
+                                DataTypes.MAP(
+                                        DataTypes.STRING().notNull(), DataTypes.INT().notNull()))
+                        .build());
+    }
+
+    /**
+     * Checks that adding a constraint to a table that already has a primary key is rejected.
+     *
+     * <p>Covers an unsupported {@code UNIQUE} constraint, a primary key without {@code NOT
+     * ENFORCED}, and a second primary key given either as a table constraint or inline on a
+     * newly added column.
+     */
+    @Test
+    public void testAlterTableAddPkOnPrimaryKeyedTable() throws Exception {
+        // NOTE(review): presumably creates tb1 with primary key (a, b), as the expected
+        // messages below mention "[a, b]" - confirm against the helper
+        prepareNonManagedTable(true);
+
+        assertThatThrownBy(() -> parse("alter table tb1 add unique(a)", SqlDialect.DEFAULT))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("UNIQUE constraint is not supported yet");
+
+        // "not enforced" is missing, so the constraint itself is invalid
+        assertThatThrownBy(() -> parse("alter table tb1 add primary key(c)", SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Flink doesn't support ENFORCED mode for PRIMARY KEY constraint");
+
+        // a second primary key declared as a table constraint
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add primary key(c) not enforced",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The base table already has a primary key [a, b]. You might want to drop it before adding a new one");
+
+        // a second primary key declared inline on a new column
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add d string not null primary key not enforced",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The base table already has a primary key [a, b]. You might want to drop it before adding a new one");
+    }
+
+    /**
+     * Checks adding a primary key through {@code ALTER TABLE ... ADD}.
+     *
+     * <p>A key that includes a computed or metadata column must be rejected. A named constraint
+     * on existing columns and an inline key on a newly added column must produce the expected
+     * schema; the last case asserts that a key column declared without {@code NOT NULL} still
+     * ends up as a not-null column.
+     */
+    @Test
+    public void testAlterTableSchemaAddPrimaryKey() throws Exception {
+        prepareNonManagedTable(false);
+
+        // test add a composite pk which contains computed column
+        // (the expected position refers to 'd' inside the multi-line SQL string below)
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add (\n"
+                                                + "  d as upper(a),\n"
+                                                + "  primary key (b, d) not enforced)",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Could not create a PRIMARY KEY with column 'd' at line 3, column 19.\n"
+                                + "A PRIMARY KEY constraint must be declared on physical columns.");
+
+        // test add a pk which is metadata column
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add (e int metadata, primary key (e) not enforced)",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Could not create a PRIMARY KEY with column 'e' at line 1, column 51.\n"
+                                + "A PRIMARY KEY constraint must be declared on physical columns.");
+
+        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
+        Schema originalSchema =
+                catalogManager.getTable(tableIdentifier).get().getTable().getUnresolvedSchema();
+        // test add a named pk constraint on existing columns
+        Operation operation =
+                parse(
+                        "alter table tb1 add constraint my_pk primary key (a, b) not enforced",
+                        SqlDialect.DEFAULT);
+
+        assertAlterTableSchema(
+                operation,
+                tableIdentifier,
+                Schema.newBuilder()
+                        .fromSchema(originalSchema)
+                        .primaryKeyNamed("my_pk", "a", "b")
+                        .build());
+
+        // test add a new not-null column that is declared as pk inline
+        operation =
+                parse(
+                        "alter table tb1 add d bigint not null primary key not enforced",
+                        SqlDialect.DEFAULT);
+        assertAlterTableSchema(
+                operation,
+                tableIdentifier,
+                Schema.newBuilder()
+                        .fromSchema(originalSchema)
+                        .column("d", DataTypes.BIGINT().notNull())
+                        .primaryKey("d")
+                        .build());
+
+        // test implicit nullability conversion
+        operation =
+                parse("alter table tb1 add d bigint primary key not enforced", SqlDialect.DEFAULT);
+        assertAlterTableSchema(
+                operation,
+                tableIdentifier,
+                Schema.newBuilder()
+                        .fromSchema(originalSchema)
+                        .column("d", DataTypes.BIGINT().notNull())
+                        .primaryKey("d")
+                        .build());
+    }
+
+    /**
+     * Checks adding a watermark through {@code ALTER TABLE ... ADD}.
+     *
+     * <p>A rowtime column that is not part of the schema and a nested rowtime field must be
+     * rejected. A watermark on a physical or computed column added in the same statement must
+     * produce the expected schema. Finally, adding a watermark to a table that already defines
+     * one must be rejected.
+     */
+    @Test
+    public void testAlterTableSchemaAddWatermark() throws Exception {
+        prepareNonManagedTable(false);
+
+        // test add watermark with an undefined column as rowtime
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add watermark for ts as ts",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The rowtime attribute field 'ts' is not defined in the table schema, at line 1, column 35\n"
+                                + "Available fields: ['a', 'b', 'c']");
+
+        // test add watermark with an undefined nested column as rowtime
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add (d row<d1 string, ts timestamp(3)>, watermark for d.ts as d.ts)",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The nested rowtime attribute field 'd.ts' cannot define a watermark.");
+
+        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
+        Schema originalSchema =
+                catalogManager.getTable(tableIdentifier).get().getTable().getUnresolvedSchema();
+
+        // test add watermark with new added physical column as rowtime
+        Operation operation =
+                parse(
+                        "alter table tb1 add (ts timestamp(3) not null, watermark for ts as ts)",
+                        SqlDialect.DEFAULT);
+
+        assertAlterTableSchema(
+                operation,
+                tableIdentifier,
+                Schema.newBuilder()
+                        .fromSchema(originalSchema)
+                        .column("ts", DataTypes.TIMESTAMP(3).notNull())
+                        .watermark("ts", "`ts`")
+                        .build());
+
+        // test add watermark with new added computed column as rowtime
+        operation =
+                parse(
+                        "alter table tb1 add (log_ts string, ts as to_timestamp(log_ts), watermark for ts as ts - interval '3' second)",
+                        SqlDialect.DEFAULT);
+
+        assertAlterTableSchema(
+                operation,
+                tableIdentifier,
+                Schema.newBuilder()
+                        .fromSchema(originalSchema)
+                        .column("log_ts", DataTypes.STRING())
+                        .columnByExpression("ts", "TO_TIMESTAMP(`log_ts`)")
+                        .watermark("ts", "`ts` - INTERVAL '3' SECOND")
+                        .build());
+
+        // test add watermark to the table which already has watermark defined
+        // NOTE(review): presumably registers tb2 with a watermark on 'ts' - confirm
+        prepareTableWithWatermark();
+
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb2 add watermark for ts as ts",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "There already exists a watermark spec for column '[ts]' in the base table. "
+                                + "You might want to drop it before adding a new one.");
+    }
+
+    @Test
+    public void testAlterTableSchemaModifyColumn() throws Exception {

Review Comment:
   Take a look at this case
   
   ```
    operation =
                   parse(
                           "alter table tb1 modify (a row<d int>, c as a.d + 1, a int)",
                           SqlDialect.DEFAULT);
   ```
   
   I think this statement should fail validation, because column `a` is listed twice in the same MODIFY clause.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterTableSchemaUtil.java:
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableModify;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlWatermark;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Util to convert {@link SqlAlterTableSchema} with source table to generate new {@link Schema}. */
+public class AlterTableSchemaUtil {
+
+    private final SqlValidator sqlValidator;
+    private final Consumer<SqlTableConstraint> validateTableConstraint;
+    private final Function<SqlNode, String> escapeExpression;
+
+    /**
+     * Creates the util with the collaborators needed to convert an ALTER TABLE statement.
+     *
+     * @param sqlValidator validator used to derive column types and validate expressions
+     * @param escapeExpression converts a validated SQL node into the expression string that is
+     *     stored in the resulting schema
+     * @param validateTableConstraint check that is run on a table constraint before it is
+     *     applied
+     */
+    AlterTableSchemaUtil(
+            SqlValidator sqlValidator,
+            Function<SqlNode, String> escapeExpression,
+            Consumer<SqlTableConstraint> validateTableConstraint) {
+        // assigned in parameter order
+        this.sqlValidator = sqlValidator;
+        this.escapeExpression = escapeExpression;
+        this.validateTableConstraint = validateTableConstraint;
+    }
+
+    public Schema convertSchema(
+            SqlAlterTableSchema alterTableSchema, ResolvedCatalogTable originalTable) {
+        UnresolvedSchemaBuilder builder =
+                new UnresolvedSchemaBuilder(
+                        originalTable,
+                        (FlinkTypeFactory) sqlValidator.getTypeFactory(),
+                        sqlValidator,
+                        validateTableConstraint,
+                        escapeExpression);
+        AlterSchemaStrategy strategy = computeAlterSchemaStrategy(alterTableSchema);
+        List<SqlNode> columnPositions = alterTableSchema.getColumnPositions().getList();
+        builder.addOrModifyColumns(strategy, columnPositions);
+        alterTableSchema
+                .getWatermark()
+                .ifPresent(sqlWatermark -> builder.addOrModifyWatermarks(strategy, sqlWatermark));
+        alterTableSchema
+                .getFullConstraint()
+                .ifPresent(
+                        (pk) ->
+                                builder.addOrModifyPrimaryKey(
+                                        strategy, pk, columnPositions.isEmpty()));
+        return builder.build();
+    }
+
+    private static class UnresolvedSchemaBuilder {
+
+        List<String> sortedColumnNames = new ArrayList<>();
+        Map<String, Schema.UnresolvedColumn> columns = new HashMap<>();
+        Map<String, Schema.UnresolvedWatermarkSpec> watermarkSpecs = new HashMap<>();
+        @Nullable Schema.UnresolvedPrimaryKey primaryKey = null;
+
+        // Intermediate state
+        Map<String, RelDataType> physicalFieldNamesToTypes = new HashMap<>();
+        Map<String, RelDataType> metadataFieldNamesToTypes = new HashMap<>();
+        Map<String, RelDataType> computedFieldNamesToTypes = new HashMap<>();
+
+        Function<SqlNode, String> escapeExpressions;
+        FlinkTypeFactory typeFactory;
+        SqlValidator sqlValidator;
+        Consumer<SqlTableConstraint> validateTableConstraint;
+
+        UnresolvedSchemaBuilder(
+                ResolvedCatalogTable sourceTable,
+                FlinkTypeFactory typeFactory,
+                SqlValidator sqlValidator,
+                Consumer<SqlTableConstraint> validateTableConstraint,
+                Function<SqlNode, String> escapeExpressions) {
+            this.typeFactory = typeFactory;
+            this.sqlValidator = sqlValidator;
+            this.escapeExpressions = escapeExpressions;
+            this.validateTableConstraint = validateTableConstraint;
+            populateColumnsFromSourceTable(sourceTable);
+            populatePrimaryKeyFromSourceTable(sourceTable.getUnresolvedSchema());
+            populateWatermarksFromSourceTable(sourceTable.getUnresolvedSchema());
+        }
+
+        private void populateColumnsFromSourceTable(ResolvedCatalogTable sourceTable) {
+            List<DataType> types = sourceTable.getResolvedSchema().getColumnDataTypes();
+            List<Schema.UnresolvedColumn> sourceColumns =
+                    sourceTable.getUnresolvedSchema().getColumns();
+            for (int i = 0; i < sourceColumns.size(); i++) {
+                Schema.UnresolvedColumn column = sourceColumns.get(i);
+                String columnName = column.getName();
+                sortedColumnNames.add(columnName);
+                columns.put(columnName, column);
+                RelDataType type =
+                        typeFactory.createFieldTypeFromLogicalType(types.get(i).getLogicalType());
+                if (column instanceof Schema.UnresolvedPhysicalColumn) {
+                    physicalFieldNamesToTypes.put(columnName, type);
+                } else if (column instanceof Schema.UnresolvedComputedColumn) {
+                    computedFieldNamesToTypes.put(columnName, type);
+                } else if (column instanceof Schema.UnresolvedMetadataColumn) {
+                    metadataFieldNamesToTypes.put(columnName, type);
+                }
+            }
+        }
+
+        /** Takes over the primary key of the source schema, if it defines one. */
+        private void populatePrimaryKeyFromSourceTable(Schema sourceSchema) {
+            sourceSchema.getPrimaryKey().ifPresent(sourcePrimaryKey -> primaryKey = sourcePrimaryKey);
+        }
+
+        /** Registers every watermark spec of the source schema under its rowtime column name. */
+        private void populateWatermarksFromSourceTable(Schema sourceSchema) {
+            sourceSchema
+                    .getWatermarkSpecs()
+                    .forEach(spec -> watermarkSpecs.put(spec.getColumnName(), spec));
+        }
+
+        private void addOrModifyColumns(
+                AlterSchemaStrategy strategy, List<SqlNode> alterColumnPositions) {
+            collectColumnPosition(strategy, alterColumnPositions);
+            for (SqlNode alterColumnPos : alterColumnPositions) {
+                SqlTableColumn alterColumn = ((SqlTableColumnPosition) alterColumnPos).getColumn();
+                if (alterColumn instanceof SqlTableColumn.SqlComputedColumn) {
+                    convertComputedColumn((SqlTableColumn.SqlComputedColumn) alterColumn);
+                } else {
+                    convertNonComputedColumn(alterColumn);
+                }
+            }
+        }
+
+        private void addOrModifyPrimaryKey(
+                AlterSchemaStrategy strategy,
+                SqlTableConstraint alterPrimaryKey,
+                boolean withoutAlterColumn) {
+            validateTableConstraint.accept(alterPrimaryKey);
+            if (strategy == AlterSchemaStrategy.ADD && primaryKey != null) {
+                throw new ValidationException(
+                        String.format(
+                                "The base table already has a primary key %s. You might "
+                                        + "want to drop it before adding a new one.",
+                                primaryKey.getColumnNames()));
+            } else if (strategy == AlterSchemaStrategy.MODIFY && primaryKey == null) {
+                throw new ValidationException(
+                        "The base table does not define any primary key. You might "
+                                + "want to add a new one.");
+            }
+            List<String> primaryKeyColumns = new ArrayList<>();
+            for (SqlNode primaryKeyNode : alterPrimaryKey.getColumns()) {
+                String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple();
+                if (!columns.containsKey(primaryKey)) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Primary key column '%s' is not defined in the schema at %s",
+                                    primaryKey, primaryKeyNode.getParserPosition()));
+                }
+                if (!(columns.get(primaryKey) instanceof Schema.UnresolvedPhysicalColumn)) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Could not create a PRIMARY KEY with column '%s' at %s.\n"
+                                            + "A PRIMARY KEY constraint must be declared on physical columns.",
+                                    primaryKey, primaryKeyNode.getParserPosition()));
+                }
+                primaryKeyColumns.add(primaryKey);
+            }
+            if (withoutAlterColumn) {
+                // a single add/modify constraint changes the nullability of columns implicitly
+                for (String primaryKeyColumn : primaryKeyColumns) {
+                    RelDataType originalType = physicalFieldNamesToTypes.get(primaryKeyColumn);
+                    DataType alterDataType =
+                            TypeConversions.fromLogicalToDataType(
+                                    FlinkTypeFactory.toLogicalType(
+                                            typeFactory.createTypeWithNullability(
+                                                    originalType, false)));
+                    Schema.UnresolvedColumn column = columns.remove(primaryKeyColumn);
+                    columns.put(
+                            primaryKeyColumn,
+                            column(
+                                    primaryKeyColumn,
+                                    alterDataType,
+                                    column.getComment().orElse(null)));
+                }
+            }
+            primaryKey =
+                    primaryKeyNamed(
+                            alterPrimaryKey
+                                    .getConstraintName()
+                                    .orElseGet(
+                                            () ->
+                                                    primaryKeyColumns.stream()
+                                                            .collect(
+                                                                    Collectors.joining(
+                                                                            "_", "PK_", ""))),
+                            primaryKeyColumns);
+        }
+
+        private void addOrModifyWatermarks(
+                AlterSchemaStrategy strategy, SqlWatermark alterWatermarkSpec) {
+            SqlIdentifier eventTimeColumnName = alterWatermarkSpec.getEventTimeColumnName();
+
+            Map<String, RelDataType> nameToTypeMap = new HashMap<>();
+            nameToTypeMap.putAll(physicalFieldNamesToTypes);
+            nameToTypeMap.putAll(metadataFieldNamesToTypes);
+            nameToTypeMap.putAll(computedFieldNamesToTypes);
+
+            verifyRowtimeAttribute(strategy, eventTimeColumnName, nameToTypeMap);
+            String rowtimeAttribute = eventTimeColumnName.toString();
+
+            SqlNode expression = alterWatermarkSpec.getWatermarkStrategy();
+
+            // this will validate and expand function identifiers
+            SqlNode validated =
+                    sqlValidator.validateParameterizedExpression(expression, nameToTypeMap);
+            watermarkSpecs.clear();
+            watermarkSpecs.put(
+                    rowtimeAttribute,
+                    watermarkSpec(
+                            rowtimeAttribute,
+                            new SqlCallExpression(escapeExpressions.apply(validated))));
+        }
+
+        private void convertNonComputedColumn(SqlTableColumn column) {
+            boolean isPhysical = column instanceof SqlTableColumn.SqlRegularColumn;
+            String name = column.getName().getSimple();
+            SqlDataTypeSpec typeSpec =
+                    isPhysical
+                            ? ((SqlTableColumn.SqlRegularColumn) column).getType()
+                            : ((SqlTableColumn.SqlMetadataColumn) column).getType();
+            boolean nullable = typeSpec.getNullable() == null || typeSpec.getNullable();
+            LogicalType logicalType =
+                    FlinkTypeFactory.toLogicalType(typeSpec.deriveType(sqlValidator, nullable));
+            DataType dataType = TypeConversions.fromLogicalToDataType(logicalType);
+            RelDataType relType = typeSpec.deriveType(sqlValidator, nullable);
+            Schema.UnresolvedColumn newColumn;
+            if (isPhysical) {
+                newColumn = column(name, dataType, getComment(column));
+                physicalFieldNamesToTypes.put(name, relType);
+            } else {
+                newColumn =
+                        columnByMetadata(
+                                name,
+                                dataType,
+                                ((SqlTableColumn.SqlMetadataColumn) column)
+                                        .getMetadataAlias()
+                                        .orElse(null),
+                                ((SqlTableColumn.SqlMetadataColumn) column).isVirtual(),
+                                getComment(column));
+                metadataFieldNamesToTypes.put(name, relType);
+            }
+            columns.put(name, newColumn);
+        }
+
+        private void convertComputedColumn(SqlTableColumn.SqlComputedColumn column) {
+            String name = column.getName().getSimple();
+            Map<String, RelDataType> accessibleFieldNamesToTypes = new HashMap<>();
+            accessibleFieldNamesToTypes.putAll(physicalFieldNamesToTypes);
+            accessibleFieldNamesToTypes.putAll(metadataFieldNamesToTypes);
+
+            // computed column cannot be generated on another computed column
+            final SqlNode validatedExpr =
+                    sqlValidator.validateParameterizedExpression(
+                            column.getExpr(), accessibleFieldNamesToTypes);
+            final RelDataType validatedType = sqlValidator.getValidatedNodeType(validatedExpr);
+
+            Schema.UnresolvedColumn newColumn =
+                    columnByExpression(
+                            name, escapeExpressions.apply(validatedExpr), getComment(column));
+            computedFieldNamesToTypes.put(name, validatedType);
+            columns.put(name, newColumn);
+        }
+
+        private void verifyRowtimeAttribute(
+                AlterSchemaStrategy strategy,
+                SqlIdentifier eventTimeColumnName,
+                Map<String, RelDataType> allFieldsTypes) {
+            if (!watermarkSpecs.isEmpty() && strategy == AlterSchemaStrategy.ADD) {
+                throw new ValidationException(
+                        String.format(
+                                "There already exists a watermark spec for column '%s' in the base table. You "
+                                        + "might want to drop it before adding a new one.",
+                                watermarkSpecs.keySet()));
+            } else if (watermarkSpecs.isEmpty() && strategy == AlterSchemaStrategy.MODIFY) {
+                throw new ValidationException(
+                        "There is no watermark defined in the base table. You might want to add a new one.");
+            }
+            if (!eventTimeColumnName.isSimple()) {
+                throw new ValidationException(
+                        String.format(
+                                "The nested rowtime attribute field '%s' cannot define a watermark.",
+                                eventTimeColumnName));
+            }
+            String rowtimeField = eventTimeColumnName.getSimple();
+            List<String> components = eventTimeColumnName.names;
+            if (!allFieldsTypes.containsKey(components.get(0))) {
+                throw new ValidationException(
+                        String.format(
+                                "The rowtime attribute field '%s' is not defined in the table schema, at %s\n"
+                                        + "Available fields: [%s]",
+                                rowtimeField,
+                                eventTimeColumnName.getParserPosition(),
+                                allFieldsTypes.keySet().stream()
+                                        .collect(Collectors.joining("', '", "'", "'"))));
+            }
+        }
+
+        @Nullable
+        private String getComment(SqlTableColumn column) {
+            return column.getComment()
+                    .map(SqlCharStringLiteral.class::cast)
+                    .map(c -> c.getValueAs(String.class))
+                    .orElse(null);
+        }
+
+        private void collectColumnPosition(
+                AlterSchemaStrategy strategy, List<SqlNode> alterColumns) {
+            for (SqlNode alterColumn : alterColumns) {
+                assert alterColumn instanceof SqlTableColumnPosition;
+                SqlTableColumnPosition columnPosition = (SqlTableColumnPosition) alterColumn;
+                SqlTableColumn column = columnPosition.getColumn();
+                String name = column.getName().getSimple();
+                boolean existed = sortedColumnNames.contains(name);
+                boolean first = columnPosition.isFirstColumn();
+                boolean after = columnPosition.isAfterReferencedColumn();
+                if (strategy == AlterSchemaStrategy.ADD) {
+                    if (existed) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Try to add a column '%s' which already exists in the table.",
+                                        name));
+                    }
+                    if (first) {
+                        sortedColumnNames.add(0, name);
+                    } else if (after) {
+                        getReferencedColumn(
+                                        columnPosition,
+                                        (refName) -> sortedColumnNames.contains(refName))
+                                .ifPresent(
+                                        (refCol) ->
+                                                sortedColumnNames.add(
+                                                        sortedColumnNames.indexOf(refCol) + 1,
+                                                        name));
+                    } else {
+                        sortedColumnNames.add(name);
+                    }
+                } else if (strategy == AlterSchemaStrategy.MODIFY) {
+                    if (!existed) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Try to modify a column '%s' which does not exist in the table.",
+                                        name));
+                    }
+                    if (first) {
+                        sortedColumnNames.remove(name);
+                        sortedColumnNames.add(0, name);
+                    } else if (after) {
+                        getReferencedColumn(
+                                        columnPosition, (refName) -> columns.containsKey(refName))
+                                .ifPresent(
+                                        (refCol) -> {
+                                            sortedColumnNames.remove(name);
+                                            sortedColumnNames.add(
+                                                    sortedColumnNames.indexOf(refCol) + 1, name);
+                                        });
+                    }
+                }
+            }
+        }
+
+        private Optional<String> getReferencedColumn(

Review Comment:
   nit: I think we can return `String` rather than `Optional`, since this method either throws or returns a non-empty value.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java:
##########
@@ -1466,6 +1476,441 @@ public void testAlterTableCompactOnManagedPartitionedTable() throws Exception {
                 parse("alter table tb1 compact", SqlDialect.DEFAULT), staticPartitions);
     }
 
+    @Test
+    public void testAlterTableAddColumn() throws Exception {
+        prepareNonManagedTable(false);
+
+        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
+        Schema originalSchema =
+                catalogManager.getTable(tableIdentifier).get().getTable().getUnresolvedSchema();
+
+        // test duplicated column name
+        assertThatThrownBy(() -> parse("alter table tb1 add a bigint", SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Try to add a column 'a' which already exists in the table.");
+
+        assertThatThrownBy(
+                        () -> parse("alter table tb1 add (d bigint, d string)", SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Try to add a column 'd' which already exists in the table.");

Review Comment:
   The error message may mislead users. Actually, column `d` is not in the table; it is only declared twice within the same `ALTER TABLE ... ADD` statement.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java:
##########
@@ -1466,6 +1476,441 @@ public void testAlterTableCompactOnManagedPartitionedTable() throws Exception {
                 parse("alter table tb1 compact", SqlDialect.DEFAULT), staticPartitions);
     }
 
+    @Test
+    public void testAlterTableAddColumn() throws Exception {

Review Comment:
   Can we add a column after a nested column? The current error message is not easy for others to understand.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterTableSchemaUtil.java:
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableModify;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlWatermark;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Util to convert {@link SqlAlterTableSchema} with source table to generate new {@link Schema}. */
+public class AlterTableSchemaUtil {
+
+    private final SqlValidator sqlValidator;
+    private final Consumer<SqlTableConstraint> validateTableConstraint;
+    private final Function<SqlNode, String> escapeExpression;
+
+    AlterTableSchemaUtil(
+            SqlValidator sqlValidator,
+            Function<SqlNode, String> escapeExpression,
+            Consumer<SqlTableConstraint> validateTableConstraint) {
+        this.sqlValidator = sqlValidator;
+        this.validateTableConstraint = validateTableConstraint;
+        this.escapeExpression = escapeExpression;
+    }
+
+    public Schema convertSchema(
+            SqlAlterTableSchema alterTableSchema, ResolvedCatalogTable originalTable) {
+        UnresolvedSchemaBuilder builder =
+                new UnresolvedSchemaBuilder(
+                        originalTable,
+                        (FlinkTypeFactory) sqlValidator.getTypeFactory(),
+                        sqlValidator,
+                        validateTableConstraint,
+                        escapeExpression);
+        AlterSchemaStrategy strategy = computeAlterSchemaStrategy(alterTableSchema);
+        List<SqlNode> columnPositions = alterTableSchema.getColumnPositions().getList();
+        builder.addOrModifyColumns(strategy, columnPositions);
+        alterTableSchema
+                .getWatermark()
+                .ifPresent(sqlWatermark -> builder.addOrModifyWatermarks(strategy, sqlWatermark));
+        alterTableSchema
+                .getFullConstraint()
+                .ifPresent(
+                        (pk) ->
+                                builder.addOrModifyPrimaryKey(
+                                        strategy, pk, columnPositions.isEmpty()));
+        return builder.build();
+    }
+
+    private static class UnresolvedSchemaBuilder {

Review Comment:
   Rename to `SchemaConverter`? Currently we only have `Schema` and `ResolvedSchema` in Flink, so it would be better not to introduce a new concept.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterTableSchemaUtil.java:
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableModify;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlWatermark;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Util to convert {@link SqlAlterTableSchema} with source table to generate new {@link Schema}. */
+public class AlterTableSchemaUtil {
+
+    private final SqlValidator sqlValidator;
+    private final Consumer<SqlTableConstraint> validateTableConstraint;
+    private final Function<SqlNode, String> escapeExpression;
+
+    AlterTableSchemaUtil(
+            SqlValidator sqlValidator,
+            Function<SqlNode, String> escapeExpression,
+            Consumer<SqlTableConstraint> validateTableConstraint) {
+        this.sqlValidator = sqlValidator;
+        this.validateTableConstraint = validateTableConstraint;
+        this.escapeExpression = escapeExpression;
+    }
+
+    public Schema convertSchema(
+            SqlAlterTableSchema alterTableSchema, ResolvedCatalogTable originalTable) {
+        UnresolvedSchemaBuilder builder =
+                new UnresolvedSchemaBuilder(
+                        originalTable,
+                        (FlinkTypeFactory) sqlValidator.getTypeFactory(),
+                        sqlValidator,
+                        validateTableConstraint,
+                        escapeExpression);
+        AlterSchemaStrategy strategy = computeAlterSchemaStrategy(alterTableSchema);
+        List<SqlNode> columnPositions = alterTableSchema.getColumnPositions().getList();
+        builder.addOrModifyColumns(strategy, columnPositions);
+        alterTableSchema
+                .getWatermark()
+                .ifPresent(sqlWatermark -> builder.addOrModifyWatermarks(strategy, sqlWatermark));
+        alterTableSchema
+                .getFullConstraint()
+                .ifPresent(
+                        (pk) ->
+                                builder.addOrModifyPrimaryKey(
+                                        strategy, pk, columnPositions.isEmpty()));
+        return builder.build();
+    }
+
+    private static class UnresolvedSchemaBuilder {
+
+        List<String> sortedColumnNames = new ArrayList<>();
+        Map<String, Schema.UnresolvedColumn> columns = new HashMap<>();
+        Map<String, Schema.UnresolvedWatermarkSpec> watermarkSpecs = new HashMap<>();
+        @Nullable Schema.UnresolvedPrimaryKey primaryKey = null;
+
+        // Intermediate state
+        Map<String, RelDataType> physicalFieldNamesToTypes = new HashMap<>();
+        Map<String, RelDataType> metadataFieldNamesToTypes = new HashMap<>();
+        Map<String, RelDataType> computedFieldNamesToTypes = new HashMap<>();
+
+        Function<SqlNode, String> escapeExpressions;
+        FlinkTypeFactory typeFactory;
+        SqlValidator sqlValidator;
+        Consumer<SqlTableConstraint> validateTableConstraint;
+
+        UnresolvedSchemaBuilder(
+                ResolvedCatalogTable sourceTable,
+                FlinkTypeFactory typeFactory,
+                SqlValidator sqlValidator,
+                Consumer<SqlTableConstraint> validateTableConstraint,
+                Function<SqlNode, String> escapeExpressions) {
+            this.typeFactory = typeFactory;
+            this.sqlValidator = sqlValidator;
+            this.escapeExpressions = escapeExpressions;
+            this.validateTableConstraint = validateTableConstraint;
+            populateColumnsFromSourceTable(sourceTable);
+            populatePrimaryKeyFromSourceTable(sourceTable.getUnresolvedSchema());
+            populateWatermarksFromSourceTable(sourceTable.getUnresolvedSchema());
+        }
+
+        private void populateColumnsFromSourceTable(ResolvedCatalogTable sourceTable) {
+            List<DataType> types = sourceTable.getResolvedSchema().getColumnDataTypes();
+            List<Schema.UnresolvedColumn> sourceColumns =
+                    sourceTable.getUnresolvedSchema().getColumns();
+            for (int i = 0; i < sourceColumns.size(); i++) {
+                Schema.UnresolvedColumn column = sourceColumns.get(i);
+                String columnName = column.getName();
+                sortedColumnNames.add(columnName);
+                columns.put(columnName, column);
+                RelDataType type =
+                        typeFactory.createFieldTypeFromLogicalType(types.get(i).getLogicalType());
+                if (column instanceof Schema.UnresolvedPhysicalColumn) {
+                    physicalFieldNamesToTypes.put(columnName, type);
+                } else if (column instanceof Schema.UnresolvedComputedColumn) {
+                    computedFieldNamesToTypes.put(columnName, type);
+                } else if (column instanceof Schema.UnresolvedMetadataColumn) {
+                    metadataFieldNamesToTypes.put(columnName, type);
+                }
+            }
+        }
+
+        private void populatePrimaryKeyFromSourceTable(Schema sourceSchema) {
+            if (sourceSchema.getPrimaryKey().isPresent()) {
+                primaryKey = sourceSchema.getPrimaryKey().get();
+            }
+        }
+
+        private void populateWatermarksFromSourceTable(Schema sourceSchema) {
+            for (Schema.UnresolvedWatermarkSpec sourceWatermarkSpec :
+                    sourceSchema.getWatermarkSpecs()) {
+                watermarkSpecs.put(sourceWatermarkSpec.getColumnName(), sourceWatermarkSpec);
+            }
+        }
+
+        private void addOrModifyColumns(
+                AlterSchemaStrategy strategy, List<SqlNode> alterColumnPositions) {
+            collectColumnPosition(strategy, alterColumnPositions);
+            for (SqlNode alterColumnPos : alterColumnPositions) {
+                SqlTableColumn alterColumn = ((SqlTableColumnPosition) alterColumnPos).getColumn();
+                if (alterColumn instanceof SqlTableColumn.SqlComputedColumn) {
+                    convertComputedColumn((SqlTableColumn.SqlComputedColumn) alterColumn);
+                } else {
+                    convertNonComputedColumn(alterColumn);
+                }
+            }
+        }
+
+        private void addOrModifyPrimaryKey(
+                AlterSchemaStrategy strategy,
+                SqlTableConstraint alterPrimaryKey,
+                boolean withoutAlterColumn) {
+            validateTableConstraint.accept(alterPrimaryKey);
+            if (strategy == AlterSchemaStrategy.ADD && primaryKey != null) {
+                throw new ValidationException(
+                        String.format(
+                                "The base table already has a primary key %s. You might "
+                                        + "want to drop it before adding a new one.",
+                                primaryKey.getColumnNames()));
+            } else if (strategy == AlterSchemaStrategy.MODIFY && primaryKey == null) {
+                throw new ValidationException(
+                        "The base table does not define any primary key. You might "
+                                + "want to add a new one.");
+            }
+            List<String> primaryKeyColumns = new ArrayList<>();
+            for (SqlNode primaryKeyNode : alterPrimaryKey.getColumns()) {
+                String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple();
+                if (!columns.containsKey(primaryKey)) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Primary key column '%s' is not defined in the schema at %s",
+                                    primaryKey, primaryKeyNode.getParserPosition()));
+                }
+                if (!(columns.get(primaryKey) instanceof Schema.UnresolvedPhysicalColumn)) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Could not create a PRIMARY KEY with column '%s' at %s.\n"
+                                            + "A PRIMARY KEY constraint must be declared on physical columns.",
+                                    primaryKey, primaryKeyNode.getParserPosition()));
+                }
+                primaryKeyColumns.add(primaryKey);
+            }
+            if (withoutAlterColumn) {
+                // a single add/modify constraint changes the nullability of columns implicitly
+                for (String primaryKeyColumn : primaryKeyColumns) {
+                    RelDataType originalType = physicalFieldNamesToTypes.get(primaryKeyColumn);
+                    DataType alterDataType =
+                            TypeConversions.fromLogicalToDataType(
+                                    FlinkTypeFactory.toLogicalType(
+                                            typeFactory.createTypeWithNullability(
+                                                    originalType, false)));
+                    Schema.UnresolvedColumn column = columns.remove(primaryKeyColumn);
+                    columns.put(
+                            primaryKeyColumn,
+                            column(
+                                    primaryKeyColumn,
+                                    alterDataType,
+                                    column.getComment().orElse(null)));
+                }
+            }
+            primaryKey =
+                    primaryKeyNamed(
+                            alterPrimaryKey
+                                    .getConstraintName()
+                                    .orElseGet(
+                                            () ->
+                                                    primaryKeyColumns.stream()
+                                                            .collect(
+                                                                    Collectors.joining(
+                                                                            "_", "PK_", ""))),
+                            primaryKeyColumns);
+        }
+
+        private void addOrModifyWatermarks(
+                AlterSchemaStrategy strategy, SqlWatermark alterWatermarkSpec) {
+            SqlIdentifier eventTimeColumnName = alterWatermarkSpec.getEventTimeColumnName();
+
+            Map<String, RelDataType> nameToTypeMap = new HashMap<>();
+            nameToTypeMap.putAll(physicalFieldNamesToTypes);
+            nameToTypeMap.putAll(metadataFieldNamesToTypes);
+            nameToTypeMap.putAll(computedFieldNamesToTypes);
+
+            verifyRowtimeAttribute(strategy, eventTimeColumnName, nameToTypeMap);
+            String rowtimeAttribute = eventTimeColumnName.toString();
+
+            SqlNode expression = alterWatermarkSpec.getWatermarkStrategy();
+
+            // this will validate and expand function identifiers
+            SqlNode validated =
+                    sqlValidator.validateParameterizedExpression(expression, nameToTypeMap);
+            watermarkSpecs.clear();
+            watermarkSpecs.put(
+                    rowtimeAttribute,
+                    watermarkSpec(
+                            rowtimeAttribute,
+                            new SqlCallExpression(escapeExpressions.apply(validated))));
+        }
+
+        private void convertNonComputedColumn(SqlTableColumn column) {
+            boolean isPhysical = column instanceof SqlTableColumn.SqlRegularColumn;
+            String name = column.getName().getSimple();
+            SqlDataTypeSpec typeSpec =
+                    isPhysical
+                            ? ((SqlTableColumn.SqlRegularColumn) column).getType()
+                            : ((SqlTableColumn.SqlMetadataColumn) column).getType();
+            boolean nullable = typeSpec.getNullable() == null || typeSpec.getNullable();
+            LogicalType logicalType =
+                    FlinkTypeFactory.toLogicalType(typeSpec.deriveType(sqlValidator, nullable));
+            DataType dataType = TypeConversions.fromLogicalToDataType(logicalType);
+            RelDataType relType = typeSpec.deriveType(sqlValidator, nullable);
+            Schema.UnresolvedColumn newColumn;
+            if (isPhysical) {
+                newColumn = column(name, dataType, getComment(column));
+                physicalFieldNamesToTypes.put(name, relType);
+            } else {
+                newColumn =
+                        columnByMetadata(
+                                name,
+                                dataType,
+                                ((SqlTableColumn.SqlMetadataColumn) column)
+                                        .getMetadataAlias()
+                                        .orElse(null),
+                                ((SqlTableColumn.SqlMetadataColumn) column).isVirtual(),
+                                getComment(column));
+                metadataFieldNamesToTypes.put(name, relType);
+            }
+            columns.put(name, newColumn);
+        }
+
+        private void convertComputedColumn(SqlTableColumn.SqlComputedColumn column) {
+            String name = column.getName().getSimple();
+            Map<String, RelDataType> accessibleFieldNamesToTypes = new HashMap<>();
+            accessibleFieldNamesToTypes.putAll(physicalFieldNamesToTypes);
+            accessibleFieldNamesToTypes.putAll(metadataFieldNamesToTypes);
+
+            // computed column cannot be generated on another computed column
+            final SqlNode validatedExpr =
+                    sqlValidator.validateParameterizedExpression(
+                            column.getExpr(), accessibleFieldNamesToTypes);
+            final RelDataType validatedType = sqlValidator.getValidatedNodeType(validatedExpr);
+
+            Schema.UnresolvedColumn newColumn =
+                    columnByExpression(
+                            name, escapeExpressions.apply(validatedExpr), getComment(column));
+            computedFieldNamesToTypes.put(name, validatedType);
+            columns.put(name, newColumn);
+        }
+
+        private void verifyRowtimeAttribute(
+                AlterSchemaStrategy strategy,
+                SqlIdentifier eventTimeColumnName,
+                Map<String, RelDataType> allFieldsTypes) {
+            if (!watermarkSpecs.isEmpty() && strategy == AlterSchemaStrategy.ADD) {
+                throw new ValidationException(
+                        String.format(
+                                "There already exists a watermark spec for column '%s' in the base table. You "
+                                        + "might want to drop it before adding a new one.",
+                                watermarkSpecs.keySet()));
+            } else if (watermarkSpecs.isEmpty() && strategy == AlterSchemaStrategy.MODIFY) {
+                throw new ValidationException(
+                        "There is no watermark defined in the base table. You might want to add a new one.");
+            }
+            if (!eventTimeColumnName.isSimple()) {
+                throw new ValidationException(
+                        String.format(
+                                "The nested rowtime attribute field '%s' cannot define a watermark.",
+                                eventTimeColumnName));
+            }
+            String rowtimeField = eventTimeColumnName.getSimple();
+            List<String> components = eventTimeColumnName.names;
+            if (!allFieldsTypes.containsKey(components.get(0))) {
+                throw new ValidationException(
+                        String.format(
+                                "The rowtime attribute field '%s' is not defined in the table schema, at %s\n"
+                                        + "Available fields: [%s]",
+                                rowtimeField,
+                                eventTimeColumnName.getParserPosition(),
+                                allFieldsTypes.keySet().stream()
+                                        .collect(Collectors.joining("', '", "'", "'"))));
+            }
+        }
+
+        @Nullable
+        private String getComment(SqlTableColumn column) {
+            return column.getComment()
+                    .map(SqlCharStringLiteral.class::cast)
+                    .map(c -> c.getValueAs(String.class))
+                    .orElse(null);
+        }
+
+        private void collectColumnPosition(
+                AlterSchemaStrategy strategy, List<SqlNode> alterColumns) {
+            for (SqlNode alterColumn : alterColumns) {
+                assert alterColumn instanceof SqlTableColumnPosition;
+                SqlTableColumnPosition columnPosition = (SqlTableColumnPosition) alterColumn;
+                SqlTableColumn column = columnPosition.getColumn();
+                String name = column.getName().getSimple();
+                boolean existed = sortedColumnNames.contains(name);
+                boolean first = columnPosition.isFirstColumn();
+                boolean after = columnPosition.isAfterReferencedColumn();
+                if (strategy == AlterSchemaStrategy.ADD) {
+                    if (existed) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Try to add a column '%s' which already exists in the table.",
+                                        name));
+                    }
+                    if (first) {
+                        sortedColumnNames.add(0, name);
+                    } else if (after) {
+                        getReferencedColumn(
+                                        columnPosition,
+                                        (refName) -> sortedColumnNames.contains(refName))
+                                .ifPresent(
+                                        (refCol) ->
+                                                sortedColumnNames.add(
+                                                        sortedColumnNames.indexOf(refCol) + 1,
+                                                        name));
+                    } else {
+                        sortedColumnNames.add(name);
+                    }
+                } else if (strategy == AlterSchemaStrategy.MODIFY) {
+                    if (!existed) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Try to modify a column '%s' which does not exist in the table.",
+                                        name));
+                    }
+                    if (first) {
+                        sortedColumnNames.remove(name);
+                        sortedColumnNames.add(0, name);
+                    } else if (after) {
+                        getReferencedColumn(
+                                        columnPosition, (refName) -> columns.containsKey(refName))
+                                .ifPresent(
+                                        (refCol) -> {
+                                            sortedColumnNames.remove(name);
+                                            sortedColumnNames.add(
+                                                    sortedColumnNames.indexOf(refCol) + 1, name);
+                                        });
+                    }
+                }
+            }
+        }
+
+        private Optional<String> getReferencedColumn(
+                SqlTableColumnPosition columnPosition, Function<String, Boolean> existFn) {
+            assert columnPosition.getAfterReferencedColumn() != null;
+            String referencedName = columnPosition.getAfterReferencedColumn().getSimple();
+            if (!existFn.apply(referencedName)) {
+                throw new ValidationException(
+                        String.format(
+                                "Referenced column '%s' by 'AFTER' does not exist in the table.",
+                                referencedName));
+            }
+            return Optional.of(referencedName);
+        }
+
+        private Schema.UnresolvedColumn column(
+                String columnName, AbstractDataType<?> dataType, @Nullable String comment) {
+            return Schema.newBuilder()
+                    .column(columnName, dataType)
+                    .withComment(comment)
+                    .build()
+                    .getColumns()
+                    .get(0);

Review Comment:
   Replace this with a direct constructor call, instead of building a whole `Schema` just to extract its first column?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterTableSchemaUtil.java:
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableModify;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlWatermark;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Util to convert {@link SqlAlterTableSchema} with source table to generate new {@link Schema}. */
+public class AlterTableSchemaUtil {
+
+    private final SqlValidator sqlValidator;
+    private final Consumer<SqlTableConstraint> validateTableConstraint;
+    private final Function<SqlNode, String> escapeExpression;
+
+    /**
+     * Creates the util with the collaborators it hands to the schema builder.
+     *
+     * @param sqlValidator validator used to derive column types and to validate expressions
+     * @param escapeExpression renders a validated {@link SqlNode} as the expression string that
+     *     is stored in the schema
+     * @param validateTableConstraint check applied to a constraint before it is converted
+     */
+    AlterTableSchemaUtil(
+            SqlValidator sqlValidator,
+            Function<SqlNode, String> escapeExpression,
+            Consumer<SqlTableConstraint> validateTableConstraint) {
+        this.sqlValidator = sqlValidator;
+        this.escapeExpression = escapeExpression;
+        this.validateTableConstraint = validateTableConstraint;
+    }
+
+    public Schema convertSchema(
+            SqlAlterTableSchema alterTableSchema, ResolvedCatalogTable originalTable) {
+        UnresolvedSchemaBuilder builder =
+                new UnresolvedSchemaBuilder(
+                        originalTable,
+                        (FlinkTypeFactory) sqlValidator.getTypeFactory(),
+                        sqlValidator,
+                        validateTableConstraint,
+                        escapeExpression);
+        AlterSchemaStrategy strategy = computeAlterSchemaStrategy(alterTableSchema);
+        List<SqlNode> columnPositions = alterTableSchema.getColumnPositions().getList();
+        builder.addOrModifyColumns(strategy, columnPositions);
+        alterTableSchema
+                .getWatermark()
+                .ifPresent(sqlWatermark -> builder.addOrModifyWatermarks(strategy, sqlWatermark));
+        alterTableSchema
+                .getFullConstraint()
+                .ifPresent(
+                        (pk) ->
+                                builder.addOrModifyPrimaryKey(
+                                        strategy, pk, columnPositions.isEmpty()));
+        return builder.build();
+    }
+
+    private static class UnresolvedSchemaBuilder {
+
+        List<String> sortedColumnNames = new ArrayList<>();
+        Map<String, Schema.UnresolvedColumn> columns = new HashMap<>();
+        Map<String, Schema.UnresolvedWatermarkSpec> watermarkSpecs = new HashMap<>();
+        @Nullable Schema.UnresolvedPrimaryKey primaryKey = null;
+
+        // Intermediate state
+        Map<String, RelDataType> physicalFieldNamesToTypes = new HashMap<>();
+        Map<String, RelDataType> metadataFieldNamesToTypes = new HashMap<>();
+        Map<String, RelDataType> computedFieldNamesToTypes = new HashMap<>();
+
+        Function<SqlNode, String> escapeExpressions;
+        FlinkTypeFactory typeFactory;
+        SqlValidator sqlValidator;
+        Consumer<SqlTableConstraint> validateTableConstraint;
+
+        UnresolvedSchemaBuilder(
+                ResolvedCatalogTable sourceTable,
+                FlinkTypeFactory typeFactory,
+                SqlValidator sqlValidator,
+                Consumer<SqlTableConstraint> validateTableConstraint,
+                Function<SqlNode, String> escapeExpressions) {
+            this.typeFactory = typeFactory;
+            this.sqlValidator = sqlValidator;
+            this.escapeExpressions = escapeExpressions;
+            this.validateTableConstraint = validateTableConstraint;
+            populateColumnsFromSourceTable(sourceTable);
+            populatePrimaryKeyFromSourceTable(sourceTable.getUnresolvedSchema());
+            populateWatermarksFromSourceTable(sourceTable.getUnresolvedSchema());
+        }
+
+        private void populateColumnsFromSourceTable(ResolvedCatalogTable sourceTable) {
+            List<DataType> types = sourceTable.getResolvedSchema().getColumnDataTypes();
+            List<Schema.UnresolvedColumn> sourceColumns =
+                    sourceTable.getUnresolvedSchema().getColumns();
+            for (int i = 0; i < sourceColumns.size(); i++) {
+                Schema.UnresolvedColumn column = sourceColumns.get(i);
+                String columnName = column.getName();
+                sortedColumnNames.add(columnName);
+                columns.put(columnName, column);
+                RelDataType type =
+                        typeFactory.createFieldTypeFromLogicalType(types.get(i).getLogicalType());
+                if (column instanceof Schema.UnresolvedPhysicalColumn) {
+                    physicalFieldNamesToTypes.put(columnName, type);
+                } else if (column instanceof Schema.UnresolvedComputedColumn) {
+                    computedFieldNamesToTypes.put(columnName, type);
+                } else if (column instanceof Schema.UnresolvedMetadataColumn) {
+                    metadataFieldNamesToTypes.put(columnName, type);
+                }
+            }
+        }
+
+        /** Takes over the primary key of the source schema; leaves the field untouched if none. */
+        private void populatePrimaryKeyFromSourceTable(Schema sourceSchema) {
+            sourceSchema.getPrimaryKey().ifPresent(sourceKey -> primaryKey = sourceKey);
+        }
+
+        /** Registers every watermark spec of the source schema under its rowtime column name. */
+        private void populateWatermarksFromSourceTable(Schema sourceSchema) {
+            sourceSchema
+                    .getWatermarkSpecs()
+                    .forEach(spec -> watermarkSpecs.put(spec.getColumnName(), spec));
+        }
+
+        private void addOrModifyColumns(
+                AlterSchemaStrategy strategy, List<SqlNode> alterColumnPositions) {
+            collectColumnPosition(strategy, alterColumnPositions);
+            for (SqlNode alterColumnPos : alterColumnPositions) {
+                SqlTableColumn alterColumn = ((SqlTableColumnPosition) alterColumnPos).getColumn();
+                if (alterColumn instanceof SqlTableColumn.SqlComputedColumn) {
+                    convertComputedColumn((SqlTableColumn.SqlComputedColumn) alterColumn);
+                } else {
+                    convertNonComputedColumn(alterColumn);
+                }
+            }
+        }
+
+        private void addOrModifyPrimaryKey(
+                AlterSchemaStrategy strategy,
+                SqlTableConstraint alterPrimaryKey,
+                boolean withoutAlterColumn) {
+            validateTableConstraint.accept(alterPrimaryKey);
+            if (strategy == AlterSchemaStrategy.ADD && primaryKey != null) {
+                throw new ValidationException(
+                        String.format(
+                                "The base table already has a primary key %s. You might "
+                                        + "want to drop it before adding a new one.",
+                                primaryKey.getColumnNames()));
+            } else if (strategy == AlterSchemaStrategy.MODIFY && primaryKey == null) {
+                throw new ValidationException(
+                        "The base table does not define any primary key. You might "
+                                + "want to add a new one.");
+            }
+            List<String> primaryKeyColumns = new ArrayList<>();
+            for (SqlNode primaryKeyNode : alterPrimaryKey.getColumns()) {
+                String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple();
+                if (!columns.containsKey(primaryKey)) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Primary key column '%s' is not defined in the schema at %s",
+                                    primaryKey, primaryKeyNode.getParserPosition()));
+                }
+                if (!(columns.get(primaryKey) instanceof Schema.UnresolvedPhysicalColumn)) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Could not create a PRIMARY KEY with column '%s' at %s.\n"
+                                            + "A PRIMARY KEY constraint must be declared on physical columns.",
+                                    primaryKey, primaryKeyNode.getParserPosition()));
+                }
+                primaryKeyColumns.add(primaryKey);
+            }
+            if (withoutAlterColumn) {
+                // a single add/modify constraint changes the nullability of columns implicitly
+                for (String primaryKeyColumn : primaryKeyColumns) {
+                    RelDataType originalType = physicalFieldNamesToTypes.get(primaryKeyColumn);
+                    DataType alterDataType =
+                            TypeConversions.fromLogicalToDataType(
+                                    FlinkTypeFactory.toLogicalType(
+                                            typeFactory.createTypeWithNullability(
+                                                    originalType, false)));
+                    Schema.UnresolvedColumn column = columns.remove(primaryKeyColumn);
+                    columns.put(
+                            primaryKeyColumn,
+                            column(
+                                    primaryKeyColumn,
+                                    alterDataType,
+                                    column.getComment().orElse(null)));
+                }
+            }
+            primaryKey =
+                    primaryKeyNamed(
+                            alterPrimaryKey
+                                    .getConstraintName()
+                                    .orElseGet(
+                                            () ->
+                                                    primaryKeyColumns.stream()
+                                                            .collect(
+                                                                    Collectors.joining(
+                                                                            "_", "PK_", ""))),
+                            primaryKeyColumns);
+        }
+
+        private void addOrModifyWatermarks(
+                AlterSchemaStrategy strategy, SqlWatermark alterWatermarkSpec) {
+            SqlIdentifier eventTimeColumnName = alterWatermarkSpec.getEventTimeColumnName();
+
+            Map<String, RelDataType> nameToTypeMap = new HashMap<>();
+            nameToTypeMap.putAll(physicalFieldNamesToTypes);
+            nameToTypeMap.putAll(metadataFieldNamesToTypes);
+            nameToTypeMap.putAll(computedFieldNamesToTypes);
+
+            verifyRowtimeAttribute(strategy, eventTimeColumnName, nameToTypeMap);
+            String rowtimeAttribute = eventTimeColumnName.toString();
+
+            SqlNode expression = alterWatermarkSpec.getWatermarkStrategy();
+
+            // this will validate and expand function identifiers
+            SqlNode validated =
+                    sqlValidator.validateParameterizedExpression(expression, nameToTypeMap);
+            watermarkSpecs.clear();
+            watermarkSpecs.put(
+                    rowtimeAttribute,
+                    watermarkSpec(
+                            rowtimeAttribute,
+                            new SqlCallExpression(escapeExpressions.apply(validated))));
+        }
+
+        private void convertNonComputedColumn(SqlTableColumn column) {
+            boolean isPhysical = column instanceof SqlTableColumn.SqlRegularColumn;
+            String name = column.getName().getSimple();
+            SqlDataTypeSpec typeSpec =
+                    isPhysical
+                            ? ((SqlTableColumn.SqlRegularColumn) column).getType()
+                            : ((SqlTableColumn.SqlMetadataColumn) column).getType();
+            boolean nullable = typeSpec.getNullable() == null || typeSpec.getNullable();
+            LogicalType logicalType =
+                    FlinkTypeFactory.toLogicalType(typeSpec.deriveType(sqlValidator, nullable));
+            DataType dataType = TypeConversions.fromLogicalToDataType(logicalType);
+            RelDataType relType = typeSpec.deriveType(sqlValidator, nullable);
+            Schema.UnresolvedColumn newColumn;
+            if (isPhysical) {
+                newColumn = column(name, dataType, getComment(column));
+                physicalFieldNamesToTypes.put(name, relType);
+            } else {
+                newColumn =
+                        columnByMetadata(
+                                name,
+                                dataType,
+                                ((SqlTableColumn.SqlMetadataColumn) column)
+                                        .getMetadataAlias()
+                                        .orElse(null),
+                                ((SqlTableColumn.SqlMetadataColumn) column).isVirtual(),
+                                getComment(column));
+                metadataFieldNamesToTypes.put(name, relType);
+            }
+            columns.put(name, newColumn);
+        }
+
+        /**
+         * Converts a computed column from the statement into an unresolved column. The expression
+         * is validated against the physical and metadata columns only.
+         *
+         * @param column computed column from the statement
+         */
+        private void convertComputedColumn(SqlTableColumn.SqlComputedColumn column) {
+            final String columnName = column.getName().getSimple();
+
+            // computed column cannot be generated on another computed column, so only physical
+            // and metadata fields are made visible to the expression
+            final Map<String, RelDataType> visibleFields = new HashMap<>();
+            visibleFields.putAll(physicalFieldNamesToTypes);
+            visibleFields.putAll(metadataFieldNamesToTypes);
+
+            final SqlNode validated =
+                    sqlValidator.validateParameterizedExpression(column.getExpr(), visibleFields);
+            final RelDataType exprType = sqlValidator.getValidatedNodeType(validated);
+            final String escapedExpr = escapeExpressions.apply(validated);
+
+            final Schema.UnresolvedColumn computed =
+                    columnByExpression(columnName, escapedExpr, getComment(column));
+            computedFieldNamesToTypes.put(columnName, exprType);
+            columns.put(columnName, computed);
+        }
+
+        private void verifyRowtimeAttribute(
+                AlterSchemaStrategy strategy,
+                SqlIdentifier eventTimeColumnName,
+                Map<String, RelDataType> allFieldsTypes) {
+            if (!watermarkSpecs.isEmpty() && strategy == AlterSchemaStrategy.ADD) {
+                throw new ValidationException(
+                        String.format(
+                                "There already exists a watermark spec for column '%s' in the base table. You "
+                                        + "might want to drop it before adding a new one.",
+                                watermarkSpecs.keySet()));
+            } else if (watermarkSpecs.isEmpty() && strategy == AlterSchemaStrategy.MODIFY) {
+                throw new ValidationException(
+                        "There is no watermark defined in the base table. You might want to add a new one.");
+            }
+            if (!eventTimeColumnName.isSimple()) {
+                throw new ValidationException(
+                        String.format(
+                                "The nested rowtime attribute field '%s' cannot define a watermark.",
+                                eventTimeColumnName));
+            }
+            String rowtimeField = eventTimeColumnName.getSimple();
+            List<String> components = eventTimeColumnName.names;
+            if (!allFieldsTypes.containsKey(components.get(0))) {
+                throw new ValidationException(
+                        String.format(
+                                "The rowtime attribute field '%s' is not defined in the table schema, at %s\n"
+                                        + "Available fields: [%s]",
+                                rowtimeField,
+                                eventTimeColumnName.getParserPosition(),
+                                allFieldsTypes.keySet().stream()
+                                        .collect(Collectors.joining("', '", "'", "'"))));
+            }
+        }
+
+        /**
+         * Extracts the comment of a column as a plain string.
+         *
+         * @param column column from the statement
+         * @return the comment text, or {@code null} when the column has no comment
+         */
+        @Nullable
+        private String getComment(SqlTableColumn column) {
+            return column.getComment()
+                    .map(node -> ((SqlCharStringLiteral) node).getValueAs(String.class))
+                    .orElse(null);
+        }
+
+        private void collectColumnPosition(
+                AlterSchemaStrategy strategy, List<SqlNode> alterColumns) {
+            for (SqlNode alterColumn : alterColumns) {
+                assert alterColumn instanceof SqlTableColumnPosition;

Review Comment:
   It's better to use Preconditions rather than assert, because assertions can be disabled at runtime.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org