Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/15 11:35:37 UTC

[GitHub] [flink] luoyuxia commented on a diff in pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

luoyuxia commented on code in PR #19419:
URL: https://github.com/apache/flink/pull/19419#discussion_r848239830


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && !oriPrimaryKey.get().getConstraintName().equals(constraintName)) {

Review Comment:
   I'm a bit confused by this code. It seems the constraint will always be the primary key?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java:
##########
@@ -20,24 +20,42 @@
 
 import org.apache.flink.table.catalog.ObjectIdentifier;
 
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
 /** Operation of "ALTER TABLE ADD [CONSTRAINT constraintName] ..." clause. * */
 public class AlterTableDropConstraintOperation extends AlterTableOperation {
-    private final String constraintName;
+
+    private final boolean isPrimaryKey;
+    private final @Nullable String constraintName;
 
     public AlterTableDropConstraintOperation(
-            ObjectIdentifier tableIdentifier, String constraintName) {
+            ObjectIdentifier tableIdentifier,
+            boolean isPrimaryKey,

Review Comment:
   Would it be better to wrap `isPrimaryKey` and `constraintName` in a class, perhaps called `Constraint`?
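
   A minimal sketch of what such a wrapper might look like. The class name `Constraint` follows the suggestion above; the factory methods `primaryKey()` and `named(...)` are hypothetical and are not part of the PR or of Flink's API. It assumes `isPrimaryKey` distinguishes `DROP PRIMARY KEY` from `DROP CONSTRAINT <name>`.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical value class bundling the two constructor arguments; not Flink API. */
final class Constraint {
    private final boolean primaryKey;
    // null when the statement does not name the constraint
    private final String constraintName;

    private Constraint(boolean primaryKey, String constraintName) {
        this.primaryKey = primaryKey;
        this.constraintName = constraintName;
    }

    /** Assumed to model "DROP PRIMARY KEY", where no constraint name is given. */
    static Constraint primaryKey() {
        return new Constraint(true, null);
    }

    /** Assumed to model "DROP CONSTRAINT name". */
    static Constraint named(String constraintName) {
        return new Constraint(false, Objects.requireNonNull(constraintName));
    }

    boolean isPrimaryKey() {
        return primaryKey;
    }

    Optional<String> getConstraintName() {
        return Optional.ofNullable(constraintName);
    }
}
```

   With something like this, the operation's constructor would take a single `Constraint` argument instead of a boolean plus a nullable string.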



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table

Review Comment:
   nit: 
   ```suggestion
           // validate primary key exists in table
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && !oriPrimaryKey.get().getConstraintName().equals(constraintName)) {
+            throw new ValidationException(
+                    String.format(
+                            "CONSTRAINT [%s] does not exist in table %s",
+                            constraintName, tableIdentifier));
+        }
+
+        return new AlterTableDropConstraintOperation(tableIdentifier, isPrimaryKey, constraintName);
+    }
+
+    public static Operation convertDropWatermark(
+            ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        if (CollectionUtil.isNullOrEmpty(originSchema.getWatermarkSpecs())) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist watermark.", tableIdentifier));
+        }
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        builder.fromColumns(originSchema.getColumns());
+
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> primaryKeyNames = originPrimaryKey.get().getColumnNames();
+            builder.primaryKeyNamed(constrainName, primaryKeyNames);
+        }
+        return new AlterTableSchemaOperation(
+                tableIdentifier,
+                CatalogTable.of(
+                        builder.build(),
+                        catalogTable.getComment(),
+                        catalogTable.getPartitionKeys(),
+                        catalogTable.getOptions()));
+    }
+
+    public static Operation convertDropColumns(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            ResolvedSchema originResolveSchema,
+            SqlAlterTableDropColumns sqlAlterTableDropColumns) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        List<String> originTableColumns =
+                originSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+
+        // filter the dropped column which is not in table firstly
+        List<String> toDropColumns =
+                sqlAlterTableDropColumns.getColumns().getList().stream()
+                        .map(SqlIdentifier.class::cast)
+                        .map(SqlIdentifier::getSimple)
+                        .filter(column -> originTableColumns.contains(column))
+                        .collect(Collectors.toList());
+
+        // validate column size
+        if (originTableColumns.size() == 1 && toDropColumns.size() > 0) {

Review Comment:
   What if `toDropColumns` equals `originTableColumns`, i.e. every column of a multi-column table is dropped?
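
   To illustrate the case in question: the quoted check only fires when the table has exactly one column, so a statement that drops both columns of a two-column table would pass it. A rough sketch of a broader check, under the assumption that the intent is to reject any statement that leaves the table without columns. The helper `dropsAllColumns` and its class are hypothetical names, not part of the PR.

```java
import java.util.HashSet;
import java.util.List;

/** Hypothetical helper; names are illustrative only. */
final class DropColumnsCheck {

    /** Returns true when dropping {@code toDropColumns} would leave no column in the table. */
    static boolean dropsAllColumns(List<String> originTableColumns, List<String> toDropColumns) {
        // Every original column appears among the columns to drop.
        return new HashSet<>(toDropColumns).containsAll(originTableColumns);
    }
}
```

   This also covers the single-column case handled by the existing condition, so it could replace that condition rather than sit next to it.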



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && !oriPrimaryKey.get().getConstraintName().equals(constraintName)) {
+            throw new ValidationException(
+                    String.format(
+                            "CONSTRAINT [%s] does not exist in table %s",
+                            constraintName, tableIdentifier));
+        }
+
+        return new AlterTableDropConstraintOperation(tableIdentifier, isPrimaryKey, constraintName);
+    }
+
+    public static Operation convertDropWatermark(
+            ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        if (CollectionUtil.isNullOrEmpty(originSchema.getWatermarkSpecs())) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist watermark.", tableIdentifier));
+        }
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        builder.fromColumns(originSchema.getColumns());
+
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();

Review Comment:
   Should we also rebuild other constraints here, or is the primary key the only constraint?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && !oriPrimaryKey.get().getConstraintName().equals(constraintName)) {
+            throw new ValidationException(
+                    String.format(
+                            "CONSTRAINT [%s] does not exist in table %s",
+                            constraintName, tableIdentifier));
+        }
+
+        return new AlterTableDropConstraintOperation(tableIdentifier, isPrimaryKey, constraintName);
+    }
+
+    public static Operation convertDropWatermark(
+            ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        if (CollectionUtil.isNullOrEmpty(originSchema.getWatermarkSpecs())) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist watermark.", tableIdentifier));
+        }
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        builder.fromColumns(originSchema.getColumns());
+
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> primaryKeyNames = originPrimaryKey.get().getColumnNames();
+            builder.primaryKeyNamed(constrainName, primaryKeyNames);
+        }
+        return new AlterTableSchemaOperation(
+                tableIdentifier,
+                CatalogTable.of(
+                        builder.build(),
+                        catalogTable.getComment(),
+                        catalogTable.getPartitionKeys(),
+                        catalogTable.getOptions()));
+    }
+
+    public static Operation convertDropColumns(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            ResolvedSchema originResolveSchema,
+            SqlAlterTableDropColumns sqlAlterTableDropColumns) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        List<String> originTableColumns =
+                originSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+
+        // filter the dropped column which is not in table firstly
+        List<String> toDropColumns =
+                sqlAlterTableDropColumns.getColumns().getList().stream()
+                        .map(SqlIdentifier.class::cast)
+                        .map(SqlIdentifier::getSimple)
+                        .filter(column -> originTableColumns.contains(column))
+                        .collect(Collectors.toList());
+
+        // validate column size
+        if (originTableColumns.size() == 1 && toDropColumns.size() > 0) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s only has one column, please use DROP TABLE syntax.",
+                            tableIdentifier));
+        }
+
+        // validate the dropped column is referenced by computed column
+        toDropColumns.forEach(
+                column -> validateComputedColumn(column, originTableColumns, originResolveSchema));
+
+        // validate the dropped column is referenced by watermark
+        toDropColumns.forEach(
+                column ->
+                        validateWatermark(
+                                column,
+                                originResolveSchema.getWatermarkSpecs(),
+                                originTableColumns));
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        builder.fromColumns(
+                originSchema.getColumns().stream()
+                        .filter(originColumn -> !toDropColumns.contains(originColumn.getName()))
+                        .collect(Collectors.toList()));
+
+        // build watermark
+        originSchema
+                .getWatermarkSpecs()
+                .forEach(
+                        watermarkSpec ->
+                                builder.watermark(
+                                        watermarkSpec.getColumnName(),
+                                        watermarkSpec.getWatermarkExpression()));
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames();
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> newPrimaryKeyNames =
+                    originPrimaryKeyNames.stream()
+                            .filter(pkName -> !toDropColumns.contains(pkName))
+                            .collect(Collectors.toList());
+            if (newPrimaryKeyNames.size() > 0) {

Review Comment:
   What is the behavior in other databases when one column of a composite primary key is dropped?


