You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/29 12:48:00 UTC

[GitHub] [flink] fsk119 opened a new pull request, #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

fsk119 opened a new pull request, #21577:
URL: https://github.com/apache/flink/pull/21577

   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
     Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   *Introduce TableChange to represent ALTER TABLE MODIFY syntax.*
   
   
   ## Brief change log
   
     - *Collect table changes during the sql to operation*
     - *Add the modify changes in the `TableChange`*
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as *SqlToOperationConverterTest*.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] LadyForest commented on a diff in pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #21577:
URL: https://github.com/apache/flink/pull/21577#discussion_r1060300795


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -685,18 +731,18 @@ String getComment(SqlTableColumn column) {
     // --------------------------------------------------------------------------------------------
 
     private void validateColumnName(
-            String originColumnName,
+            String oldColumnName,
             String newColumnName,
-            ResolvedSchema originSchemas,
+            ResolvedSchema oldSchemas,
             List<String> partitionKeys) {
         validateColumnName(
-                originColumnName,
-                originSchemas,
+                oldColumnName,
+                oldSchemas,

Review Comment:
   Nit: `oldSchemas` -> `oldSchema` ?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -685,18 +731,18 @@ String getComment(SqlTableColumn column) {
     // --------------------------------------------------------------------------------------------
 
     private void validateColumnName(
-            String originColumnName,
+            String oldColumnName,
             String newColumnName,
-            ResolvedSchema originSchemas,
+            ResolvedSchema oldSchemas,

Review Comment:
   Nit: `oldSchemas` -> `oldSchema` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fsk119 commented on a diff in pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #21577:
URL: https://github.com/apache/flink/pull/21577#discussion_r1060260536


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -698,7 +782,7 @@ private AlterSchemaStrategy computeAlterSchemaStrategy(SqlAlterTableSchema alter
                         alterTableSchema.getClass().getCanonicalName()));
     }
 
-    private static <T> T unwrap(Optional<T> value) {
+    private <T> T unwrap(Optional<T> value) {

Review Comment:
   ModifySchemaConverter and AddSchemaConverter are both static classes and can only use static methods in the parent class. But now I change them to class-level inner class, so I think we can remove the static keyword now. 
   
   BTW, I think we should narrow the scope of the visibility. With the `static` keyword, the visibility is much larger. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fsk119 closed pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

Posted by GitBox <gi...@apache.org>.
fsk119 closed pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change
URL: https://github.com/apache/flink/pull/21577


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fsk119 commented on pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #21577:
URL: https://github.com/apache/flink/pull/21577#issuecomment-1369338527

   Thanks for the review!
   
   >Since the MODIFY operation is split into a list of changes, I would like to know whether the order matters and how the external catalog copes with them. For example, does API expose a requirement that it should be an atomic operation, or ? does the external catalog decide it?
   
   Yes, the order matters. The catalog developer needs to make sure the execution in order and the execution is atomic. The logic may looks like(the idea is from the Spark JdbcTablecatalog#alterTable)
   
   ```
   
   default void alterTable(
               ObjectPath tablePath,
               CatalogBaseTable newTable,
               List<TableChange> tableChanges,
               boolean ignoreIfNotExists)
               throws TableNotExistException, CatalogException {
       // make sure all table changes are supported.
       validate(tableChanges);
       // jdbc catalog can work like
       conn.setAutoCommit(false);
       Statement statement = conn.createStatement()
       try {
           statement.setQueryTimeout(options.queryTimeout);
           for (String sql = dialect.alterTable(tableName, changes, metaData.getDatabaseMajorVersion)) {
              statement.executeUpdate(sql)
           }
           conn.commit()
       } catch (Exception e) {
         if (conn != null) {
           conn.rollback();
         }
         throw e;
       } finally {
         statement.close();
         conn.setAutoCommit(true);
       }
   }
   
   ``` 
   
   > The "oldColumn" and "originColumn" are interchangeably used across the changelog, and can we align them to improve the readability?
   
   Thanks for pointing it out. I agree with you we should align the term in the codes. I am prone to use the term "oldColumn" instead because we already have used this in the FLIP-273. WDYT?
   
   > SchemaConverter#updatePositionAndCollectColumnChange and OperationConverterUtils#buildColumnChange are very similar. Do we have a plan to do some refactor work?
   
   Yes. I think we can refactor this after all parts finish. WDYT?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] LadyForest commented on a diff in pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #21577:
URL: https://github.com/apache/flink/pull/21577#discussion_r1059352101


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##########
@@ -94,6 +96,137 @@ static AddWatermark add(WatermarkSpec watermarkSpec) {
         return new AddWatermark(watermarkSpec);
     }
 
+    /**
+     * A table change to modify a column. The modification includes:
+     *
+     * <ul>
+     *   <li>change column data type
+     *   <li>reorder column position
+     *   <li>modify column comment
+     *   <li>change the computed expression
+     * </ul>
+     *
+     * <p>Some fine-grained column changes are built in the {@link

Review Comment:
   Nit: built in -> represented by



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##########
@@ -94,6 +96,137 @@ static AddWatermark add(WatermarkSpec watermarkSpec) {
         return new AddWatermark(watermarkSpec);
     }
 
+    /**
+     * A table change to modify a column. The modification includes:
+     *
+     * <ul>
+     *   <li>change column data type
+     *   <li>reorder column position
+     *   <li>modify column comment
+     *   <li>change the computed expression
+     * </ul>
+     *
+     * <p>Some fine-grained column changes are built in the {@link
+     * TableChange#modifyPhysicalColumnType}, {@link TableChange#modifyColumnName}, {@link
+     * TableChange#modifyColumnComment} and {@link TableChange#modifyColumnPosition}.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_definition&gt; COMMENT '&lt;column_comment&gt;' &lt;column_position&gt;
+     * </pre>
+     *
+     * @param oldColumn the definition of the old column.
+     * @param newColumn the definition of the new column.
+     * @param columnPosition the new position of the column.
+     * @return a TableChange represents the modification.
+     */
+    static ModifyColumn modify(
+            Column oldColumn, Column newColumn, @Nullable ColumnPosition columnPosition) {
+        return new ModifyColumn(oldColumn, newColumn, columnPosition);
+    }
+
+    /**
+     * A table change that modify the physical column data type.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;new_column_type&gt;
+     * </pre>
+     *
+     * @param oldColumn the definition of the old column.
+     * @param newType the type of the new column.
+     * @return a TableChange represents the modification.
+     */
+    static ModifyPhysicalColumnType modifyPhysicalColumnType(Column oldColumn, DataType newType) {
+        return new ModifyPhysicalColumnType(oldColumn, newType);
+    }
+
+    /**
+     * A table change to modify the column name.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; RENAME &lt;old_column_name&gt; TO &lt;new_column_name&gt;
+     * </pre>
+     *
+     * @param oldColumn the definition of the old column.
+     * @param newName the name of the new column.
+     * @return a TableChange represents the modification.
+     */
+    static ModifyColumnName modifyColumnName(Column oldColumn, String newName) {
+        return new ModifyColumnName(oldColumn, newName);
+    }
+
+    /**
+     * A table change to modify the column comment.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;original_column_type&gt; COMMENT '&lt;new_column_comment&gt;'
+     * </pre>
+     *
+     * @param oldColumn the definition of the old column.
+     * @param newComment the modified comment.
+     * @return a TableChange represents the modification.
+     */
+    static ModifyColumnComment modifyColumnComment(Column oldColumn, String newComment) {
+        return new ModifyColumnComment(oldColumn, newComment);
+    }
+
+    /**
+     * A table change to modify the column position.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;original_column_type&gt; &lt;column_position&gt;
+     * </pre>
+     *
+     * @param oldColumn the definition of the old column.
+     * @param columnPosition the new position of the column.
+     * @return a TableChange represents the modification.
+     */
+    static ModifyColumnPosition modifyColumnPosition(
+            Column oldColumn, ColumnPosition columnPosition) {
+        return new ModifyColumnPosition(oldColumn, columnPosition);
+    }
+
+    /**
+     * A table change to add a unique constraint.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY PRIMARY KEY (&lt;column_name&gt;...) NOT ENFORCED;
+     * </pre>
+     *
+     * @param newConstraint the modified constraint definition.
+     * @return a TableChange represents the modification.
+     */
+    static ModifyUniqueConstraint modify(UniqueConstraint newConstraint) {
+        return new ModifyUniqueConstraint(newConstraint);
+    }
+
+    /**
+     * A table change to add a watermark.

Review Comment:
   modify?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -151,12 +157,35 @@ public static Operation convertChangeColumn(
         TableSchema oldSchema = catalogTable.getSchema();
         boolean first = changeColumn.isFirst();
         String after = changeColumn.getAfter() == null ? null : changeColumn.getAfter().getSimple();
-        TableColumn newTableColumn = toTableColumn(changeColumn.getNewColumn(), sqlValidator);
+        TableColumn.PhysicalColumn newTableColumn =
+                toTableColumn(changeColumn.getNewColumn(), sqlValidator);
         TableSchema newSchema = changeColumn(oldSchema, oldName, newTableColumn, first, after);
         Map<String, String> newProperties = new HashMap<>(catalogTable.getOptions());
         newProperties.putAll(extractProperties(changeColumn.getProperties()));
-        return new AlterTableSchemaOperation(
+
+        List<TableChange> tableChanges =
+                buildColumnChange(
+                        catalogTable
+                                .getResolvedSchema()
+                                .getColumn(oldName)
+                                .orElseThrow(
+                                        () ->
+                                                new ValidationException(
+                                                        "Failed to get old column: " + oldName)),
+                        Column.physical(newTableColumn.getName(), newTableColumn.getType())
+                                .withComment(
+                                        changeColumn
+                                                .getNewColumn()

Review Comment:
   Add a util method?
   
   ```java
   @Nullable
   private static String getComment(SqlTableColumn column) {
       return column.getComment()
              .map(SqlCharStringLiteral.class::cast)
               .map(c -> c.getValueAs(String.class))
               .orElse(null);
   }
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##########
@@ -276,6 +409,359 @@ public String toString() {
         }
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Modify Change
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * A base schema change to modify a column. The modification includes:
+     *
+     * <ul>
+     *   <li>change column data type
+     *   <li>reorder column position
+     *   <li>modify column comment
+     *   <li>change the computed expression
+     * </ul>
+     *
+     * <p>Some fine-grained column changes are defined in the {@link ModifyPhysicalColumnType},
+     * {@link ModifyColumnComment}, {@link ModifyColumnPosition} and {@link ModifyColumnName}.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_definition&gt; COMMENT '&lt;column_comment&gt;' &lt;column_position&gt;
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyColumn implements TableChange {
+
+        protected final Column oldColumn;
+        protected final Column newColumn;
+
+        protected final @Nullable ColumnPosition newPosition;
+
+        public ModifyColumn(
+                Column oldColumn, Column newColumn, @Nullable ColumnPosition newPosition) {
+            this.oldColumn = oldColumn;
+            this.newColumn = newColumn;
+            this.newPosition = newPosition;
+        }
+
+        /** Returns the original {@link Column} instance. */
+        public Column getOldColumn() {
+            return oldColumn;
+        }
+
+        /** Returns the modified {@link Column} instance. */
+        public Column getNewColumn() {
+            return newColumn;
+        }
+
+        /**
+         * Returns the position of the modified {@link Column} instance. When the return value is
+         * null, it means modify the column at the original position. When the return value is
+         * FIRST, it means move the modified column to the first. When the return value is AFTER, it
+         * means move the column after the referred column.
+         */
+        public @Nullable ColumnPosition getNewPosition() {
+            return newPosition;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof ModifyColumn)) {
+                return false;
+            }
+            ModifyColumn that = (ModifyColumn) o;
+            return Objects.equals(oldColumn, that.oldColumn)
+                    && Objects.equals(newColumn, that.newColumn)
+                    && Objects.equals(newPosition, that.newPosition);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(oldColumn, newColumn, newPosition);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyColumn{"
+                    + "oldColumn="
+                    + oldColumn
+                    + ", newColumn="
+                    + newColumn
+                    + ", newPosition="
+                    + newPosition
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change to modify the column comment.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;original_column_type&gt; COMMENT '&lt;new_column_comment&gt;'
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyColumnComment extends ModifyColumn {
+
+        private final String newComment;
+
+        private ModifyColumnComment(Column oldColumn, String newComment) {
+            super(oldColumn, oldColumn.withComment(newComment), null);
+            this.newComment = newComment;
+        }
+
+        /** Get the new comment for the column. */
+        public String getNewComment() {
+            return newComment;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return (o instanceof ModifyColumnComment) && super.equals(o);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyColumnComment{"
+                    + "Column="
+                    + oldColumn
+                    + ", newComment='"
+                    + newComment
+                    + '\''
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change to modify the column position.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;original_column_type&gt; &lt;column_position&gt;
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyColumnPosition extends ModifyColumn {
+
+        public ModifyColumnPosition(Column oldColumn, ColumnPosition newPosition) {
+            super(oldColumn, oldColumn, newPosition);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return (o instanceof ModifyColumnPosition) && super.equals(o);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyColumnPosition{"
+                    + "Column="
+                    + oldColumn
+                    + ", newPosition="
+                    + newPosition
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change that modify the physical column data type.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;new_column_type&gt;
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyPhysicalColumnType extends ModifyColumn {
+
+        private ModifyPhysicalColumnType(Column oldColumn, DataType newType) {
+            super(oldColumn, oldColumn.copy(newType), null);
+            Preconditions.checkArgument(oldColumn.isPhysical());
+        }
+
+        /** Get the column type for the new column. */
+        public DataType getNewType() {
+            return newColumn.getDataType();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return (o instanceof ModifyPhysicalColumnType) && super.equals(o);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyPhysicalColumnType{"
+                    + "Column="
+                    + oldColumn
+                    + ", newType="
+                    + getNewType()
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change to modify the column name.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; RENAME &lt;old_column_name&gt; TO &lt;new_column_name&gt;
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyColumnName extends ModifyColumn {
+
+        private ModifyColumnName(Column oldColumn, String newName) {
+            super(oldColumn, createNewColumn(oldColumn, newName), null);
+        }
+
+        private static Column createNewColumn(Column oldColumn, String newName) {
+            if (oldColumn instanceof Column.PhysicalColumn) {
+                return Column.physical(newName, oldColumn.getDataType())
+                        .withComment(oldColumn.comment);
+            } else if (oldColumn instanceof Column.MetadataColumn) {
+                Column.MetadataColumn metadataColumn = (Column.MetadataColumn) oldColumn;
+                return Column.metadata(
+                                newName,
+                                oldColumn.getDataType(),
+                                metadataColumn.getMetadataKey().orElse(null),
+                                metadataColumn.isVirtual())
+                        .withComment(oldColumn.comment);
+            } else {
+                return Column.computed(newName, ((Column.ComputedColumn) oldColumn).getExpression())
+                        .withComment(oldColumn.comment);
+            }
+        }
+
+        /** Returns the origin column name. */
+        public String getOldColumnName() {
+            return oldColumn.getName();
+        }
+
+        /** Returns the new column name after renaming the column name. */
+        public String getNewColumnName() {
+            return newColumn.getName();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return (o instanceof ModifyColumnName) && super.equals(o);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyColumnName{"
+                    + "Column="
+                    + oldColumn
+                    + ", newName="
+                    + getNewColumnName()
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change to modify an unique constraint.

Review Comment:
   Nit: an -> a



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##########
@@ -94,6 +96,137 @@ static AddWatermark add(WatermarkSpec watermarkSpec) {
         return new AddWatermark(watermarkSpec);
     }
 
+    /**
+     * A table change to modify a column. The modification includes:
+     *
+     * <ul>
+     *   <li>change column data type
+     *   <li>reorder column position
+     *   <li>modify column comment
+     *   <li>change the computed expression
+     * </ul>
+     *
+     * <p>Some fine-grained column changes are built in the {@link
+     * TableChange#modifyPhysicalColumnType}, {@link TableChange#modifyColumnName}, {@link
+     * TableChange#modifyColumnComment} and {@link TableChange#modifyColumnPosition}.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_definition&gt; COMMENT '&lt;column_comment&gt;' &lt;column_position&gt;
+     * </pre>
+     *
+     * @param oldColumn the definition of the old column.
+     * @param newColumn the definition of the new column.
+     * @param columnPosition the new position of the column.
+     * @return a TableChange represents the modification.
+     */
+    static ModifyColumn modify(
+            Column oldColumn, Column newColumn, @Nullable ColumnPosition columnPosition) {
+        return new ModifyColumn(oldColumn, newColumn, columnPosition);
+    }
+
+    /**
+     * A table change that modify the physical column data type.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;new_column_type&gt;
+     * </pre>
+     *
+     * @param oldColumn the definition of the old column.
+     * @param newType the type of the new column.
+     * @return a TableChange represents the modification.
+     */
+    static ModifyPhysicalColumnType modifyPhysicalColumnType(Column oldColumn, DataType newType) {
+        return new ModifyPhysicalColumnType(oldColumn, newType);
+    }
+
+    /**
+     * A table change to modify the column name.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; RENAME &lt;old_column_name&gt; TO &lt;new_column_name&gt;
+     * </pre>
+     *
+     * @param oldColumn the definition of the old column.
+     * @param newName the name of the new column.
+     * @return a TableChange represents the modification.
+     */
+    static ModifyColumnName modifyColumnName(Column oldColumn, String newName) {
+        return new ModifyColumnName(oldColumn, newName);
+    }
+
+    /**
+     * A table change to modify the column comment.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;original_column_type&gt; COMMENT '&lt;new_column_comment&gt;'
+     * </pre>
+     *
+     * @param oldColumn the definition of the old column.
+     * @param newComment the modified comment.
+     * @return a TableChange represents the modification.
+     */
+    static ModifyColumnComment modifyColumnComment(Column oldColumn, String newComment) {
+        return new ModifyColumnComment(oldColumn, newComment);
+    }
+
+    /**
+     * A table change to modify the column position.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;original_column_type&gt; &lt;column_position&gt;
+     * </pre>
+     *
+     * @param oldColumn the definition of the old column.
+     * @param columnPosition the new position of the column.
+     * @return a TableChange represents the modification.
+     */
+    static ModifyColumnPosition modifyColumnPosition(
+            Column oldColumn, ColumnPosition columnPosition) {
+        return new ModifyColumnPosition(oldColumn, columnPosition);
+    }
+
+    /**
+     * A table change to add a unique constraint.

Review Comment:
   found a typo here: add -> modify



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -698,7 +782,7 @@ private AlterSchemaStrategy computeAlterSchemaStrategy(SqlAlterTableSchema alter
                         alterTableSchema.getClass().getCanonicalName()));
     }
 
-    private static <T> T unwrap(Optional<T> value) {
+    private <T> T unwrap(Optional<T> value) {

Review Comment:
   Why change this?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##########
@@ -276,6 +409,359 @@ public String toString() {
         }
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Modify Change
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * A base schema change to modify a column. The modification includes:
+     *
+     * <ul>
+     *   <li>change column data type
+     *   <li>reorder column position
+     *   <li>modify column comment
+     *   <li>change the computed expression
+     * </ul>
+     *
+     * <p>Some fine-grained column changes are defined in the {@link ModifyPhysicalColumnType},
+     * {@link ModifyColumnComment}, {@link ModifyColumnPosition} and {@link ModifyColumnName}.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_definition&gt; COMMENT '&lt;column_comment&gt;' &lt;column_position&gt;
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyColumn implements TableChange {
+
+        protected final Column oldColumn;
+        protected final Column newColumn;
+
+        protected final @Nullable ColumnPosition newPosition;
+
+        public ModifyColumn(
+                Column oldColumn, Column newColumn, @Nullable ColumnPosition newPosition) {
+            this.oldColumn = oldColumn;
+            this.newColumn = newColumn;
+            this.newPosition = newPosition;
+        }
+
+        /** Returns the original {@link Column} instance. */
+        public Column getOldColumn() {
+            return oldColumn;
+        }
+
+        /** Returns the modified {@link Column} instance. */
+        public Column getNewColumn() {
+            return newColumn;
+        }
+
+        /**
+         * Returns the position of the modified {@link Column} instance. When the return value is
+         * null, it means modify the column at the original position. When the return value is
+         * FIRST, it means move the modified column to the first. When the return value is AFTER, it
+         * means move the column after the referred column.
+         */
+        public @Nullable ColumnPosition getNewPosition() {
+            return newPosition;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof ModifyColumn)) {
+                return false;
+            }
+            ModifyColumn that = (ModifyColumn) o;
+            return Objects.equals(oldColumn, that.oldColumn)
+                    && Objects.equals(newColumn, that.newColumn)
+                    && Objects.equals(newPosition, that.newPosition);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(oldColumn, newColumn, newPosition);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyColumn{"
+                    + "oldColumn="
+                    + oldColumn
+                    + ", newColumn="
+                    + newColumn
+                    + ", newPosition="
+                    + newPosition
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change to modify the column comment.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;original_column_type&gt; COMMENT '&lt;new_column_comment&gt;'
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyColumnComment extends ModifyColumn {
+
+        private final String newComment;
+
+        private ModifyColumnComment(Column oldColumn, String newComment) {
+            super(oldColumn, oldColumn.withComment(newComment), null);
+            this.newComment = newComment;
+        }
+
+        /** Get the new comment for the column. */
+        public String getNewComment() {
+            return newComment;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return (o instanceof ModifyColumnComment) && super.equals(o);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyColumnComment{"
+                    + "Column="
+                    + oldColumn
+                    + ", newComment='"
+                    + newComment
+                    + '\''
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change to modify the column position.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;original_column_type&gt; &lt;column_position&gt;
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyColumnPosition extends ModifyColumn {
+
+        public ModifyColumnPosition(Column oldColumn, ColumnPosition newPosition) {
+            super(oldColumn, oldColumn, newPosition);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return (o instanceof ModifyColumnPosition) && super.equals(o);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyColumnPosition{"
+                    + "Column="
+                    + oldColumn
+                    + ", newPosition="
+                    + newPosition
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change that modify the physical column data type.

Review Comment:
   Do we need to support modifying the metadata column type as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] LadyForest commented on pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

Posted by GitBox <gi...@apache.org>.
LadyForest commented on PR #21577:
URL: https://github.com/apache/flink/pull/21577#issuecomment-1369359579

   > Thanks for the review!
   > 
   > > Since the MODIFY operation is split into a list of changes, I would like to know whether the order matters and how the external catalog copes with them. For example, does API expose a requirement that it should be an atomic operation, or ? does the external catalog decide it?
   > 
   > Yes, the order matters. The catalog developer needs to make sure the execution in order and the execution is atomic. The logic may looks like(the idea is from the Spark JdbcTablecatalog#alterTable)
   > 
   > ```
   > 
   > default void alterTable(
   >             ObjectPath tablePath,
   >             CatalogBaseTable newTable,
   >             List<TableChange> tableChanges,
   >             boolean ignoreIfNotExists)
   >             throws TableNotExistException, CatalogException {
   >     // make sure all table changes are supported.
   >     validate(tableChanges);
   >     // jdbc catalog can work like
   >     conn.setAutoCommit(false);
   >     Statement statement = conn.createStatement()
   >     try {
   >         statement.setQueryTimeout(options.queryTimeout);
   >         for (String sql = dialect.alterTable(tableName, changes, metaData.getDatabaseMajorVersion)) {
   >            statement.executeUpdate(sql)
   >         }
   >         conn.commit()
   >     } catch (Exception e) {
   >       if (conn != null) {
   >         conn.rollback();
   >       }
   >       throw e;
   >     } finally {
   >       statement.close();
   >       conn.setAutoCommit(true);
   >     }
   > }
   > ```
   > 
   > > The "oldColumn" and "originColumn" are interchangeably used across the changelog, and can we align them to improve the readability?
   > 
   > Thanks for pointing it out. I agree with you we should align the term in the codes. I am prone to use the term "oldColumn" instead because we already have used this in the FLIP-273. WDYT?
   > 
   > > SchemaConverter#updatePositionAndCollectColumnChange and OperationConverterUtils#buildColumnChange are very similar. Do we have a plan to do some refactor work?
   > 
   > Yes. I think we can refactor this after all parts finish. WDYT?
   
   Thanks for the detailed explanation. +1 for refactoring after all the subtasks finished


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21577:
URL: https://github.com/apache/flink/pull/21577#issuecomment-1367299309

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d5a2d2426b44a608d76e2c540145c511d53d94fb",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d5a2d2426b44a608d76e2c540145c511d53d94fb",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d5a2d2426b44a608d76e2c540145c511d53d94fb UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fsk119 commented on pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #21577:
URL: https://github.com/apache/flink/pull/21577#issuecomment-1369692355

   The failed test is because of FLINK-30257. Merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fsk119 commented on a diff in pull request #21577: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #21577:
URL: https://github.com/apache/flink/pull/21577#discussion_r1060254282


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##########
@@ -276,6 +409,359 @@ public String toString() {
         }
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Modify Change
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * A base schema change to modify a column. The modification includes:
+     *
+     * <ul>
+     *   <li>change column data type
+     *   <li>reorder column position
+     *   <li>modify column comment
+     *   <li>change the computed expression
+     * </ul>
+     *
+     * <p>Some fine-grained column changes are defined in the {@link ModifyPhysicalColumnType},
+     * {@link ModifyColumnComment}, {@link ModifyColumnPosition} and {@link ModifyColumnName}.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_definition&gt; COMMENT '&lt;column_comment&gt;' &lt;column_position&gt;
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyColumn implements TableChange {
+
+        protected final Column oldColumn;
+        protected final Column newColumn;
+
+        protected final @Nullable ColumnPosition newPosition;
+
+        public ModifyColumn(
+                Column oldColumn, Column newColumn, @Nullable ColumnPosition newPosition) {
+            this.oldColumn = oldColumn;
+            this.newColumn = newColumn;
+            this.newPosition = newPosition;
+        }
+
+        /** Returns the original {@link Column} instance. */
+        public Column getOldColumn() {
+            return oldColumn;
+        }
+
+        /** Returns the modified {@link Column} instance. */
+        public Column getNewColumn() {
+            return newColumn;
+        }
+
+        /**
+         * Returns the position of the modified {@link Column} instance. When the return value is
+         * null, it means modify the column at the original position. When the return value is
+         * FIRST, it means move the modified column to the first. When the return value is AFTER, it
+         * means move the column after the referred column.
+         */
+        public @Nullable ColumnPosition getNewPosition() {
+            return newPosition;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof ModifyColumn)) {
+                return false;
+            }
+            ModifyColumn that = (ModifyColumn) o;
+            return Objects.equals(oldColumn, that.oldColumn)
+                    && Objects.equals(newColumn, that.newColumn)
+                    && Objects.equals(newPosition, that.newPosition);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(oldColumn, newColumn, newPosition);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyColumn{"
+                    + "oldColumn="
+                    + oldColumn
+                    + ", newColumn="
+                    + newColumn
+                    + ", newPosition="
+                    + newPosition
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change to modify the column comment.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;original_column_type&gt; COMMENT '&lt;new_column_comment&gt;'
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyColumnComment extends ModifyColumn {
+
+        private final String newComment;
+
+        private ModifyColumnComment(Column oldColumn, String newComment) {
+            super(oldColumn, oldColumn.withComment(newComment), null);
+            this.newComment = newComment;
+        }
+
+        /** Get the new comment for the column. */
+        public String getNewComment() {
+            return newComment;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return (o instanceof ModifyColumnComment) && super.equals(o);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyColumnComment{"
+                    + "Column="
+                    + oldColumn
+                    + ", newComment='"
+                    + newComment
+                    + '\''
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change to modify the column position.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; MODIFY &lt;column_name&gt; &lt;original_column_type&gt; &lt;column_position&gt;
+     * </pre>
+     */
+    @PublicEvolving
+    class ModifyColumnPosition extends ModifyColumn {
+
+        public ModifyColumnPosition(Column oldColumn, ColumnPosition newPosition) {
+            super(oldColumn, oldColumn, newPosition);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return (o instanceof ModifyColumnPosition) && super.equals(o);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyColumnPosition{"
+                    + "Column="
+                    + oldColumn
+                    + ", newPosition="
+                    + newPosition
+                    + '}';
+        }
+    }
+
+    /**
+     * A table change that modify the physical column data type.

Review Comment:
   No. If users want to support modifying the metadata column type, we should introduce another class named `ModifyMetadataColumn`, which may also involve the change of the metadata key.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org