You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/26 03:57:35 UTC

[GitHub] [flink] lsyldliu commented on a diff in pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

lsyldliu commented on code in PR #19419:
URL: https://github.com/apache/flink/pull/19419#discussion_r858245938


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && !oriPrimaryKey.get().getConstraintName().equals(constraintName)) {
+            throw new ValidationException(
+                    String.format(
+                            "CONSTRAINT [%s] does not exist in table %s",
+                            constraintName, tableIdentifier));
+        }
+
+        return new AlterTableDropConstraintOperation(tableIdentifier, isPrimaryKey, constraintName);
+    }
+
+    public static Operation convertDropWatermark(
+            ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        if (CollectionUtil.isNullOrEmpty(originSchema.getWatermarkSpecs())) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist watermark.", tableIdentifier));
+        }
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        builder.fromColumns(originSchema.getColumns());
+
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> primaryKeyNames = originPrimaryKey.get().getColumnNames();
+            builder.primaryKeyNamed(constrainName, primaryKeyNames);
+        }
+        return new AlterTableSchemaOperation(
+                tableIdentifier,
+                CatalogTable.of(
+                        builder.build(),
+                        catalogTable.getComment(),
+                        catalogTable.getPartitionKeys(),
+                        catalogTable.getOptions()));
+    }
+
+    public static Operation convertDropColumns(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            ResolvedSchema originResolveSchema,
+            SqlAlterTableDropColumns sqlAlterTableDropColumns) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        List<String> originTableColumns =
+                originSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+
+        // filter the dropped column which is not in table firstly
+        List<String> toDropColumns =
+                sqlAlterTableDropColumns.getColumns().getList().stream()
+                        .map(SqlIdentifier.class::cast)
+                        .map(SqlIdentifier::getSimple)
+                        .filter(column -> originTableColumns.contains(column))
+                        .collect(Collectors.toList());
+
+        // validate column size
+        if (originTableColumns.size() == 1 && toDropColumns.size() > 0) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s only has one column, please use DROP TABLE syntax.",
+                            tableIdentifier));
+        }
+
+        // validate the dropped column is referenced by computed column
+        toDropColumns.forEach(
+                column -> validateComputedColumn(column, originTableColumns, originResolveSchema));
+
+        // validate the dropped column is referenced by watermark
+        toDropColumns.forEach(
+                column ->
+                        validateWatermark(
+                                column,
+                                originResolveSchema.getWatermarkSpecs(),
+                                originTableColumns));
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        builder.fromColumns(
+                originSchema.getColumns().stream()
+                        .filter(originColumn -> !toDropColumns.contains(originColumn.getName()))
+                        .collect(Collectors.toList()));
+
+        // build watermark
+        originSchema
+                .getWatermarkSpecs()
+                .forEach(
+                        watermarkSpec ->
+                                builder.watermark(
+                                        watermarkSpec.getColumnName(),
+                                        watermarkSpec.getWatermarkExpression()));
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames();
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> newPrimaryKeyNames =
+                    originPrimaryKeyNames.stream()
+                            .filter(pkName -> !toDropColumns.contains(pkName))
+                            .collect(Collectors.toList());
+            if (newPrimaryKeyNames.size() > 0) {

Review Comment:
   mysql and postgresql.Here is some scenes when drop column:
   
   Constraint refers to this column and other column.Remove this column from Constraint.If constraint only refers to this column ,drop column and constraint together.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org