Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/11 07:01:36 UTC

[GitHub] [flink] lsyldliu opened a new pull request, #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

lsyldliu opened a new pull request, #19419:
URL: https://github.com/apache/flink/pull/19419

   ## What is the purpose of the change
   
   Support DROP column/constraint/watermark for ALTER TABLE statement
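   
   For illustration, a minimal sketch of the statements this change targets. The table name `t1`, its schema, and the exact DROP grammar are assumptions for illustration only; the authoritative grammar is whatever the parser changes in this PR define.
   
   ```java
   // Minimal sketch, assuming a table `t1` with columns (id, name, ts), a primary key,
   // and a watermark on `ts`, registered elsewhere in a catalog. All names are hypothetical.
   import org.apache.flink.table.api.TableEnvironment;
   
   public final class AlterTableDropSketch {
   
       static void run(TableEnvironment tEnv) {
           // Drop a column (list form, based on SqlAlterTableDropColumns).
           tEnv.executeSql("ALTER TABLE t1 DROP (name)");
   
           // Drop the primary key constraint.
           tEnv.executeSql("ALTER TABLE t1 DROP PRIMARY KEY");
   
           // Drop the watermark definition.
           tEnv.executeSql("ALTER TABLE t1 DROP WATERMARK");
       }
   }
   ```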
   
   ## Brief change log
   
     - *Support DROP column/constraint/watermark for ALTER TABLE statement*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
     - *Add unit tests in SqlToOperationConverterTest*
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] luoyuxia commented on a diff in pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on code in PR #19419:
URL: https://github.com/apache/flink/pull/19419#discussion_r848239830


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && !oriPrimaryKey.get().getConstraintName().equals(constraintName)) {

Review Comment:
   Some confusion about the code: it seems the constraint will always be a primary key?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java:
##########
@@ -20,24 +20,42 @@
 
 import org.apache.flink.table.catalog.ObjectIdentifier;
 
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
 /** Operation of "ALTER TABLE ADD [CONSTRAINT constraintName] ..." clause. * */
 public class AlterTableDropConstraintOperation extends AlterTableOperation {
-    private final String constraintName;
+
+    private final boolean isPrimaryKey;
+    private final @Nullable String constraintName;
 
     public AlterTableDropConstraintOperation(
-            ObjectIdentifier tableIdentifier, String constraintName) {
+            ObjectIdentifier tableIdentifier,
+            boolean isPrimaryKey,

Review Comment:
   Would it be better to wrap `isPrimaryKey` and `constraintName` into a class, maybe called `Constraint`?
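   
   For what this suggestion seems to have in mind, a minimal sketch of such a wrapper; the class name `Constraint` and its exact shape are assumptions, not code from this PR:
   
   ```java
   // Hypothetical value class bundling the two constructor arguments; not part of this PR.
   import javax.annotation.Nullable;
   
   import java.util.Optional;
   
   public final class Constraint {
   
       private final boolean isPrimaryKey;
       private final @Nullable String constraintName;
   
       public Constraint(boolean isPrimaryKey, @Nullable String constraintName) {
           this.isPrimaryKey = isPrimaryKey;
           this.constraintName = constraintName;
       }
   
       public boolean isPrimaryKey() {
           return isPrimaryKey;
       }
   
       public Optional<String> getConstraintName() {
           return Optional.ofNullable(constraintName);
       }
   }
   ```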



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table

Review Comment:
   nit: 
   ```suggestion
           // validate primary key exists in table
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && !oriPrimaryKey.get().getConstraintName().equals(constraintName)) {
+            throw new ValidationException(
+                    String.format(
+                            "CONSTRAINT [%s] does not exist in table %s",
+                            constraintName, tableIdentifier));
+        }
+
+        return new AlterTableDropConstraintOperation(tableIdentifier, isPrimaryKey, constraintName);
+    }
+
+    public static Operation convertDropWatermark(
+            ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        if (CollectionUtil.isNullOrEmpty(originSchema.getWatermarkSpecs())) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist watermark.", tableIdentifier));
+        }
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        builder.fromColumns(originSchema.getColumns());
+
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> primaryKeyNames = originPrimaryKey.get().getColumnNames();
+            builder.primaryKeyNamed(constrainName, primaryKeyNames);
+        }
+        return new AlterTableSchemaOperation(
+                tableIdentifier,
+                CatalogTable.of(
+                        builder.build(),
+                        catalogTable.getComment(),
+                        catalogTable.getPartitionKeys(),
+                        catalogTable.getOptions()));
+    }
+
+    public static Operation convertDropColumns(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            ResolvedSchema originResolveSchema,
+            SqlAlterTableDropColumns sqlAlterTableDropColumns) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        List<String> originTableColumns =
+                originSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+
+        // filter the dropped column which is not in table firstly
+        List<String> toDropColumns =
+                sqlAlterTableDropColumns.getColumns().getList().stream()
+                        .map(SqlIdentifier.class::cast)
+                        .map(SqlIdentifier::getSimple)
+                        .filter(column -> originTableColumns.contains(column))
+                        .collect(Collectors.toList());
+
+        // validate column size
+        if (originTableColumns.size() == 1 && toDropColumns.size() > 0) {

Review Comment:
   What if `toDropColumns` is equal to `originTableColumns`, i.e. all of the table's columns are dropped?
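   
   A sketch of the stricter check this question seems to point at; the helper name and message wording are assumptions, not code from this PR:
   
   ```java
   // Hypothetical tightening of the validation: reject the statement whenever the DROP list
   // would remove every column of the table, not only when the table has a single column.
   import org.apache.flink.table.api.ValidationException;
   
   import java.util.HashSet;
   import java.util.List;
   
   final class DropColumnChecks {
   
       static void checkNotDroppingAllColumns(
               String tableName, List<String> originTableColumns, List<String> toDropColumns) {
           if (!toDropColumns.isEmpty()
                   && new HashSet<>(toDropColumns).containsAll(originTableColumns)) {
               throw new ValidationException(
                       String.format(
                               "Cannot drop all columns of table %s, please use DROP TABLE syntax.",
                               tableName));
           }
       }
   }
   ```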



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && !oriPrimaryKey.get().getConstraintName().equals(constraintName)) {
+            throw new ValidationException(
+                    String.format(
+                            "CONSTRAINT [%s] does not exist in table %s",
+                            constraintName, tableIdentifier));
+        }
+
+        return new AlterTableDropConstraintOperation(tableIdentifier, isPrimaryKey, constraintName);
+    }
+
+    public static Operation convertDropWatermark(
+            ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        if (CollectionUtil.isNullOrEmpty(originSchema.getWatermarkSpecs())) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist watermark.", tableIdentifier));
+        }
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        builder.fromColumns(originSchema.getColumns());
+
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();

Review Comment:
   Should we also build other constraints here, or is the primary key the only constraint?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && !oriPrimaryKey.get().getConstraintName().equals(constraintName)) {
+            throw new ValidationException(
+                    String.format(
+                            "CONSTRAINT [%s] does not exist in table %s",
+                            constraintName, tableIdentifier));
+        }
+
+        return new AlterTableDropConstraintOperation(tableIdentifier, isPrimaryKey, constraintName);
+    }
+
+    public static Operation convertDropWatermark(
+            ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        if (CollectionUtil.isNullOrEmpty(originSchema.getWatermarkSpecs())) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist watermark.", tableIdentifier));
+        }
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        builder.fromColumns(originSchema.getColumns());
+
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> primaryKeyNames = originPrimaryKey.get().getColumnNames();
+            builder.primaryKeyNamed(constrainName, primaryKeyNames);
+        }
+        return new AlterTableSchemaOperation(
+                tableIdentifier,
+                CatalogTable.of(
+                        builder.build(),
+                        catalogTable.getComment(),
+                        catalogTable.getPartitionKeys(),
+                        catalogTable.getOptions()));
+    }
+
+    public static Operation convertDropColumns(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            ResolvedSchema originResolveSchema,
+            SqlAlterTableDropColumns sqlAlterTableDropColumns) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        List<String> originTableColumns =
+                originSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+
+        // filter the dropped column which is not in table firstly
+        List<String> toDropColumns =
+                sqlAlterTableDropColumns.getColumns().getList().stream()
+                        .map(SqlIdentifier.class::cast)
+                        .map(SqlIdentifier::getSimple)
+                        .filter(column -> originTableColumns.contains(column))
+                        .collect(Collectors.toList());
+
+        // validate column size
+        if (originTableColumns.size() == 1 && toDropColumns.size() > 0) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s only has one column, please use DROP TABLE syntax.",
+                            tableIdentifier));
+        }
+
+        // validate the dropped column is referenced by computed column
+        toDropColumns.forEach(
+                column -> validateComputedColumn(column, originTableColumns, originResolveSchema));
+
+        // validate the dropped column is referenced by watermark
+        toDropColumns.forEach(
+                column ->
+                        validateWatermark(
+                                column,
+                                originResolveSchema.getWatermarkSpecs(),
+                                originTableColumns));
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        builder.fromColumns(
+                originSchema.getColumns().stream()
+                        .filter(originColumn -> !toDropColumns.contains(originColumn.getName()))
+                        .collect(Collectors.toList()));
+
+        // build watermark
+        originSchema
+                .getWatermarkSpecs()
+                .forEach(
+                        watermarkSpec ->
+                                builder.watermark(
+                                        watermarkSpec.getColumnName(),
+                                        watermarkSpec.getWatermarkExpression()));
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames();
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> newPrimaryKeyNames =
+                    originPrimaryKeyNames.stream()
+                            .filter(pkName -> !toDropColumns.contains(pkName))
+                            .collect(Collectors.toList());
+            if (newPrimaryKeyNames.size() > 0) {

Review Comment:
   What is the behavior of dropping one column of a composite primary key in other databases?





[GitHub] [flink] lsyldliu commented on a diff in pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #19419:
URL: https://github.com/apache/flink/pull/19419#discussion_r858254419


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && !oriPrimaryKey.get().getConstraintName().equals(constraintName)) {

Review Comment:
   Yes, Flink only supports the primary key constraint; it doesn't support unique constraints.
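   
   For reference, a minimal hedged example of how the primary key constraint, the only constraint kind Flink DDL accepts, is declared; the table name and the datagen connector are just for illustration:
   
   ```java
   // Minimal sketch: the primary key is declared NOT ENFORCED because Flink does not own the
   // data and cannot enforce the constraint itself. Table name and connector are hypothetical.
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   
   public final class PrimaryKeyConstraintExample {
   
       public static void main(String[] args) {
           TableEnvironment tEnv =
                   TableEnvironment.create(
                           EnvironmentSettings.newInstance().inStreamingMode().build());
           tEnv.executeSql(
                   "CREATE TABLE users ("
                           + "  id BIGINT,"
                           + "  name STRING,"
                           + "  PRIMARY KEY (id) NOT ENFORCED"
                           + ") WITH ('connector' = 'datagen')");
       }
   }
   ```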





[GitHub] [flink] lsyldliu commented on a diff in pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #19419:
URL: https://github.com/apache/flink/pull/19419#discussion_r858258708


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java:
##########
@@ -20,24 +20,42 @@
 
 import org.apache.flink.table.catalog.ObjectIdentifier;
 
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
 /** Operation of "ALTER TABLE ADD [CONSTRAINT constraintName] ..." clause. * */
 public class AlterTableDropConstraintOperation extends AlterTableOperation {
-    private final String constraintName;
+
+    private final boolean isPrimaryKey;
+    private final @Nullable String constraintName;
 
     public AlterTableDropConstraintOperation(
-            ObjectIdentifier tableIdentifier, String constraintName) {
+            ObjectIdentifier tableIdentifier,
+            boolean isPrimaryKey,

Review Comment:
   I think it is not needed; there are only two variables here.





[GitHub] [flink] flinkbot commented on pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19419:
URL: https://github.com/apache/flink/pull/19419#issuecomment-1094628650

   ## CI report:
   
   * 13888ef108ff779d1a42f20d7a4a5f52e53bd9a1 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] luoyuxia commented on a diff in pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on code in PR #19419:
URL: https://github.com/apache/flink/pull/19419#discussion_r858313219


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && !oriPrimaryKey.get().getConstraintName().equals(constraintName)) {
+            throw new ValidationException(
+                    String.format(
+                            "CONSTRAINT [%s] does not exist in table %s",
+                            constraintName, tableIdentifier));
+        }
+
+        return new AlterTableDropConstraintOperation(tableIdentifier, isPrimaryKey, constraintName);
+    }
+
+    public static Operation convertDropWatermark(
+            ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        if (CollectionUtil.isNullOrEmpty(originSchema.getWatermarkSpecs())) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist watermark.", tableIdentifier));
+        }
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        builder.fromColumns(originSchema.getColumns());
+
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();

Review Comment:
   Never mind. As the primary key is the only constraint, it's okay to build only the primary key.





[GitHub] [flink] lsyldliu commented on a diff in pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #19419:
URL: https://github.com/apache/flink/pull/19419#discussion_r858254419


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && !oriPrimaryKey.get().getConstraintName().equals(constraintName)) {

Review Comment:
   Yes, a primary key is also a constraint and has a constraint name. We can drop the primary key with either of two statements (see the sketch below):
   
   1. `DROP PRIMARY KEY` directly
   2. Specify the constraint name and use `DROP CONSTRAINT xxxx`
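   
   Concretely, the two forms could look like this; `orders` and `pk_order_id` are hypothetical names, and both statements assume the parser support added in this PR:
   
   ```java
   // Sketch of the two equivalent ways to drop the primary key constraint described above.
   import org.apache.flink.table.api.TableEnvironment;
   
   final class DropPrimaryKeyForms {
   
       static void dropByKeyword(TableEnvironment tEnv) {
           // 1. Drop the primary key directly.
           tEnv.executeSql("ALTER TABLE orders DROP PRIMARY KEY");
       }
   
       static void dropByConstraintName(TableEnvironment tEnv) {
           // 2. Drop it via its constraint name.
           tEnv.executeSql("ALTER TABLE orders DROP CONSTRAINT pk_order_id");
       }
   }
   ```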





[GitHub] [flink] lsyldliu commented on a diff in pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #19419:
URL: https://github.com/apache/flink/pull/19419#discussion_r858245938


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && !oriPrimaryKey.get().getConstraintName().equals(constraintName)) {
+            throw new ValidationException(
+                    String.format(
+                            "CONSTRAINT [%s] does not exist in table %s",
+                            constraintName, tableIdentifier));
+        }
+
+        return new AlterTableDropConstraintOperation(tableIdentifier, isPrimaryKey, constraintName);
+    }
+
+    public static Operation convertDropWatermark(
+            ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        if (CollectionUtil.isNullOrEmpty(originSchema.getWatermarkSpecs())) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist watermark.", tableIdentifier));
+        }
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        builder.fromColumns(originSchema.getColumns());
+
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> primaryKeyNames = originPrimaryKey.get().getColumnNames();
+            builder.primaryKeyNamed(constrainName, primaryKeyNames);
+        }
+        return new AlterTableSchemaOperation(
+                tableIdentifier,
+                CatalogTable.of(
+                        builder.build(),
+                        catalogTable.getComment(),
+                        catalogTable.getPartitionKeys(),
+                        catalogTable.getOptions()));
+    }
+
+    public static Operation convertDropColumns(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            ResolvedSchema originResolveSchema,
+            SqlAlterTableDropColumns sqlAlterTableDropColumns) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        List<String> originTableColumns =
+                originSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+
+        // filter the dropped column which is not in table firstly
+        List<String> toDropColumns =
+                sqlAlterTableDropColumns.getColumns().getList().stream()
+                        .map(SqlIdentifier.class::cast)
+                        .map(SqlIdentifier::getSimple)
+                        .filter(column -> originTableColumns.contains(column))
+                        .collect(Collectors.toList());
+
+        // validate column size
+        if (originTableColumns.size() == 1 && toDropColumns.size() > 0) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s only has one column, please use DROP TABLE syntax.",
+                            tableIdentifier));
+        }
+
+        // validate the dropped column is referenced by computed column
+        toDropColumns.forEach(
+                column -> validateComputedColumn(column, originTableColumns, originResolveSchema));
+
+        // validate the dropped column is referenced by watermark
+        toDropColumns.forEach(
+                column ->
+                        validateWatermark(
+                                column,
+                                originResolveSchema.getWatermarkSpecs(),
+                                originTableColumns));
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        builder.fromColumns(
+                originSchema.getColumns().stream()
+                        .filter(originColumn -> !toDropColumns.contains(originColumn.getName()))
+                        .collect(Collectors.toList()));
+
+        // build watermark
+        originSchema
+                .getWatermarkSpecs()
+                .forEach(
+                        watermarkSpec ->
+                                builder.watermark(
+                                        watermarkSpec.getColumnName(),
+                                        watermarkSpec.getWatermarkExpression()));
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames();
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> newPrimaryKeyNames =
+                    originPrimaryKeyNames.stream()
+                            .filter(pkName -> !toDropColumns.contains(pkName))
+                            .collect(Collectors.toList());
+            if (newPrimaryKeyNames.size() > 0) {

Review Comment:
   MySQL and PostgreSQL. Here are the scenarios when dropping a column:
   
   If the constraint refers to this column together with other columns, the column is removed from the constraint. If the constraint refers only to this column, the column and the constraint are dropped together (see the sketch below).
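   
   Based on the code above, a sketch of the corresponding behavior in Flink; the table `orders` and its composite key are hypothetical, and the DROP column syntax assumes this PR's parser changes:
   
   ```java
   // Hypothetical table `orders(order_id, line_no, amount)` with
   // PRIMARY KEY (order_id, line_no) NOT ENFORCED, registered elsewhere.
   import org.apache.flink.table.api.TableEnvironment;
   
   final class DropCompositePkColumnSketch {
   
       static void dropOneKeyColumn(TableEnvironment tEnv) {
           // Per the converter code above, dropping one column of the composite key rebuilds
           // the primary key on the remaining column(s): PRIMARY KEY (order_id) NOT ENFORCED.
           tEnv.executeSql("ALTER TABLE orders DROP (line_no)");
   
           // Dropping the last remaining key column would drop the primary key entirely
           // (the newPrimaryKeyNames list above would then be empty).
           // tEnv.executeSql("ALTER TABLE orders DROP (order_id)");
       }
   }
   ```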
   
   





[GitHub] [flink] fsk119 closed pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

Posted by GitBox <gi...@apache.org>.
fsk119 closed pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement
URL: https://github.com/apache/flink/pull/19419




[GitHub] [flink] fsk119 commented on pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #19419:
URL: https://github.com/apache/flink/pull/19419#issuecomment-1369055614

   Merged in #21571.




[GitHub] [flink] lsyldliu commented on a diff in pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #19419:
URL: https://github.com/apache/flink/pull/19419#discussion_r858260447


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && !oriPrimaryKey.get().getConstraintName().equals(constraintName)) {
+            throw new ValidationException(
+                    String.format(
+                            "CONSTRAINT [%s] does not exist in table %s",
+                            constraintName, tableIdentifier));
+        }
+
+        return new AlterTableDropConstraintOperation(tableIdentifier, isPrimaryKey, constraintName);
+    }
+
+    public static Operation convertDropWatermark(
+            ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        if (CollectionUtil.isNullOrEmpty(originSchema.getWatermarkSpecs())) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist watermark.", tableIdentifier));
+        }
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        builder.fromColumns(originSchema.getColumns());
+
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();

Review Comment:
   I did not understand the meaning of this sentence; can you give more context?





[GitHub] [flink] lsyldliu commented on a diff in pull request #19419: [FLINK-22317][table] Support DROP column/constraint/watermark for ALTER TABLE statement

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #19419:
URL: https://github.com/apache/flink/pull/19419#discussion_r858254419


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && !oriPrimaryKey.get().getConstraintName().equals(constraintName)) {

Review Comment:
   Yes, Flink only supports the primary key constraint; it doesn't support unique constraints. https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java#L1212


