Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/29 16:04:20 UTC

[GitHub] [flink] fsk119 commented on a diff in pull request #21571: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

fsk119 commented on code in PR #21571:
URL: https://github.com/apache/flink/pull/21571#discussion_r1058978409


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -115,77 +120,135 @@ public Schema applySchemaChange(SqlAlterTableSchema alterTableSchema, Schema ori
     }
 
     public Schema applySchemaChange(
-            SqlAlterTableRenameColumn renameColumn, ContextResolvedTable originalTable) {
-        String oldColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
+            SqlAlterTableRenameColumn renameColumn, ContextResolvedTable originTable) {
+        String originColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
         String newColumnName = getColumnName(renameColumn.getNewColumnIdentifier());
         List<String> tableColumns =
-                originalTable.getResolvedSchema().getColumns().stream()
+                originTable.getResolvedSchema().getColumns().stream()
                         .map(Column::getName)
                         .collect(Collectors.toList());
-        // validate old column is exists or new column isn't duplicated or old column isn't
-        // referenced by computed column
+        // validate origin column is exists, new column name does not collide with existed column
+        // names, and origin column isn't referenced by computed column
         validateColumnName(
-                oldColumnName,
+                originColumnName,
                 newColumnName,
                 tableColumns,
-                originalTable.getResolvedSchema(),
-                ((CatalogTable) originalTable.getResolvedTable()).getPartitionKeys());
+                originTable.getResolvedSchema(),
+                ((CatalogTable) originTable.getResolvedTable()).getPartitionKeys());
+        validateWatermark(originTable, originColumnName, tableColumns);
 
-        // validate old column isn't referenced by watermark
-        List<WatermarkSpec> watermarkSpecs = originalTable.getResolvedSchema().getWatermarkSpecs();
-        watermarkSpecs.forEach(
-                watermarkSpec -> {
-                    String rowtimeAttribute = watermarkSpec.getRowtimeAttribute();
-                    Set<String> referencedColumns =
-                            ColumnReferenceFinder.findReferencedColumn(
-                                    watermarkSpec.getWatermarkExpression(), tableColumns);
-                    if (oldColumnName.equals(rowtimeAttribute)
-                            || referencedColumns.contains(oldColumnName)) {
-                        throw new ValidationException(
-                                String.format(
-                                        "Old column %s is referred by watermark expression %s, "
-                                                + "currently doesn't allow to rename column which is "
-                                                + "referred by watermark expression.",
-                                        oldColumnName, watermarkSpec.asSummaryString()));
+        // generate new schema
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> {
+                    if (column.getName().equals(originColumnName)) {
+                        buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                    } else {
+                        builder.fromColumns(Collections.singletonList(column));
                     }
                 });
+        buildUpdatedPrimaryKey(
+                schemaBuilder,
+                originTable,
+                (pk) -> pk.equals(originColumnName) ? newColumnName : pk);
+        buildUpdatedWatermark(schemaBuilder, originTable);
+        return schemaBuilder.build();
+    }
 
-        Schema.Builder builder = Schema.newBuilder();
-        // build column
-        Schema originSchema = originalTable.getTable().getUnresolvedSchema();
-        originSchema
-                .getColumns()
+    public Schema applySchemaChange(
+            SqlAlterTableDropColumn dropColumn, ContextResolvedTable originTable) {
+        List<String> tableColumns =
+                originTable.getResolvedSchema().getColumns().stream()
+                        .map(Column::getName)
+                        .collect(Collectors.toList());
+        Set<String> primaryKeys = new HashSet<>();
+        originTable
+                .getResolvedSchema()
+                .getPrimaryKey()
+                .ifPresent(pk -> primaryKeys.addAll(pk.getColumns()));
+        Set<String> columnsToDrop = new HashSet<>();
+        dropColumn
+                .getColumnList()
                 .forEach(
-                        column -> {
-                            if (oldColumnName.equals(column.getName())) {
-                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
-                            } else {
-                                builder.fromColumns(Collections.singletonList(column));
+                        identifier -> {
+                            String name = getColumnName((SqlIdentifier) identifier);
+                            if (!columnsToDrop.add(name)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "%sDuplicate column `%s`.", EX_MSG_PREFIX, name));
                             }
                         });
-        // build primary key
-        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
-        if (originPrimaryKey.isPresent()) {
-            List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames();
-            String constrainName = originPrimaryKey.get().getConstraintName();
-            List<String> newPrimaryKeyNames =
-                    originPrimaryKeyNames.stream()
-                            .map(pkName -> pkName.equals(oldColumnName) ? newColumnName : pkName)
-                            .collect(Collectors.toList());
-            builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
-        }
-
-        // build watermark
-        originSchema
-                .getWatermarkSpecs()
-                .forEach(
-                        watermarkSpec ->
-                                builder.watermark(
-                                        watermarkSpec.getColumnName(),
-                                        watermarkSpec.getWatermarkExpression()));
 
-        // generate new schema
-        return builder.build();
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        for (SqlNode columnIdentifier : dropColumn.getColumnList()) {
+            String columnToDrop = getColumnName((SqlIdentifier) columnIdentifier);
+            // validate the column to drop exists in the table schema, is not a primary key and
+            // does not derive any computed column
+            validateColumnName(
+                    columnToDrop,
+                    tableColumns,
+                    originTable.getResolvedSchema(),
+                    ((CatalogTable) originTable.getResolvedTable()).getPartitionKeys(),
+                    primaryKeys,
+                    columnsToDrop);
+            validateWatermark(originTable, columnToDrop, tableColumns);
+        }
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> {
+                    if (!columnsToDrop.contains(column.getName())) {
+                        builder.fromColumns(Collections.singletonList(column));
+                    }
+                });
+        buildUpdatedPrimaryKey(schemaBuilder, originTable, (pk) -> pk);

Review Comment:
   `pk -> pk` => `Function.identity()`
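
   For reference, a tiny standalone illustration of why the two forms are interchangeable (the column name below is made up):

   ```java
   import java.util.function.Function;

   public class IdentityDemo {
       public static void main(String[] args) {
           // Both forms return their argument unchanged; Function.identity() states the
           // intent ("keep the primary key column names as they are") more explicitly.
           Function<String, String> lambda = pk -> pk;
           Function<String, String> identity = Function.identity();
           System.out.println(lambda.apply("user_id"));   // user_id
           System.out.println(identity.apply("user_id")); // user_id
       }
   }
   ```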



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java:
##########
@@ -55,7 +59,11 @@ public List<SqlNode> getOperandList() {
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         super.unparse(writer, leftPrec, rightPrec);
-        writer.keyword("DROP CONSTRAINT");
-        this.constraintName.unparse(writer, leftPrec, rightPrec);
+        if (constraintName == null) {
+            writer.keyword("DROP PRIMARY KEY");
+        } else {
+            writer.keyword("DROP CONSTRAINT");
+            this.constraintName.unparse(writer, leftPrec, rightPrec);
+        }

Review Comment:
   Here we use `constraintName = null` to mean dropping the primary key. It's not as straightforward as introducing a new sql node to express the DROP PRIMARY KEY syntax.
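
   A rough, hypothetical sketch of what such a dedicated node could look like (the class name, the two-argument `SqlAlterTable` super constructor, and the `getTableName()` accessor are assumptions inferred from the surrounding diff, not something this PR defines):

   ```java
   import org.apache.calcite.sql.SqlIdentifier;
   import org.apache.calcite.sql.SqlNode;
   import org.apache.calcite.sql.SqlWriter;
   import org.apache.calcite.sql.parser.SqlParserPos;
   import org.apache.flink.sql.parser.ddl.SqlAlterTable;

   import java.util.Collections;
   import java.util.List;

   /** Hypothetical node dedicated to "ALTER TABLE ... DROP PRIMARY KEY". */
   public class SqlAlterTableDropPrimaryKey extends SqlAlterTable {

       public SqlAlterTableDropPrimaryKey(SqlParserPos pos, SqlIdentifier tableName) {
           super(pos, tableName);
       }

       @Override
       public List<SqlNode> getOperandList() {
           return Collections.singletonList(getTableName());
       }

       @Override
       public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
           super.unparse(writer, leftPrec, rightPrec);
           // No nullable constraint name: the node itself expresses "drop the primary key".
           writer.keyword("DROP PRIMARY KEY");
       }
   }
   ```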



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -604,24 +703,90 @@ private void validateColumnName(
                             Set<String> referencedColumn =
                                     ColumnReferenceFinder.findReferencedColumn(
                                             computedColumn.getExpression(), tableColumns);
-                            if (referencedColumn.contains(originColumnName)) {
+                            if (computedColumnChecker.apply(referencedColumn, computedColumn)) {
                                 throw new ValidationException(
                                         String.format(
-                                                "Old column %s is referred by computed column %s, currently doesn't "
-                                                        + "allow to rename column which is referred by computed column.",
-                                                originColumnName,
+                                                "%sThe column `%s` is referenced by computed column %s.",
+                                                EX_MSG_PREFIX,
+                                                columnToAlter,
                                                 computedColumn.asSummaryString()));
                             }
                         });
-        // validate partition keys doesn't contain the old column
-        if (partitionKeys.contains(originColumnName)) {
+        // validate partition keys doesn't contain the origin column
+        if (partitionKeys.contains(columnToAlter)) {
             throw new ValidationException(
                     String.format(
-                            "Can not rename column %s because it is used as the partition keys.",
-                            originColumnName));
+                            "%sThe column `%s` is used as the partition keys.",
+                            EX_MSG_PREFIX, columnToAlter));
         }
     }
 
+    private void validateWatermark(
+            ContextResolvedTable originTable, String columnToAlter, List<String> tableColumns) {
+        // validate origin column isn't referenced by watermark
+        List<WatermarkSpec> watermarkSpecs = originTable.getResolvedSchema().getWatermarkSpecs();
+        watermarkSpecs.forEach(
+                watermarkSpec -> {
+                    String rowtimeAttribute = watermarkSpec.getRowtimeAttribute();
+                    Set<String> referencedColumns =
+                            ColumnReferenceFinder.findReferencedColumn(
+                                    watermarkSpec.getWatermarkExpression(), tableColumns);
+                    if (columnToAlter.equals(rowtimeAttribute)
+                            || referencedColumns.contains(columnToAlter)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "%sThe column `%s` is referenced by watermark expression %s.",
+                                        EX_MSG_PREFIX,
+                                        columnToAlter,
+                                        watermarkSpec.asSummaryString()));
+                    }
+                });
+    }
+
+    private void buildUpdatedColumn(
+            Schema.Builder builder,
+            ContextResolvedTable originTable,
+            BiConsumer<Schema.Builder, Schema.UnresolvedColumn> columnConsumer) {
+        // build column
+        originTable
+                .getTable()
+                .getUnresolvedSchema()
+                .getColumns()
+                .forEach(column -> columnConsumer.accept(builder, column));
+    }
+
+    private void buildUpdatedPrimaryKey(
+            Schema.Builder builder,
+            ContextResolvedTable originTable,
+            Function<String, String> pkGenerator) {

Review Comment:
   pkGenerator -> columnRenamer



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -604,24 +703,90 @@ private void validateColumnName(
                             Set<String> referencedColumn =
                                     ColumnReferenceFinder.findReferencedColumn(
                                             computedColumn.getExpression(), tableColumns);
-                            if (referencedColumn.contains(originColumnName)) {
+                            if (computedColumnChecker.apply(referencedColumn, computedColumn)) {
                                 throw new ValidationException(
                                         String.format(
-                                                "Old column %s is referred by computed column %s, currently doesn't "
-                                                        + "allow to rename column which is referred by computed column.",
-                                                originColumnName,
+                                                "%sThe column `%s` is referenced by computed column %s.",
+                                                EX_MSG_PREFIX,
+                                                columnToAlter,
                                                 computedColumn.asSummaryString()));
                             }

Review Comment:
   It's not very easy for others to understand. Could you add some comments to describe the magic here?

   BTW, will the drop order influence the logic here, e.g. dropping the physical column first and then dropping the computed column that uses it?
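
   To make that second question concrete, a minimal scenario sketch (the table, column names, and connector are made up for illustration; the ALTER statement assumes the DROP-column syntax added by this PR):

   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class DropOrderScenario {
       public static void main(String[] args) {
           TableEnvironment tEnv =
                   TableEnvironment.create(EnvironmentSettings.inStreamingMode());
           // `ts_shifted` is a computed column derived from the physical column `ts`.
           tEnv.executeSql(
                   "CREATE TABLE t ("
                           + "  ts TIMESTAMP(3),"
                           + "  ts_shifted AS ts + INTERVAL '1' SECOND"
                           + ") WITH ('connector' = 'datagen')");
           // Both the physical column and the computed column that references it are
           // listed in one statement; whether validation accepts this may depend on
           // the order in which the converter checks the listed columns.
           tEnv.executeSql("ALTER TABLE t DROP (ts, ts_shifted)");
       }
   }
   ```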



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -115,77 +120,135 @@ public Schema applySchemaChange(SqlAlterTableSchema alterTableSchema, Schema ori
     }
 
     public Schema applySchemaChange(
-            SqlAlterTableRenameColumn renameColumn, ContextResolvedTable originalTable) {
-        String oldColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
+            SqlAlterTableRenameColumn renameColumn, ContextResolvedTable originTable) {
+        String originColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
         String newColumnName = getColumnName(renameColumn.getNewColumnIdentifier());
         List<String> tableColumns =
-                originalTable.getResolvedSchema().getColumns().stream()
+                originTable.getResolvedSchema().getColumns().stream()
                         .map(Column::getName)
                         .collect(Collectors.toList());
-        // validate old column is exists or new column isn't duplicated or old column isn't
-        // referenced by computed column
+        // validate origin column is exists, new column name does not collide with existed column
+        // names, and origin column isn't referenced by computed column
         validateColumnName(
-                oldColumnName,
+                originColumnName,
                 newColumnName,
                 tableColumns,
-                originalTable.getResolvedSchema(),
-                ((CatalogTable) originalTable.getResolvedTable()).getPartitionKeys());
+                originTable.getResolvedSchema(),
+                ((CatalogTable) originTable.getResolvedTable()).getPartitionKeys());
+        validateWatermark(originTable, originColumnName, tableColumns);
 
-        // validate old column isn't referenced by watermark
-        List<WatermarkSpec> watermarkSpecs = originalTable.getResolvedSchema().getWatermarkSpecs();
-        watermarkSpecs.forEach(
-                watermarkSpec -> {
-                    String rowtimeAttribute = watermarkSpec.getRowtimeAttribute();
-                    Set<String> referencedColumns =
-                            ColumnReferenceFinder.findReferencedColumn(
-                                    watermarkSpec.getWatermarkExpression(), tableColumns);
-                    if (oldColumnName.equals(rowtimeAttribute)
-                            || referencedColumns.contains(oldColumnName)) {
-                        throw new ValidationException(
-                                String.format(
-                                        "Old column %s is referred by watermark expression %s, "
-                                                + "currently doesn't allow to rename column which is "
-                                                + "referred by watermark expression.",
-                                        oldColumnName, watermarkSpec.asSummaryString()));
+        // generate new schema
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> {
+                    if (column.getName().equals(originColumnName)) {
+                        buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                    } else {
+                        builder.fromColumns(Collections.singletonList(column));
                     }
                 });
+        buildUpdatedPrimaryKey(
+                schemaBuilder,
+                originTable,
+                (pk) -> pk.equals(originColumnName) ? newColumnName : pk);
+        buildUpdatedWatermark(schemaBuilder, originTable);
+        return schemaBuilder.build();
+    }
 
-        Schema.Builder builder = Schema.newBuilder();
-        // build column
-        Schema originSchema = originalTable.getTable().getUnresolvedSchema();
-        originSchema
-                .getColumns()
+    public Schema applySchemaChange(
+            SqlAlterTableDropColumn dropColumn, ContextResolvedTable originTable) {

Review Comment:
   I think `ContextResolvedTable` -> `ResolvedCatalogTable` can reduce a lot of invocations of `getTable`.
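
   For illustration, a minimal sketch of how the accessors read with `ResolvedCatalogTable` (the helper class and method names below are made up; only the accessor calls matter):

   ```java
   import org.apache.flink.table.catalog.Column;
   import org.apache.flink.table.catalog.ResolvedCatalogTable;

   import java.util.List;
   import java.util.stream.Collectors;

   /** Illustrative helpers only: schema and partition keys become direct calls. */
   class ResolvedTableAccessors {

       static List<String> columnNames(ResolvedCatalogTable originTable) {
           return originTable.getResolvedSchema().getColumns().stream()
                   .map(Column::getName)
                   .collect(Collectors.toList());
       }

       static List<String> partitionKeys(ResolvedCatalogTable originTable) {
           // No cast through ContextResolvedTable#getResolvedTable() is needed here.
           return originTable.getPartitionKeys();
       }
   }
   ```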



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -115,77 +120,135 @@ public Schema applySchemaChange(SqlAlterTableSchema alterTableSchema, Schema ori
     }
 
     public Schema applySchemaChange(
-            SqlAlterTableRenameColumn renameColumn, ContextResolvedTable originalTable) {
-        String oldColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
+            SqlAlterTableRenameColumn renameColumn, ContextResolvedTable originTable) {
+        String originColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
         String newColumnName = getColumnName(renameColumn.getNewColumnIdentifier());
         List<String> tableColumns =
-                originalTable.getResolvedSchema().getColumns().stream()
+                originTable.getResolvedSchema().getColumns().stream()
                         .map(Column::getName)
                         .collect(Collectors.toList());
-        // validate old column is exists or new column isn't duplicated or old column isn't
-        // referenced by computed column
+        // validate origin column is exists, new column name does not collide with existed column
+        // names, and origin column isn't referenced by computed column
         validateColumnName(
-                oldColumnName,
+                originColumnName,
                 newColumnName,
                 tableColumns,
-                originalTable.getResolvedSchema(),
-                ((CatalogTable) originalTable.getResolvedTable()).getPartitionKeys());
+                originTable.getResolvedSchema(),
+                ((CatalogTable) originTable.getResolvedTable()).getPartitionKeys());
+        validateWatermark(originTable, originColumnName, tableColumns);
 
-        // validate old column isn't referenced by watermark
-        List<WatermarkSpec> watermarkSpecs = originalTable.getResolvedSchema().getWatermarkSpecs();
-        watermarkSpecs.forEach(
-                watermarkSpec -> {
-                    String rowtimeAttribute = watermarkSpec.getRowtimeAttribute();
-                    Set<String> referencedColumns =
-                            ColumnReferenceFinder.findReferencedColumn(
-                                    watermarkSpec.getWatermarkExpression(), tableColumns);
-                    if (oldColumnName.equals(rowtimeAttribute)
-                            || referencedColumns.contains(oldColumnName)) {
-                        throw new ValidationException(
-                                String.format(
-                                        "Old column %s is referred by watermark expression %s, "
-                                                + "currently doesn't allow to rename column which is "
-                                                + "referred by watermark expression.",
-                                        oldColumnName, watermarkSpec.asSummaryString()));
+        // generate new schema
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> {
+                    if (column.getName().equals(originColumnName)) {
+                        buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                    } else {
+                        builder.fromColumns(Collections.singletonList(column));
                     }
                 });
+        buildUpdatedPrimaryKey(
+                schemaBuilder,
+                originTable,
+                (pk) -> pk.equals(originColumnName) ? newColumnName : pk);
+        buildUpdatedWatermark(schemaBuilder, originTable);
+        return schemaBuilder.build();
+    }
 
-        Schema.Builder builder = Schema.newBuilder();
-        // build column
-        Schema originSchema = originalTable.getTable().getUnresolvedSchema();
-        originSchema
-                .getColumns()
+    public Schema applySchemaChange(
+            SqlAlterTableDropColumn dropColumn, ContextResolvedTable originTable) {
+        List<String> tableColumns =
+                originTable.getResolvedSchema().getColumns().stream()
+                        .map(Column::getName)
+                        .collect(Collectors.toList());
+        Set<String> primaryKeys = new HashSet<>();
+        originTable
+                .getResolvedSchema()
+                .getPrimaryKey()
+                .ifPresent(pk -> primaryKeys.addAll(pk.getColumns()));
+        Set<String> columnsToDrop = new HashSet<>();
+        dropColumn
+                .getColumnList()
                 .forEach(
-                        column -> {
-                            if (oldColumnName.equals(column.getName())) {
-                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
-                            } else {
-                                builder.fromColumns(Collections.singletonList(column));
+                        identifier -> {
+                            String name = getColumnName((SqlIdentifier) identifier);
+                            if (!columnsToDrop.add(name)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "%sDuplicate column `%s`.", EX_MSG_PREFIX, name));
                             }
                         });
-        // build primary key
-        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
-        if (originPrimaryKey.isPresent()) {
-            List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames();
-            String constrainName = originPrimaryKey.get().getConstraintName();
-            List<String> newPrimaryKeyNames =
-                    originPrimaryKeyNames.stream()
-                            .map(pkName -> pkName.equals(oldColumnName) ? newColumnName : pkName)
-                            .collect(Collectors.toList());
-            builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
-        }
-
-        // build watermark
-        originSchema
-                .getWatermarkSpecs()
-                .forEach(
-                        watermarkSpec ->
-                                builder.watermark(
-                                        watermarkSpec.getColumnName(),
-                                        watermarkSpec.getWatermarkExpression()));
 
-        // generate new schema
-        return builder.build();
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        for (SqlNode columnIdentifier : dropColumn.getColumnList()) {
+            String columnToDrop = getColumnName((SqlIdentifier) columnIdentifier);
+            // validate the column to drop exists in the table schema, is not a primary key and
+            // does not derive any computed column
+            validateColumnName(
+                    columnToDrop,
+                    tableColumns,
+                    originTable.getResolvedSchema(),
+                    ((CatalogTable) originTable.getResolvedTable()).getPartitionKeys(),
+                    primaryKeys,
+                    columnsToDrop);
+            validateWatermark(originTable, columnToDrop, tableColumns);
+        }
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> {
+                    if (!columnsToDrop.contains(column.getName())) {
+                        builder.fromColumns(Collections.singletonList(column));
+                    }
+                });
+        buildUpdatedPrimaryKey(schemaBuilder, originTable, (pk) -> pk);
+        buildUpdatedWatermark(schemaBuilder, originTable);
+        return schemaBuilder.build();
+    }
+
+    public Schema applySchemaChange(
+            SqlAlterTableDropConstraint dropConstraint, ContextResolvedTable originTable) {
+        Optional<UniqueConstraint> pkConstraint = originTable.getResolvedSchema().getPrimaryKey();
+        if (!pkConstraint.isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "%sThe base table does not define any primary key.", EX_MSG_PREFIX));
+        }
+        SqlIdentifier constraintIdentifier = dropConstraint.getConstraintName();
+        String constraintName = pkConstraint.get().getName();
+        if (constraintIdentifier != null
+                && !constraintIdentifier.getSimple().equals(constraintName)) {
+            throw new ValidationException(
+                    String.format(
+                            "%sThe base table does not define a primary key constraint named '%s'. "
+                                    + "Available constraint name: ['%s'].",
+                            EX_MSG_PREFIX, constraintIdentifier.getSimple(), constraintName));
+        }
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> builder.fromColumns(Collections.singletonList(column)));
+        buildUpdatedWatermark(schemaBuilder, originTable);
+        return schemaBuilder.build();
+    }
+
+    public Schema applySchemaChange(ContextResolvedTable originTable) {

Review Comment:
   It's hard for others to understand the usage of this method from the signature alone. How about
   ```
   applySchemaChange(SqlAlterWatermark, ContextResolvedTable)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org