Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/06 03:35:19 UTC

[GitHub] [flink] luoyuxia commented on a diff in pull request #19329: [FLINK-22318][table] Support RENAME column name for ALTER TABLE statement

luoyuxia commented on code in PR #19329:
URL: https://github.com/apache/flink/pull/19329#discussion_r843408437


##########
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl:
##########
@@ -548,6 +551,18 @@ SqlAlterTable SqlAlterTable() :
                         tableIdentifier,
                         newTableIdentifier);
         }
+    |
+        <RENAME>

Review Comment:
   I'm not sure it's a good idea to omit `COLUMN` in the rename-column SQL. Right now we know the statement renames a column, but what if we later need to rename a partition or something else?
   For Hive, the SQL to rename a table column is
   `CHANGE COLUMN? oldName newName`.
   For Spark, it is similarly
   `ALTER TABLE table_name RENAME COLUMN clause`.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -144,6 +147,146 @@ public static Operation convertChangeColumn(
         // TODO: handle watermark and constraints
     }
 
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable,
+            ResolvedSchema originResolveSchema) {
+
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        List<String> columnNames =
+                originSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+        // validate old column is exists or new column is duplicated or old column is
+        // referenced by computed column
+        validateColumnName(originColumnName, newColumnName, columnNames, originResolveSchema);
+
+        // validate old column is referenced by watermark
+        List<org.apache.flink.table.catalog.WatermarkSpec> watermarkSpecs =
+                originResolveSchema.getWatermarkSpecs();
+        watermarkSpecs.forEach(
+                watermarkSpec -> {
+                    String rowtimeAttribute = watermarkSpec.getRowtimeAttribute();
+                    Set<String> referencedColumns =
+                            ColumnReferenceFinder.findReferencedColumn(
+                                    watermarkSpec.getWatermarkExpression(), columnNames);
+                    if (originColumnName.equals(rowtimeAttribute)
+                            || referencedColumns.contains(originColumnName)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Old column %s is referenced by watermark expression %s, "
+                                                + "currently doesn't allow to rename column which is "
+                                                + "referenced by watermark expression.",
+                                        originColumnName, watermarkSpec.asSummaryString()));
+                    }
+                });
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        originSchema.getColumns().stream()

Review Comment:
   nit: it seems `.stream()` can be omitted here, since `forEach` can be called on the list directly.
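
   A minimal sketch of this nit, using plain JDK collections rather than Flink's `Schema` classes. The class and method names (`ForEachSketch`, `renameColumn`) are hypothetical and only illustrate that `Iterable.forEach` can be called on a list without an intermediate stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the column-rebuilding loop; not Flink code. */
final class ForEachSketch {

    /** Returns a copy of columnNames with oldName replaced by newName. */
    static List<String> renameColumn(List<String> columnNames, String oldName, String newName) {
        List<String> renamed = new ArrayList<>();
        // Iterable.forEach is called on the list itself; no stream() is needed.
        columnNames.forEach(name -> renamed.add(name.equals(oldName) ? newName : name));
        return renamed;
    }

    public static void main(String[] args) {
        System.out.println(renameColumn(Arrays.asList("id", "ts", "name"), "ts", "event_time"));
    }
}
```

   A stream is only worth creating when intermediate operations such as `map` or `filter` are chained before the terminal operation.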



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -144,6 +147,146 @@ public static Operation convertChangeColumn(
         // TODO: handle watermark and constraints
     }
 
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable,
+            ResolvedSchema originResolveSchema) {
+
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        List<String> columnNames =
+                originSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+        // validate old column is exists or new column is duplicated or old column is
+        // referenced by computed column
+        validateColumnName(originColumnName, newColumnName, columnNames, originResolveSchema);
+
+        // validate old column is referenced by watermark
+        List<org.apache.flink.table.catalog.WatermarkSpec> watermarkSpecs =
+                originResolveSchema.getWatermarkSpecs();
+        watermarkSpecs.forEach(
+                watermarkSpec -> {
+                    String rowtimeAttribute = watermarkSpec.getRowtimeAttribute();
+                    Set<String> referencedColumns =
+                            ColumnReferenceFinder.findReferencedColumn(
+                                    watermarkSpec.getWatermarkExpression(), columnNames);
+                    if (originColumnName.equals(rowtimeAttribute)
+                            || referencedColumns.contains(originColumnName)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Old column %s is referenced by watermark expression %s, "
+                                                + "currently doesn't allow to rename column which is "
+                                                + "referenced by watermark expression.",
+                                        originColumnName, watermarkSpec.asSummaryString()));
+                    }
+                });
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        originSchema.getColumns().stream()
+                .forEach(
+                        column -> {
+                            if (originColumnName.equals(column.getName())) {
+                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                            } else {
+                                buildNewColumnFromOriginColumn(builder, column, column.getName());
+                            }
+                        });
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames();
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> newPrimaryKeyNames =
+                    originPrimaryKeyNames.stream()
+                            .map(pkName -> pkName.equals(originColumnName) ? newColumnName : pkName)
+                            .collect(Collectors.toList());
+
+            builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
+        }
+
+        // build watermark
+        originSchema.getWatermarkSpecs().stream()

Review Comment:
   nit: it seems `.stream()` can be omitted here as well.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -144,6 +147,146 @@ public static Operation convertChangeColumn(
         // TODO: handle watermark and constraints
     }
 
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable,
+            ResolvedSchema originResolveSchema) {
+
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        List<String> columnNames =
+                originSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+        // validate old column is exists or new column is duplicated or old column is
+        // referenced by computed column
+        validateColumnName(originColumnName, newColumnName, columnNames, originResolveSchema);
+
+        // validate old column is referenced by watermark
+        List<org.apache.flink.table.catalog.WatermarkSpec> watermarkSpecs =
+                originResolveSchema.getWatermarkSpecs();
+        watermarkSpecs.forEach(
+                watermarkSpec -> {
+                    String rowtimeAttribute = watermarkSpec.getRowtimeAttribute();
+                    Set<String> referencedColumns =
+                            ColumnReferenceFinder.findReferencedColumn(
+                                    watermarkSpec.getWatermarkExpression(), columnNames);
+                    if (originColumnName.equals(rowtimeAttribute)
+                            || referencedColumns.contains(originColumnName)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Old column %s is referenced by watermark expression %s, "
+                                                + "currently doesn't allow to rename column which is "
+                                                + "referenced by watermark expression.",
+                                        originColumnName, watermarkSpec.asSummaryString()));
+                    }
+                });
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        originSchema.getColumns().stream()
+                .forEach(
+                        column -> {
+                            if (originColumnName.equals(column.getName())) {
+                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                            } else {
+                                buildNewColumnFromOriginColumn(builder, column, column.getName());
+                            }
+                        });
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames();
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> newPrimaryKeyNames =
+                    originPrimaryKeyNames.stream()
+                            .map(pkName -> pkName.equals(originColumnName) ? newColumnName : pkName)
+                            .collect(Collectors.toList());
+
+            builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
+        }
+
+        // build watermark
+        originSchema.getWatermarkSpecs().stream()
+                .forEach(
+                        watermarkSpec ->
+                                builder.watermark(
+                                        watermarkSpec.getColumnName(),
+                                        watermarkSpec.getWatermarkExpression()));
+
+        // build partition key
+        List<String> newPartitionKeys =
+                catalogTable.getPartitionKeys().stream()
+                        .map(name -> name.equals(originColumnName) ? newColumnName : name)
+                        .collect(Collectors.toList());
+
+        // generate new schema
+        return new AlterTableSchemaOperation(
+                tableIdentifier,
+                CatalogTable.of(
+                        builder.build(),
+                        catalogTable.getComment(),
+                        newPartitionKeys,
+                        catalogTable.getOptions()));
+    }
+
+    private static void validateColumnName(
+            String originColumnName,
+            String newColumnName,
+            List<String> columnNames,
+            ResolvedSchema originResolvedSchema) {
+        int originColumnIndex = columnNames.indexOf(originColumnName);
+        if (originColumnIndex < 0) {
+            throw new ValidationException(
+                    String.format("Old column %s not found for RENAME COLUMN ", originColumnName));
+        }
+
+        int sameColumnNameIndex = columnNames.indexOf(newColumnName);
+        if (sameColumnNameIndex >= 0) {
+            throw new ValidationException(
+                    String.format("New column %s existed for RENAME COLUMN ", newColumnName));
+        }
+
+        // validate old column name is referenced by computed column case
+        originResolvedSchema.getColumns().stream()
+                .filter(column -> column instanceof Column.ComputedColumn)
+                .forEach(
+                        column -> {
+                            Column.ComputedColumn computedColumn = (Column.ComputedColumn) column;
+                            Set<String> referencedColumn =
+                                    ColumnReferenceFinder.findReferencedColumn(
+                                            computedColumn.getExpression(), columnNames);
+                            if (referencedColumn.contains(originColumnName)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Old column %s is referenced by computed column %s, currently doesn't "
+                                                        + "allow to rename column which is referenced by computed column.",
+                                                originColumnName,
+                                                computedColumn.asSummaryString()));
+                            }
+                        });
+    }
+
+    private static void buildNewColumnFromOriginColumn(
+            Schema.Builder builder, Schema.UnresolvedColumn originColumn, String columnName) {
+        if (originColumn instanceof Schema.UnresolvedComputedColumn) {
+            builder.columnByExpression(
+                    columnName, ((Schema.UnresolvedComputedColumn) originColumn).getExpression());
+        } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
+            builder.column(
+                    columnName, ((Schema.UnresolvedPhysicalColumn) originColumn).getDataType());
+        } else {

Review Comment:
   Nit: would it be better to do an explicit `instanceof` check here too, instead of assuming that every other column is an `UnresolvedMetadataColumn`? New column types may be added in the future.
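
   A minimal sketch of what this could look like, assuming a simplified column hierarchy. All classes below (`ColumnDispatchSketch` and its nested column types) are hypothetical stand-ins, not Flink's `Schema.Unresolved*Column` classes, and the choice of `IllegalArgumentException` is an assumption as well.

```java
/** Hypothetical stand-ins for the unresolved column kinds; not Flink classes. */
final class ColumnDispatchSketch {

    abstract static class Column {
        final String name;

        Column(String name) {
            this.name = name;
        }
    }

    static final class PhysicalColumn extends Column {
        PhysicalColumn(String name) {
            super(name);
        }
    }

    static final class ComputedColumn extends Column {
        ComputedColumn(String name) {
            super(name);
        }
    }

    static final class MetadataColumn extends Column {
        MetadataColumn(String name) {
            super(name);
        }
    }

    /** Represents a column kind that is added later and not yet handled. */
    static final class FutureColumn extends Column {
        FutureColumn(String name) {
            super(name);
        }
    }

    /** Describes how the column would be rebuilt under the new name. */
    static String rebuild(Column column, String newName) {
        if (column instanceof ComputedColumn) {
            return "computed:" + newName;
        } else if (column instanceof PhysicalColumn) {
            return "physical:" + newName;
        } else if (column instanceof MetadataColumn) {
            // The metadata case is checked explicitly rather than assumed.
            return "metadata:" + newName;
        } else {
            // Fail loudly instead of silently treating an unknown kind as metadata.
            throw new IllegalArgumentException(
                    "Unsupported column type: " + column.getClass().getName());
        }
    }

    public static void main(String[] args) {
        System.out.println(rebuild(new MetadataColumn("m"), "renamed"));
    }
}
```

   With the explicit branch, a newly introduced column type surfaces as an error at the point of use rather than being rebuilt with the wrong builder call.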



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java:
##########
@@ -0,0 +1,105 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+
+import org.apache.calcite.rex.RexInputRef;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** A finder used to look up referenced column name in a {@link ResolvedExpression}. */
+public class ColumnReferenceFinder {
+
+    public static Set<String> findReferencedColumn(
+            ResolvedExpression resolvedExpression, List<String> inputFields) {
+        ColumnReferenceVisitor visitor = new ColumnReferenceVisitor(inputFields);
+        visitor.visit(resolvedExpression);
+        return visitor.referencedColumns;
+    }
+
+    private static class ColumnReferenceVisitor extends ExpressionDefaultVisitor<Void> {
+        private final List<String> inputFields;
+        private final Set<String> referencedColumns;
+
+        public ColumnReferenceVisitor(List<String> inputFields) {

Review Comment:
   nit: it seems the constructor can be private/protected.
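
   A minimal sketch of why a narrower modifier is enough, assuming the visitor is only ever created by its enclosing class. `PrivateCtorSketch` and `NameCollector` are hypothetical names, not the classes in this PR.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical outer class that owns a private nested helper; not Flink code. */
final class PrivateCtorSketch {

    static Set<String> collect(List<String> inputFields) {
        // The enclosing class can still call the nested class's private constructor.
        NameCollector collector = new NameCollector(inputFields);
        collector.collectAll();
        return collector.seen;
    }

    private static class NameCollector {
        private final List<String> inputFields;
        private final Set<String> seen = new HashSet<>();

        // Private: nothing outside PrivateCtorSketch can see this class anyway.
        private NameCollector(List<String> inputFields) {
            this.inputFields = inputFields;
        }

        void collectAll() {
            seen.addAll(inputFields);
        }
    }

    public static void main(String[] args) {
        System.out.println(collect(Arrays.asList("a", "b", "a")));
    }
}
```

   Because the nested class is itself private, a `public` constructor grants no extra access; the narrower modifier simply documents the intent.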


