Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/11 01:57:50 UTC

[GitHub] [flink] lsyldliu commented on a diff in pull request #20652: [FLINK-22315][table] Support add/modify column/constraint/watermark for ALTER TABLE statement

lsyldliu commented on code in PR #20652:
URL: https://github.com/apache/flink/pull/20652#discussion_r991742960


##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java:
##########
@@ -72,4 +87,14 @@ public Optional<SqlWatermark> getWatermark() {
     public List<SqlTableConstraint> getConstraints() {
         return constraints;
     }
+
+    public List<SqlTableConstraint> getFullConstraint() {

Review Comment:
   It would be better to return `Optional<SqlTableConstraint>` directly.
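A minimal sketch of the suggested signature, not the actual parser code. `ConstraintStub` and `AlterTableSchemaSketch` are hypothetical stand-ins for `SqlTableConstraint` and `SqlAlterTableSchema`, and the sketch assumes that at most one constraint is relevant, as the `fullConstraint.get(0)` call in the planner suggests.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for SqlTableConstraint, used only to keep the sketch self-contained.
final class ConstraintStub {
    final String name;

    ConstraintStub(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for SqlAlterTableSchema.
final class AlterTableSchemaSketch {
    private final List<ConstraintStub> constraints;

    AlterTableSchemaSketch(List<ConstraintStub> constraints) {
        this.constraints = constraints;
    }

    // Assumption: at most one constraint is relevant, so the first element (if any) is returned.
    Optional<ConstraintStub> getFullConstraint() {
        return constraints.stream().findFirst();
    }
}
```

With this shape the caller could use `getFullConstraint().ifPresent(...)` rather than checking `isEmpty()` and then calling `get(0)`.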



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java:
##########
@@ -61,6 +67,15 @@ public List<SqlNode> getOperandList() {
                 watermark);
     }
 
+    @Override
+    public void validate() throws SqlValidateException {
+        List<SqlTableColumn> columns = new ArrayList<>();
+        for (SqlNode columnPos : columnList) {
+            columns.add(((SqlTableColumnPosition) columnPos).getColumn());
+        }
+        SqlConstraintValidator.validate(constraints, new SqlNodeList(columns, SqlParserPos.ZERO));

Review Comment:
   The code at line 73 and line 95 appears to be duplicated, so I think we can extract it into a shared method.
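A rough sketch of the extraction, under the assumption that both call sites only need the columns unwrapped from their positions. `ColumnStub`, `ColumnPositionStub`, `SharedColumnsSketch`, and the helper name `unwrapColumns` are hypothetical; the two callers merely stand in for `validate()` and `getFullConstraint()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SqlTableColumn.
final class ColumnStub {
    final String name;

    ColumnStub(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for SqlTableColumnPosition.
final class ColumnPositionStub {
    private final ColumnStub column;

    ColumnPositionStub(ColumnStub column) {
        this.column = column;
    }

    ColumnStub getColumn() {
        return column;
    }
}

final class SharedColumnsSketch {
    private final List<ColumnPositionStub> columnList;

    SharedColumnsSketch(List<ColumnPositionStub> columnList) {
        this.columnList = columnList;
    }

    // Extracted helper: the only place that unwraps positions into columns.
    private List<ColumnStub> unwrapColumns() {
        List<ColumnStub> columns = new ArrayList<>();
        for (ColumnPositionStub position : columnList) {
            columns.add(position.getColumn());
        }
        return columns;
    }

    // First caller (stands in for validate()): rejects empty column names.
    void validate() {
        for (ColumnStub column : unwrapColumns()) {
            if (column.name.isEmpty()) {
                throw new IllegalArgumentException("Column name must not be empty");
            }
        }
    }

    // Second caller (stands in for getFullConstraint()): reuses the same helper.
    List<String> columnNames() {
        List<String> names = new ArrayList<>();
        for (ColumnStub column : unwrapColumns()) {
            names.add(column.name);
        }
        return names;
    }
}
```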



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterTableSchemaUtil.java:
##########
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlWatermark;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Util to convert {@link SqlAlterTableSchema} with source table to generate new {@link Schema}. */
+public class AlterTableSchemaUtil {
+
+    private final SqlValidator sqlValidator;
+    private final Consumer<SqlTableConstraint> validateTableConstraint;
+    private final Function<SqlNode, String> escapeExpression;
+
+    AlterTableSchemaUtil(
+            SqlValidator sqlValidator,
+            Function<SqlNode, String> escapeExpression,
+            Consumer<SqlTableConstraint> validateTableConstraint) {
+        this.sqlValidator = sqlValidator;
+        this.validateTableConstraint = validateTableConstraint;
+        this.escapeExpression = escapeExpression;
+    }
+
+    public Schema convertSchema(
+            SqlAlterTableSchema alterTableSchema, ResolvedCatalogTable originalTable) {
+        UnresolvedSchemaBuilder builder =
+                new UnresolvedSchemaBuilder(
+                        originalTable,
+                        (FlinkTypeFactory) sqlValidator.getTypeFactory(),
+                        sqlValidator,
+                        validateTableConstraint,
+                        escapeExpression);
+        AlterSchemaStrategy strategy =
+                alterTableSchema instanceof SqlAlterTableAdd

Review Comment:
   If other classes such as `SqlAlterTableDrop` or `SqlAlterTableRenameColumn` also extend `SqlAlterTableSchema`, this check may produce an unexpected result, because anything that is not a `SqlAlterTableAdd` is treated as `MODIFY`. I think we should extract a method that returns the `AlterSchemaStrategy`, which would also make it easier to extend `AlterSchemaStrategy` in the future.
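One way the extracted method could look, as a sketch only. All types here (`AlterSchemaNode`, `AlterAddNode`, `AlterModifyNode`, `AlterDropNode`, `StrategySketch`, `StrategyResolver`) are hypothetical stand-ins for the parser classes and for `AlterSchemaStrategy`. The point is that each subclass is matched explicitly and anything unrecognized fails loudly instead of silently falling through to `MODIFY`.

```java
// Hypothetical stand-ins for SqlAlterTableSchema and its subclasses.
abstract class AlterSchemaNode {}

final class AlterAddNode extends AlterSchemaNode {}

final class AlterModifyNode extends AlterSchemaNode {}

final class AlterDropNode extends AlterSchemaNode {}

// Hypothetical stand-in for AlterSchemaStrategy.
enum StrategySketch {
    ADD,
    MODIFY
}

final class StrategyResolver {

    private StrategyResolver() {}

    // Maps each known node type to its strategy and rejects every other type.
    static StrategySketch resolve(AlterSchemaNode node) {
        if (node instanceof AlterAddNode) {
            return StrategySketch.ADD;
        }
        if (node instanceof AlterModifyNode) {
            return StrategySketch.MODIFY;
        }
        throw new UnsupportedOperationException(
                "No schema strategy for " + node.getClass().getSimpleName());
    }
}
```

Adding a new strategy later would then mean one new enum constant and one new branch in `resolve`, with no change to the callers.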



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterTableSchemaUtil.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlWatermark;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.util.NlsString;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Util to convert {@link SqlAlterTableSchema} with source table to generate new {@link Schema}. */
+public class AlterTableSchemaUtil {
+
+    private final SqlValidator sqlValidator;
+    private final Consumer<SqlTableConstraint> validateTableConstraint;
+    private final Function<SqlNode, String> escapeExpression;
+
+    AlterTableSchemaUtil(
+            SqlValidator sqlValidator,
+            Function<SqlNode, String> escapeExpression,
+            Consumer<SqlTableConstraint> validateTableConstraint) {
+        this.sqlValidator = sqlValidator;
+        this.validateTableConstraint = validateTableConstraint;
+        this.escapeExpression = escapeExpression;
+    }
+
+    public Schema convertSchema(
+            SqlAlterTableSchema alterTableSchema, ResolvedCatalogTable originalTable) {
+        UnresolvedSchemaBuilder builder =
+                new UnresolvedSchemaBuilder(
+                        originalTable,
+                        (FlinkTypeFactory) sqlValidator.getTypeFactory(),
+                        sqlValidator,
+                        validateTableConstraint,
+                        escapeExpression);
+        AlterSchemaStrategy strategy =
+                alterTableSchema instanceof SqlAlterTableAdd
+                        ? AlterSchemaStrategy.ADD
+                        : AlterSchemaStrategy.MODIFY;
+        builder.addOrModifyColumns(strategy, alterTableSchema.getColumns().getList());
+        List<SqlTableConstraint> fullConstraint = alterTableSchema.getFullConstraint();
+        if (!fullConstraint.isEmpty()) {
+            builder.addOrModifyPrimaryKey(strategy, fullConstraint.get(0));
+        }
+        alterTableSchema
+                .getWatermark()
+                .ifPresent(sqlWatermark -> builder.addOrModifyWatermarks(strategy, sqlWatermark));
+        return builder.build();
+    }
+
+    private static class UnresolvedSchemaBuilder {
+
+        List<String> newColumnNames = new ArrayList<>();
+        Set<String> alterColumnNames = new HashSet<>();
+        Map<String, Schema.UnresolvedColumn> columns = new HashMap<>();
+        Map<String, Schema.UnresolvedWatermarkSpec> watermarkSpec = new HashMap<>();
+        @Nullable Schema.UnresolvedPrimaryKey primaryKey = null;
+
+        // Intermediate state
+        Map<String, RelDataType> physicalFieldNamesToTypes = new HashMap<>();
+        Map<String, RelDataType> metadataFieldNamesToTypes = new HashMap<>();
+        Map<String, RelDataType> computedFieldNamesToTypes = new HashMap<>();
+
+        Function<SqlNode, String> escapeExpressions;
+        FlinkTypeFactory typeFactory;
+        SqlValidator sqlValidator;
+        Consumer<SqlTableConstraint> validateTableConstraint;
+
+        UnresolvedSchemaBuilder(
+                ResolvedCatalogTable sourceTable,
+                FlinkTypeFactory typeFactory,
+                SqlValidator sqlValidator,
+                Consumer<SqlTableConstraint> validateTableConstraint,
+                Function<SqlNode, String> escapeExpressions) {
+            this.typeFactory = typeFactory;
+            this.sqlValidator = sqlValidator;
+            this.escapeExpressions = escapeExpressions;
+            this.validateTableConstraint = validateTableConstraint;
+            populateColumnsFromSourceTable(sourceTable);
+            populatePrimaryKeyFromSourceTable(sourceTable.getUnresolvedSchema());
+            populateWatermarksFromSourceTable(sourceTable.getUnresolvedSchema());
+        }
+
+        private void populateColumnsFromSourceTable(ResolvedCatalogTable sourceTable) {
+            List<DataType> types = sourceTable.getResolvedSchema().getColumnDataTypes();
+            List<Schema.UnresolvedColumn> sourceColumns =
+                    sourceTable.getUnresolvedSchema().getColumns();
+            for (int i = 0; i < sourceColumns.size(); i++) {
+                Schema.UnresolvedColumn column = sourceColumns.get(i);
+                String columnName = column.getName();
+                newColumnNames.add(columnName);
+                columns.put(columnName, column);
+                RelDataType type =
+                        typeFactory.createFieldTypeFromLogicalType(types.get(i).getLogicalType());
+                if (column instanceof Schema.UnresolvedPhysicalColumn) {
+                    physicalFieldNamesToTypes.put(columnName, type);
+                } else if (column instanceof Schema.UnresolvedComputedColumn) {
+                    computedFieldNamesToTypes.put(columnName, type);
+                } else if (column instanceof Schema.UnresolvedMetadataColumn) {
+                    metadataFieldNamesToTypes.put(columnName, type);
+                }
+            }
+        }
+
+        private void populatePrimaryKeyFromSourceTable(Schema sourceSchema) {
+            if (sourceSchema.getPrimaryKey().isPresent()) {
+                primaryKey = sourceSchema.getPrimaryKey().get();
+            }
+        }
+
+        private void populateWatermarksFromSourceTable(Schema sourceSchema) {
+            for (Schema.UnresolvedWatermarkSpec sourceWatermarkSpec :
+                    sourceSchema.getWatermarkSpecs()) {
+                watermarkSpec.put(sourceWatermarkSpec.getColumnName(), sourceWatermarkSpec);
+            }
+        }
+
+        private void addOrModifyColumns(
+                AlterSchemaStrategy strategy, List<SqlNode> alterColumnPositions) {
+            collectColumnPosition(strategy, alterColumnPositions);
+            for (SqlNode alterColumnPos : alterColumnPositions) {
+                SqlTableColumn alterColumn = ((SqlTableColumnPosition) alterColumnPos).getColumn();
+                if (alterColumn instanceof SqlTableColumn.SqlComputedColumn) {
+                    convertComputedColumn((SqlTableColumn.SqlComputedColumn) alterColumn);
+                } else {
+                    convertNonComputedColumn(alterColumn);
+                }
+            }
+        }
+
+        private void addOrModifyPrimaryKey(
+                AlterSchemaStrategy strategy, SqlTableConstraint alterPrimaryKey) {
+            validateTableConstraint.accept(alterPrimaryKey);
+            if (strategy == AlterSchemaStrategy.ADD) {
+                if (primaryKey != null) {
+                    throw new ValidationException(
+                            String.format(
+                                    "The base table already has a primary key %s. You might "
+                                            + "want to drop it before adding a new one.",
+                                    primaryKey.getColumnNames()));
+                }
+            }
+            List<String> primaryKeyColumns = new ArrayList<>();
+            for (SqlNode primaryKeyNode : alterPrimaryKey.getColumns()) {
+                String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple();
+                if (!columns.containsKey(primaryKey)) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Primary key column '%s' is not defined in the schema at %s",
+                                    primaryKey, primaryKeyNode.getParserPosition()));
+                }
+                if (!(columns.get(primaryKey) instanceof Schema.UnresolvedPhysicalColumn)) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Could not create a PRIMARY KEY with column '%s' at %s.\n"
+                                            + "A PRIMARY KEY constraint must be declared on physical columns.",
+                                    primaryKey, primaryKeyNode.getParserPosition()));
+                }
+                primaryKeyColumns.add(primaryKey);
+            }
+            if (alterColumnNames.isEmpty()) {
+                // a single add/modify constraint changes the nullability of columns implicitly
+                for (String primaryKeyColumn : primaryKeyColumns) {
+                    RelDataType originalType = physicalFieldNamesToTypes.get(primaryKeyColumn);
+                    DataType alterDataType =
+                            TypeConversions.fromLogicalToDataType(
+                                    FlinkTypeFactory.toLogicalType(
+                                            typeFactory.createTypeWithNullability(
+                                                    originalType, false)));
+                    Schema.UnresolvedColumn column = columns.remove(primaryKeyColumn);
+                    columns.put(
+                            primaryKeyColumn,
+                            new Schema.UnresolvedPhysicalColumn(
+                                    primaryKeyColumn,
+                                    alterDataType,
+                                    column.getComment().orElse(null)));
+                }
+            }
+            primaryKey =
+                    new Schema.UnresolvedPrimaryKey(
+                            alterPrimaryKey
+                                    .getConstraintName()
+                                    .orElseGet(
+                                            () ->
+                                                    primaryKeyColumns.stream()
+                                                            .collect(
+                                                                    Collectors.joining(
+                                                                            "_", "PK_", ""))),
+                            primaryKeyColumns);
+        }
+
+        private void addOrModifyWatermarks(
+                AlterSchemaStrategy strategy, SqlWatermark alterWatermarkSpec) {
+            SqlIdentifier eventTimeColumnName = alterWatermarkSpec.getEventTimeColumnName();
+
+            Map<String, RelDataType> nameToTypeMap = new HashMap<>();
+            nameToTypeMap.putAll(physicalFieldNamesToTypes);
+            nameToTypeMap.putAll(metadataFieldNamesToTypes);
+            nameToTypeMap.putAll(computedFieldNamesToTypes);
+
+            verifyRowtimeAttribute(strategy, eventTimeColumnName, nameToTypeMap);
+            String rowtimeAttribute = eventTimeColumnName.toString();
+
+            SqlNode expression = alterWatermarkSpec.getWatermarkStrategy();
+
+            // this will validate and expand function identifiers
+            SqlNode validated =
+                    sqlValidator.validateParameterizedExpression(expression, nameToTypeMap);
+            watermarkSpec.clear();
+            watermarkSpec.put(
+                    rowtimeAttribute,
+                    new Schema.UnresolvedWatermarkSpec(
+                            rowtimeAttribute,
+                            new SqlCallExpression(escapeExpressions.apply(validated))));
+        }
+
+        private void convertNonComputedColumn(SqlTableColumn column) {
+            boolean isPhysical = column instanceof SqlTableColumn.SqlRegularColumn;
+            String name = column.getName().getSimple();
+            SqlDataTypeSpec typeSpec =
+                    isPhysical
+                            ? ((SqlTableColumn.SqlRegularColumn) column).getType()
+                            : ((SqlTableColumn.SqlMetadataColumn) column).getType();
+            boolean nullable = typeSpec.getNullable() == null || typeSpec.getNullable();
+            LogicalType logicalType =
+                    FlinkTypeFactory.toLogicalType(typeSpec.deriveType(sqlValidator, nullable));
+            DataType dataType = TypeConversions.fromLogicalToDataType(logicalType);
+            RelDataType relType = typeSpec.deriveType(sqlValidator, nullable);
+            Schema.UnresolvedColumn newColumn;
+            if (isPhysical) {
+                newColumn = new Schema.UnresolvedPhysicalColumn(name, dataType, getComment(column));
+                physicalFieldNamesToTypes.put(name, relType);
+            } else {
+                newColumn =
+                        new Schema.UnresolvedMetadataColumn(
+                                name,
+                                dataType,
+                                ((SqlTableColumn.SqlMetadataColumn) column)
+                                        .getMetadataAlias()
+                                        .orElse(null),
+                                ((SqlTableColumn.SqlMetadataColumn) column).isVirtual(),
+                                getComment(column));
+                metadataFieldNamesToTypes.put(name, relType);
+            }
+            columns.put(name, newColumn);
+        }
+
+        private void convertComputedColumn(SqlTableColumn.SqlComputedColumn column) {
+            String name = column.getName().getSimple();
+            Map<String, RelDataType> accessibleFieldNamesToTypes = new HashMap<>();
+            accessibleFieldNamesToTypes.putAll(physicalFieldNamesToTypes);
+            accessibleFieldNamesToTypes.putAll(metadataFieldNamesToTypes);
+
+            // computed column cannot be generated on another computed column
+            final SqlNode validatedExpr =
+                    sqlValidator.validateParameterizedExpression(
+                            column.getExpr(), accessibleFieldNamesToTypes);
+            final RelDataType validatedType = sqlValidator.getValidatedNodeType(validatedExpr);
+
+            Schema.UnresolvedColumn newColumn =
+                    new Schema.UnresolvedComputedColumn(
+                            name,
+                            new SqlCallExpression(escapeExpressions.apply(validatedExpr)),
+                            getComment(column));
+            computedFieldNamesToTypes.put(name, validatedType);
+            columns.put(name, newColumn);
+        }
+
+        private void verifyRowtimeAttribute(
+                AlterSchemaStrategy strategy,
+                SqlIdentifier eventTimeColumnName,
+                Map<String, RelDataType> allFieldsTypes) {
+            if (!watermarkSpec.isEmpty() && strategy == AlterSchemaStrategy.ADD) {
+                throw new ValidationException(
+                        String.format(
+                                "There already exists a watermark spec for column '%s' in the base table. You "
+                                        + "might want to drop it before adding a new one.",
+                                watermarkSpec.keySet()));
+            }
+            if (!eventTimeColumnName.isSimple()) {
+                throw new ValidationException(
+                        String.format(
+                                "The nested rowtime attribute field '%s' cannot define a watermark.",
+                                eventTimeColumnName));
+            }
+            String rowtimeField = eventTimeColumnName.getSimple();
+            List<String> components = eventTimeColumnName.names;
+            if (!allFieldsTypes.containsKey(components.get(0))) {
+                throw new ValidationException(
+                        String.format(
+                                "The rowtime attribute field '%s' is not defined in the table schema, at %s\n"
+                                        + "Available fields: [%s]",
+                                rowtimeField,
+                                eventTimeColumnName.getParserPosition(),
+                                allFieldsTypes.keySet().stream()
+                                        .collect(Collectors.joining("', '", "'", "'"))));
+            }
+        }
+
+        @Nullable
+        private String getComment(SqlTableColumn column) {
+            return column.getComment()
+                    .map(SqlLiteral::value)
+                    .map(
+                            comparable ->
+                                    comparable instanceof NlsString
+                                            ? ((NlsString) comparable).getValue()
+                                            : comparable.toString())
+                    .orElse(null);
+        }
+
+        private void collectColumnPosition(
+                AlterSchemaStrategy strategy, List<SqlNode> alterColumns) {
+            for (SqlNode alterColumn : alterColumns) {
+                assert alterColumn instanceof SqlTableColumnPosition;
+                SqlTableColumnPosition columnPosition = (SqlTableColumnPosition) alterColumn;
+                SqlTableColumn column = columnPosition.getColumn();
+                String name = column.getName().getSimple();
+                boolean existed = columns.containsKey(name);
+                boolean first = columnPosition.isFirstColumn();
+                boolean after = columnPosition.isAfterReferencedColumn();
+                if (strategy == AlterSchemaStrategy.ADD) {
+                    if (existed) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Try to add a column '%s' which already exists in the table.",
+                                        name));
+                    }
+                    if (first) {
+                        newColumnNames.add(0, name);
+                    } else if (after) {
+                        getReferencedColumn(
+                                        columnPosition,
+                                        (refName) ->
+                                                columns.containsKey(refName)
+                                                        || alterColumnNames.contains(refName))
+                                .ifPresent(
+                                        (refCol) ->
+                                                newColumnNames.add(
+                                                        newColumnNames.indexOf(refCol) + 1, name));
+                    } else {
+                        newColumnNames.add(name);
+                    }
+                } else if (strategy == AlterSchemaStrategy.MODIFY) {
+                    if (!existed) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Try to modify a column '%s' which does not exist in the table.",
+                                        name));
+                    }
+                    if (first) {
+                        newColumnNames.remove(name);
+                        newColumnNames.add(0, name);
+                    } else if (after) {
+                        getReferencedColumn(
+                                        columnPosition, (refName) -> columns.containsKey(refName))
+                                .ifPresent(
+                                        (refCol) -> {
+                                            newColumnNames.remove(name);
+                                            newColumnNames.add(
+                                                    newColumnNames.indexOf(refCol) + 1, name);
+                                        });
+                    }
+                }
+                alterColumnNames.add(name);
+            }
+        }
+
+        private Optional<String> getReferencedColumn(
+                SqlTableColumnPosition columnPosition, Function<String, Boolean> existFn) {
+            boolean after = columnPosition.isAfterReferencedColumn();
+            if (after) {
+                assert columnPosition.getAfterReferencedColumn() != null;
+                String referencedName = columnPosition.getAfterReferencedColumn().getSimple();
+                if (!existFn.apply(referencedName)) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Referenced column '%s' by 'AFTER' does not exist in the table.",
+                                    referencedName));
+                }
+                return Optional.of(referencedName);
+            }
+            return Optional.empty();
+        }
+
+        public Schema build() {
+            List<Schema.UnresolvedColumn> newColumns = new ArrayList<>();
+            for (String column : newColumnNames) {
+                newColumns.add(columns.get(column));
+            }
+            Schema.Builder resultBuilder = Schema.newBuilder().fromColumns(newColumns);
+            if (primaryKey != null) {
+                String constraintName = primaryKey.getConstraintName();
+                List<String> pkColumns = primaryKey.getColumnNames();
+                if (constraintName != null) {
+                    resultBuilder.primaryKeyNamed(constraintName, pkColumns);
+                } else {
+                    resultBuilder.primaryKey(pkColumns);
+                }
+            }
+            watermarkSpec.forEach((k, v) -> resultBuilder.watermark(k, v.getWatermarkExpression()));

Review Comment:
   This is just to keep the code logic consistent; there is no other consideration.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterTableSchemaUtil.java:
##########
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlWatermark;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Util to convert {@link SqlAlterTableSchema} with source table to generate new {@link Schema}. */
+public class AlterTableSchemaUtil {
+
+    private final SqlValidator sqlValidator;
+    private final Consumer<SqlTableConstraint> validateTableConstraint;
+    private final Function<SqlNode, String> escapeExpression;
+
+    AlterTableSchemaUtil(
+            SqlValidator sqlValidator,
+            Function<SqlNode, String> escapeExpression,
+            Consumer<SqlTableConstraint> validateTableConstraint) {
+        this.sqlValidator = sqlValidator;
+        this.validateTableConstraint = validateTableConstraint;
+        this.escapeExpression = escapeExpression;
+    }
+
+    public Schema convertSchema(
+            SqlAlterTableSchema alterTableSchema, ResolvedCatalogTable originalTable) {
+        UnresolvedSchemaBuilder builder =
+                new UnresolvedSchemaBuilder(
+                        originalTable,
+                        (FlinkTypeFactory) sqlValidator.getTypeFactory(),
+                        sqlValidator,
+                        validateTableConstraint,
+                        escapeExpression);
+        AlterSchemaStrategy strategy =
+                alterTableSchema instanceof SqlAlterTableAdd
+                        ? AlterSchemaStrategy.ADD
+                        : AlterSchemaStrategy.MODIFY;
+        builder.addOrModifyColumns(strategy, alterTableSchema.getColumns().getList());
+        List<SqlTableConstraint> fullConstraint = alterTableSchema.getFullConstraint();
+        if (!fullConstraint.isEmpty()) {
+            builder.addOrModifyPrimaryKey(
+                    strategy,
+                    fullConstraint.get(0),
+                    alterTableSchema.getColumns().getList().isEmpty());
+        }
+        alterTableSchema
+                .getWatermark()
+                .ifPresent(sqlWatermark -> builder.addOrModifyWatermarks(strategy, sqlWatermark));
+        return builder.build();
+    }
+
+    private static class UnresolvedSchemaBuilder {
+
+        List<String> sortedColumnNames = new ArrayList<>();
+        Map<String, Schema.UnresolvedColumn> columns = new HashMap<>();
+        Map<String, Schema.UnresolvedWatermarkSpec> watermarkSpec = new HashMap<>();
+        @Nullable Schema.UnresolvedPrimaryKey primaryKey = null;
+
+        // Intermediate state
+        Map<String, RelDataType> physicalFieldNamesToTypes = new HashMap<>();
+        Map<String, RelDataType> metadataFieldNamesToTypes = new HashMap<>();
+        Map<String, RelDataType> computedFieldNamesToTypes = new HashMap<>();
+
+        Function<SqlNode, String> escapeExpressions;
+        FlinkTypeFactory typeFactory;
+        SqlValidator sqlValidator;
+        Consumer<SqlTableConstraint> validateTableConstraint;
+
+        UnresolvedSchemaBuilder(
+                ResolvedCatalogTable sourceTable,
+                FlinkTypeFactory typeFactory,
+                SqlValidator sqlValidator,
+                Consumer<SqlTableConstraint> validateTableConstraint,
+                Function<SqlNode, String> escapeExpressions) {
+            this.typeFactory = typeFactory;
+            this.sqlValidator = sqlValidator;
+            this.escapeExpressions = escapeExpressions;
+            this.validateTableConstraint = validateTableConstraint;
+            populateColumnsFromSourceTable(sourceTable);
+            populatePrimaryKeyFromSourceTable(sourceTable.getUnresolvedSchema());
+            populateWatermarksFromSourceTable(sourceTable.getUnresolvedSchema());
+        }
+
+        private void populateColumnsFromSourceTable(ResolvedCatalogTable sourceTable) {
+            List<DataType> types = sourceTable.getResolvedSchema().getColumnDataTypes();
+            List<Schema.UnresolvedColumn> sourceColumns =
+                    sourceTable.getUnresolvedSchema().getColumns();
+            for (int i = 0; i < sourceColumns.size(); i++) {
+                Schema.UnresolvedColumn column = sourceColumns.get(i);
+                String columnName = column.getName();
+                sortedColumnNames.add(columnName);
+                columns.put(columnName, column);
+                RelDataType type =
+                        typeFactory.createFieldTypeFromLogicalType(types.get(i).getLogicalType());
+                if (column instanceof Schema.UnresolvedPhysicalColumn) {
+                    physicalFieldNamesToTypes.put(columnName, type);
+                } else if (column instanceof Schema.UnresolvedComputedColumn) {
+                    computedFieldNamesToTypes.put(columnName, type);
+                } else if (column instanceof Schema.UnresolvedMetadataColumn) {
+                    metadataFieldNamesToTypes.put(columnName, type);
+                }
+            }
+        }
+
+        private void populatePrimaryKeyFromSourceTable(Schema sourceSchema) {
+            if (sourceSchema.getPrimaryKey().isPresent()) {
+                primaryKey = sourceSchema.getPrimaryKey().get();
+            }
+        }
+
+        private void populateWatermarksFromSourceTable(Schema sourceSchema) {
+            for (Schema.UnresolvedWatermarkSpec sourceWatermarkSpec :
+                    sourceSchema.getWatermarkSpecs()) {
+                watermarkSpec.put(sourceWatermarkSpec.getColumnName(), sourceWatermarkSpec);
+            }
+        }
+
+        private void addOrModifyColumns(
+                AlterSchemaStrategy strategy, List<SqlNode> alterColumnPositions) {
+            collectColumnPosition(strategy, alterColumnPositions);
+            for (SqlNode alterColumnPos : alterColumnPositions) {
+                SqlTableColumn alterColumn = ((SqlTableColumnPosition) alterColumnPos).getColumn();
+                if (alterColumn instanceof SqlTableColumn.SqlComputedColumn) {
+                    convertComputedColumn((SqlTableColumn.SqlComputedColumn) alterColumn);
+                } else {
+                    convertNonComputedColumn(alterColumn);
+                }
+            }
+        }
+
+        private void addOrModifyPrimaryKey(
+                AlterSchemaStrategy strategy,
+                SqlTableConstraint alterPrimaryKey,
+                boolean withoutAlterColumn) {
+            validateTableConstraint.accept(alterPrimaryKey);
+            if (strategy == AlterSchemaStrategy.ADD && primaryKey != null) {
+                throw new ValidationException(
+                        String.format(
+                                "The base table already has a primary key %s. You might "
+                                        + "want to drop it before adding a new one.",
+                                primaryKey.getColumnNames()));
+            } else if (strategy == AlterSchemaStrategy.MODIFY && primaryKey == null) {
+                throw new ValidationException(
+                        "The base table does not define any primary key. You might "
+                                + "want to add a new one.");
+            }
+            List<String> primaryKeyColumns = new ArrayList<>();
+            for (SqlNode primaryKeyNode : alterPrimaryKey.getColumns()) {
+                String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple();
+                if (!columns.containsKey(primaryKey)) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Primary key column '%s' is not defined in the schema at %s",
+                                    primaryKey, primaryKeyNode.getParserPosition()));
+                }
+                if (!(columns.get(primaryKey) instanceof Schema.UnresolvedPhysicalColumn)) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Could not create a PRIMARY KEY with column '%s' at %s.\n"
+                                            + "A PRIMARY KEY constraint must be declared on physical columns.",
+                                    primaryKey, primaryKeyNode.getParserPosition()));
+                }
+                primaryKeyColumns.add(primaryKey);
+            }
+            if (withoutAlterColumn) {
+                // a single add/modify constraint changes the nullability of columns implicitly
+                for (String primaryKeyColumn : primaryKeyColumns) {
+                    RelDataType originalType = physicalFieldNamesToTypes.get(primaryKeyColumn);
+                    DataType alterDataType =
+                            TypeConversions.fromLogicalToDataType(
+                                    FlinkTypeFactory.toLogicalType(
+                                            typeFactory.createTypeWithNullability(
+                                                    originalType, false)));
+                    Schema.UnresolvedColumn column = columns.remove(primaryKeyColumn);
+                    columns.put(
+                            primaryKeyColumn,
+                            column(
+                                    primaryKeyColumn,
+                                    alterDataType,
+                                    column.getComment().orElse(null)));
+                }
+            }
+            primaryKey =
+                    primaryKeyNamed(
+                            alterPrimaryKey
+                                    .getConstraintName()
+                                    .orElseGet(
+                                            () ->
+                                                    primaryKeyColumns.stream()
+                                                            .collect(
+                                                                    Collectors.joining(
+                                                                            "_", "PK_", ""))),
+                            primaryKeyColumns);
+        }
+
+        private void addOrModifyWatermarks(
+                AlterSchemaStrategy strategy, SqlWatermark alterWatermarkSpec) {
+            SqlIdentifier eventTimeColumnName = alterWatermarkSpec.getEventTimeColumnName();
+
+            Map<String, RelDataType> nameToTypeMap = new HashMap<>();
+            nameToTypeMap.putAll(physicalFieldNamesToTypes);
+            nameToTypeMap.putAll(metadataFieldNamesToTypes);
+            nameToTypeMap.putAll(computedFieldNamesToTypes);
+
+            verifyRowtimeAttribute(strategy, eventTimeColumnName, nameToTypeMap);
+            String rowtimeAttribute = eventTimeColumnName.toString();
+
+            SqlNode expression = alterWatermarkSpec.getWatermarkStrategy();
+
+            // this will validate and expand function identifiers
+            SqlNode validated =
+                    sqlValidator.validateParameterizedExpression(expression, nameToTypeMap);
+            watermarkSpec.clear();
+            watermarkSpec.put(
+                    rowtimeAttribute,
+                    watermarkSpec(
+                            rowtimeAttribute,
+                            new SqlCallExpression(escapeExpressions.apply(validated))));
+        }
+
+        private void convertNonComputedColumn(SqlTableColumn column) {
+            boolean isPhysical = column instanceof SqlTableColumn.SqlRegularColumn;
+            String name = column.getName().getSimple();
+            SqlDataTypeSpec typeSpec =
+                    isPhysical
+                            ? ((SqlTableColumn.SqlRegularColumn) column).getType()
+                            : ((SqlTableColumn.SqlMetadataColumn) column).getType();
+            boolean nullable = typeSpec.getNullable() == null || typeSpec.getNullable();
+            LogicalType logicalType =
+                    FlinkTypeFactory.toLogicalType(typeSpec.deriveType(sqlValidator, nullable));
+            DataType dataType = TypeConversions.fromLogicalToDataType(logicalType);
+            RelDataType relType = typeSpec.deriveType(sqlValidator, nullable);
+            Schema.UnresolvedColumn newColumn;
+            if (isPhysical) {
+                newColumn = column(name, dataType, getComment(column));
+                physicalFieldNamesToTypes.put(name, relType);
+            } else {
+                newColumn =
+                        columnByMetadata(
+                                name,
+                                dataType,
+                                ((SqlTableColumn.SqlMetadataColumn) column)
+                                        .getMetadataAlias()
+                                        .orElse(null),
+                                ((SqlTableColumn.SqlMetadataColumn) column).isVirtual(),
+                                getComment(column));
+                metadataFieldNamesToTypes.put(name, relType);
+            }
+            columns.put(name, newColumn);
+        }
+
+        private void convertComputedColumn(SqlTableColumn.SqlComputedColumn column) {
+            String name = column.getName().getSimple();
+            Map<String, RelDataType> accessibleFieldNamesToTypes = new HashMap<>();
+            accessibleFieldNamesToTypes.putAll(physicalFieldNamesToTypes);
+            accessibleFieldNamesToTypes.putAll(metadataFieldNamesToTypes);
+
+            // computed column cannot be generated on another computed column
+            final SqlNode validatedExpr =
+                    sqlValidator.validateParameterizedExpression(
+                            column.getExpr(), accessibleFieldNamesToTypes);
+            final RelDataType validatedType = sqlValidator.getValidatedNodeType(validatedExpr);
+
+            Schema.UnresolvedColumn newColumn =
+                    columnByExpression(
+                            name,
+                            new SqlCallExpression(escapeExpressions.apply(validatedExpr)),
+                            getComment(column));
+            computedFieldNamesToTypes.put(name, validatedType);
+            columns.put(name, newColumn);
+        }
+
+        private void verifyRowtimeAttribute(
+                AlterSchemaStrategy strategy,
+                SqlIdentifier eventTimeColumnName,
+                Map<String, RelDataType> allFieldsTypes) {
+            if (!watermarkSpec.isEmpty() && strategy == AlterSchemaStrategy.ADD) {
+                throw new ValidationException(
+                        String.format(
+                                "There already exists a watermark spec for column '%s' in the base table. You "
+                                        + "might want to drop it before adding a new one.",
+                                watermarkSpec.keySet()));
+            } else if (watermarkSpec.isEmpty() && strategy == AlterSchemaStrategy.MODIFY) {
+                throw new ValidationException(
+                        "There is no watermark defined in the base table. You might want to add a new one.");
+            }
+            if (!eventTimeColumnName.isSimple()) {
+                throw new ValidationException(
+                        String.format(
+                                "The nested rowtime attribute field '%s' cannot define a watermark.",
+                                eventTimeColumnName));
+            }
+            String rowtimeField = eventTimeColumnName.getSimple();
+            List<String> components = eventTimeColumnName.names;
+            if (!allFieldsTypes.containsKey(components.get(0))) {
+                throw new ValidationException(
+                        String.format(
+                                "The rowtime attribute field '%s' is not defined in the table schema, at %s\n"
+                                        + "Available fields: [%s]",
+                                rowtimeField,
+                                eventTimeColumnName.getParserPosition(),
+                                allFieldsTypes.keySet().stream()
+                                        .collect(Collectors.joining("', '", "'", "'"))));
+            }
+        }
+
+        @Nullable
+        private String getComment(SqlTableColumn column) {
+            return column.getComment()
+                    .map(SqlCharStringLiteral.class::cast)
+                    .map(c -> c.getValueAs(String.class))
+                    .orElse(null);
+        }
+
+        private void collectColumnPosition(
+                AlterSchemaStrategy strategy, List<SqlNode> alterColumns) {
+            for (SqlNode alterColumn : alterColumns) {
+                assert alterColumn instanceof SqlTableColumnPosition;
+                SqlTableColumnPosition columnPosition = (SqlTableColumnPosition) alterColumn;
+                SqlTableColumn column = columnPosition.getColumn();
+                String name = column.getName().getSimple();
+                boolean existed = sortedColumnNames.contains(name);
+                boolean first = columnPosition.isFirstColumn();
+                boolean after = columnPosition.isAfterReferencedColumn();
+                if (strategy == AlterSchemaStrategy.ADD) {
+                    if (existed) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Try to add a column '%s' which already exists in the table.",
+                                        name));
+                    }
+                    if (first) {
+                        sortedColumnNames.add(0, name);
+                    } else if (after) {
+                        getReferencedColumn(
+                                        columnPosition,
+                                        (refName) -> sortedColumnNames.contains(refName))
+                                .ifPresent(
+                                        (refCol) ->
+                                                sortedColumnNames.add(
+                                                        sortedColumnNames.indexOf(refCol) + 1,
+                                                        name));
+                    } else {
+                        sortedColumnNames.add(name);
+                    }
+                } else if (strategy == AlterSchemaStrategy.MODIFY) {
+                    if (!existed) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Try to modify a column '%s' which does not exist in the table.",
+                                        name));
+                    }
+                    if (first) {
+                        sortedColumnNames.remove(name);
+                        sortedColumnNames.add(0, name);
+                    } else if (after) {
+                        getReferencedColumn(
+                                        columnPosition, (refName) -> columns.containsKey(refName))
+                                .ifPresent(
+                                        (refCol) -> {
+                                            sortedColumnNames.remove(name);
+                                            sortedColumnNames.add(
+                                                    sortedColumnNames.indexOf(refCol) + 1, name);
+                                        });
+                    }
+                }
+            }
+        }
+
+        private Optional<String> getReferencedColumn(
+                SqlTableColumnPosition columnPosition, Function<String, Boolean> existFn) {
+            assert columnPosition.getAfterReferencedColumn() != null;
+            String referencedName = columnPosition.getAfterReferencedColumn().getSimple();
+            if (!existFn.apply(referencedName)) {
+                throw new ValidationException(
+                        String.format(
+                                "Referenced column '%s' by 'AFTER' does not exist in the table.",
+                                referencedName));
+            }
+            return Optional.of(referencedName);
+        }
+
+        private Schema.UnresolvedColumn column(
+                String columnName, AbstractDataType<?> dataType, @Nullable String comment) {
+            return Schema.newBuilder()
+                    .column(columnName, dataType)
+                    .withComment(comment)
+                    .build()
+                    .getColumns()
+                    .get(0);
+        }
+
+        private Schema.UnresolvedColumn columnByExpression(
+                String columnName, Expression expression, @Nullable String comment) {
+            return Schema.newBuilder()
+                    .columnByExpression(columnName, expression)

Review Comment:
   Why not use the `columnByExpression(String columnName, String sqlExpression)` method directly?
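
   To illustrate the idea, here is a minimal sketch. It uses simplified stand-in types (`SqlExpr`, `Builder`, `computedColumn` are hypothetical names for illustration, not the real Flink classes) and assumes the String overload simply wraps the SQL text in an expression object before delegating, so the caller can pass the escaped SQL string without constructing the wrapper itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-ins (NOT the real Flink classes) to illustrate the overload. */
public class ColumnByExpressionSketch {

    /** Hypothetical stand-in for an expression wrapper around SQL text. */
    static final class SqlExpr {
        final String sql;

        SqlExpr(String sql) {
            this.sql = sql;
        }
    }

    /** Hypothetical stand-in for a schema builder that exposes both overloads. */
    static final class Builder {
        final List<String> columns = new ArrayList<>();

        // Overload taking an already wrapped expression.
        Builder columnByExpression(String name, SqlExpr expr) {
            columns.add(name + " AS " + expr.sql);
            return this;
        }

        // Convenience overload: wraps the SQL string, then delegates.
        Builder columnByExpression(String name, String sqlExpression) {
            return columnByExpression(name, new SqlExpr(sqlExpression));
        }
    }

    /** The helper can accept the escaped SQL string and skip the manual wrapping. */
    static String computedColumn(String name, String escapedSql) {
        return new Builder().columnByExpression(name, escapedSql).columns.get(0);
    }

    public static void main(String[] args) {
        System.out.println(computedColumn("total", "`price` * `qty`"));
    }
}
```

   With this shape, the call sites would pass the result of `escapeExpressions.apply(...)` straight through, and the helper would no longer need an `Expression` parameter. Whether that fits here depends on how the real builder handles the comment, so treat it as a suggestion only.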



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterTableSchemaUtil.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlWatermark;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.util.NlsString;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Util to convert {@link SqlAlterTableSchema} with source table to generate new {@link Schema}. */
+public class AlterTableSchemaUtil {
+
+    private final SqlValidator sqlValidator;
+    private final Consumer<SqlTableConstraint> validateTableConstraint;
+    private final Function<SqlNode, String> escapeExpression;
+
+    AlterTableSchemaUtil(
+            SqlValidator sqlValidator,
+            Function<SqlNode, String> escapeExpression,
+            Consumer<SqlTableConstraint> validateTableConstraint) {
+        this.sqlValidator = sqlValidator;
+        this.validateTableConstraint = validateTableConstraint;
+        this.escapeExpression = escapeExpression;
+    }
+
+    public Schema convertSchema(
+            SqlAlterTableSchema alterTableSchema, ResolvedCatalogTable originalTable) {
+        UnresolvedSchemaBuilder builder =
+                new UnresolvedSchemaBuilder(
+                        originalTable,
+                        (FlinkTypeFactory) sqlValidator.getTypeFactory(),
+                        sqlValidator,
+                        validateTableConstraint,
+                        escapeExpression);
+        AlterSchemaStrategy strategy =
+                alterTableSchema instanceof SqlAlterTableAdd
+                        ? AlterSchemaStrategy.ADD
+                        : AlterSchemaStrategy.MODIFY;
+        builder.addOrModifyColumns(strategy, alterTableSchema.getColumns().getList());
+        List<SqlTableConstraint> fullConstraint = alterTableSchema.getFullConstraint();
+        if (!fullConstraint.isEmpty()) {
+            builder.addOrModifyPrimaryKey(strategy, fullConstraint.get(0));
+        }
+        alterTableSchema
+                .getWatermark()
+                .ifPresent(sqlWatermark -> builder.addOrModifyWatermarks(strategy, sqlWatermark));
+        return builder.build();
+    }
+
+    private static class UnresolvedSchemaBuilder {
+
+        List<String> newColumnNames = new ArrayList<>();
+        Set<String> alterColumnNames = new HashSet<>();
+        Map<String, Schema.UnresolvedColumn> columns = new HashMap<>();
+        Map<String, Schema.UnresolvedWatermarkSpec> watermarkSpec = new HashMap<>();

Review Comment:
   This is a map, so I think we should name it `watermarkSpecs`. Variable names should stay consistent with the existing logic; see https://github.com/apache/flink/blob/34de3989a613cf7124f9e301cb8284080f4df4ac/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java#L212


