Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/30 02:06:17 UTC

[GitHub] [flink] LadyForest commented on a diff in pull request #21571: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

LadyForest commented on code in PR #21571:
URL: https://github.com/apache/flink/pull/21571#discussion_r1059214774


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -115,77 +120,135 @@ public Schema applySchemaChange(SqlAlterTableSchema alterTableSchema, Schema ori
     }
 
     public Schema applySchemaChange(
-            SqlAlterTableRenameColumn renameColumn, ContextResolvedTable originalTable) {
-        String oldColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
+            SqlAlterTableRenameColumn renameColumn, ContextResolvedTable originTable) {
+        String originColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
         String newColumnName = getColumnName(renameColumn.getNewColumnIdentifier());
         List<String> tableColumns =
-                originalTable.getResolvedSchema().getColumns().stream()
+                originTable.getResolvedSchema().getColumns().stream()
                         .map(Column::getName)
                         .collect(Collectors.toList());
-        // validate old column is exists or new column isn't duplicated or old column isn't
-        // referenced by computed column
+        // validate origin column is exists, new column name does not collide with existed column
+        // names, and origin column isn't referenced by computed column
         validateColumnName(
-                oldColumnName,
+                originColumnName,
                 newColumnName,
                 tableColumns,
-                originalTable.getResolvedSchema(),
-                ((CatalogTable) originalTable.getResolvedTable()).getPartitionKeys());
+                originTable.getResolvedSchema(),
+                ((CatalogTable) originTable.getResolvedTable()).getPartitionKeys());
+        validateWatermark(originTable, originColumnName, tableColumns);
 
-        // validate old column isn't referenced by watermark
-        List<WatermarkSpec> watermarkSpecs = originalTable.getResolvedSchema().getWatermarkSpecs();
-        watermarkSpecs.forEach(
-                watermarkSpec -> {
-                    String rowtimeAttribute = watermarkSpec.getRowtimeAttribute();
-                    Set<String> referencedColumns =
-                            ColumnReferenceFinder.findReferencedColumn(
-                                    watermarkSpec.getWatermarkExpression(), tableColumns);
-                    if (oldColumnName.equals(rowtimeAttribute)
-                            || referencedColumns.contains(oldColumnName)) {
-                        throw new ValidationException(
-                                String.format(
-                                        "Old column %s is referred by watermark expression %s, "
-                                                + "currently doesn't allow to rename column which is "
-                                                + "referred by watermark expression.",
-                                        oldColumnName, watermarkSpec.asSummaryString()));
+        // generate new schema
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> {
+                    if (column.getName().equals(originColumnName)) {
+                        buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                    } else {
+                        builder.fromColumns(Collections.singletonList(column));
                     }
                 });
+        buildUpdatedPrimaryKey(
+                schemaBuilder,
+                originTable,
+                (pk) -> pk.equals(originColumnName) ? newColumnName : pk);
+        buildUpdatedWatermark(schemaBuilder, originTable);
+        return schemaBuilder.build();
+    }
 
-        Schema.Builder builder = Schema.newBuilder();
-        // build column
-        Schema originSchema = originalTable.getTable().getUnresolvedSchema();
-        originSchema
-                .getColumns()
+    public Schema applySchemaChange(
+            SqlAlterTableDropColumn dropColumn, ContextResolvedTable originTable) {
+        List<String> tableColumns =
+                originTable.getResolvedSchema().getColumns().stream()
+                        .map(Column::getName)
+                        .collect(Collectors.toList());
+        Set<String> primaryKeys = new HashSet<>();
+        originTable
+                .getResolvedSchema()
+                .getPrimaryKey()
+                .ifPresent(pk -> primaryKeys.addAll(pk.getColumns()));
+        Set<String> columnsToDrop = new HashSet<>();
+        dropColumn
+                .getColumnList()
                 .forEach(
-                        column -> {
-                            if (oldColumnName.equals(column.getName())) {
-                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
-                            } else {
-                                builder.fromColumns(Collections.singletonList(column));
+                        identifier -> {
+                            String name = getColumnName((SqlIdentifier) identifier);
+                            if (!columnsToDrop.add(name)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "%sDuplicate column `%s`.", EX_MSG_PREFIX, name));
                             }
                         });
-        // build primary key
-        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
-        if (originPrimaryKey.isPresent()) {
-            List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames();
-            String constrainName = originPrimaryKey.get().getConstraintName();
-            List<String> newPrimaryKeyNames =
-                    originPrimaryKeyNames.stream()
-                            .map(pkName -> pkName.equals(oldColumnName) ? newColumnName : pkName)
-                            .collect(Collectors.toList());
-            builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
-        }
-
-        // build watermark
-        originSchema
-                .getWatermarkSpecs()
-                .forEach(
-                        watermarkSpec ->
-                                builder.watermark(
-                                        watermarkSpec.getColumnName(),
-                                        watermarkSpec.getWatermarkExpression()));
 
-        // generate new schema
-        return builder.build();
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        for (SqlNode columnIdentifier : dropColumn.getColumnList()) {
+            String columnToDrop = getColumnName((SqlIdentifier) columnIdentifier);
+            // validate the column to drop exists in the table schema, is not a primary key and
+            // does not derive any computed column
+            validateColumnName(
+                    columnToDrop,
+                    tableColumns,
+                    originTable.getResolvedSchema(),
+                    ((CatalogTable) originTable.getResolvedTable()).getPartitionKeys(),
+                    primaryKeys,
+                    columnsToDrop);
+            validateWatermark(originTable, columnToDrop, tableColumns);
+        }
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> {
+                    if (!columnsToDrop.contains(column.getName())) {
+                        builder.fromColumns(Collections.singletonList(column));
+                    }
+                });
+        buildUpdatedPrimaryKey(schemaBuilder, originTable, (pk) -> pk);
+        buildUpdatedWatermark(schemaBuilder, originTable);
+        return schemaBuilder.build();
+    }
+
+    public Schema applySchemaChange(
+            SqlAlterTableDropConstraint dropConstraint, ContextResolvedTable originTable) {
+        Optional<UniqueConstraint> pkConstraint = originTable.getResolvedSchema().getPrimaryKey();
+        if (!pkConstraint.isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "%sThe base table does not define any primary key.", EX_MSG_PREFIX));
+        }
+        SqlIdentifier constraintIdentifier = dropConstraint.getConstraintName();
+        String constraintName = pkConstraint.get().getName();
+        if (constraintIdentifier != null
+                && !constraintIdentifier.getSimple().equals(constraintName)) {
+            throw new ValidationException(
+                    String.format(
+                            "%sThe base table does not define a primary key constraint named '%s'. "
+                                    + "Available constraint name: ['%s'].",
+                            EX_MSG_PREFIX, constraintIdentifier.getSimple(), constraintName));
+        }
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> builder.fromColumns(Collections.singletonList(column)));
+        buildUpdatedWatermark(schemaBuilder, originTable);
+        return schemaBuilder.build();
+    }
+
+    public Schema applySchemaChange(ContextResolvedTable originTable) {

Review Comment:
   > It's hard for others to understand what this method does from its signature. How about
   > 
   > ```
   > applySchemaChange(SqlAlterWatermark, ContextResolvedTable)
   > ```
   
   I agree that the method's purpose is not obvious from its signature, but `SqlAlterWatermark` is not used here. How about adding a Javadoc description to the method instead?
   ```java
   /** Convert ALTER TABLE DROP WATERMARK to generate an updated {@link Schema}. */
   applySchemaChange(ContextResolvedTable)
   ```
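
To make the suggestion concrete, here is a minimal, self-contained sketch of how a Javadoc comment can carry the intent of an overload whose parameter list does not name the statement it handles. `ToySchema` and `ToyAlterSchemaConverter` are hypothetical stand-ins invented for this illustration; they are not Flink's `Schema` or `AlterSchemaConverter`, and the exception type and message are assumptions as well.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a table schema; NOT Flink's Schema class. */
final class ToySchema {
    final List<String> columns;
    final String watermarkColumn; // null means the table defines no watermark

    ToySchema(List<String> columns, String watermarkColumn) {
        this.columns = new ArrayList<>(columns);
        this.watermarkColumn = watermarkColumn;
    }
}

/** Hypothetical converter used only to illustrate the documented overload. */
final class ToyAlterSchemaConverter {

    /** Convert ALTER TABLE DROP WATERMARK to generate an updated {@link ToySchema}. */
    static ToySchema applySchemaChange(ToySchema originTable) {
        // Assumed validation, modeled on the primary key check in the diff above.
        if (originTable.watermarkColumn == null) {
            throw new IllegalStateException("The base table does not define any watermark.");
        }
        // Copy every column unchanged and leave the watermark out.
        return new ToySchema(originTable.columns, null);
    }
}
```

With the comment in place, a reader can tell what the single-argument overload does without a marker parameter such as `SqlAlterWatermark` that the method body would never read.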


