Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/29 08:07:48 UTC

[GitHub] [flink] LadyForest commented on a diff in pull request #21566: [FLINK-30495][table-api] Introduce TableChange to represent ADD change

LadyForest commented on code in PR #21566:
URL: https://github.com/apache/flink/pull/21566#discussion_r1058792233


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -89,8 +93,15 @@ public static Operation convertAddReplaceColumns(
             }
             setWatermarkAndPK(builder, catalogTable.getSchema());
         }
+        List<TableChange> tableChanges = new ArrayList<>();
         for (SqlNode sqlNode : addReplaceColumns.getNewColumns()) {
-            builder.add(toTableColumn((SqlTableColumn) sqlNode, sqlValidator));
+            TableColumn tableColumn = toTableColumn((SqlTableColumn) sqlNode, sqlValidator);
+            builder.add(tableColumn);
+            if (!addReplaceColumns.isReplace()) {
+                tableChanges.add(

Review Comment:
   Could we try to preserve the column comment here?
   ```suggestion
                   tableChanges.add(
                           TableChange.add(
                                   Column.physical(tableColumn.getName(), tableColumn.getType())
                                           .withComment(
                                                   ((SqlTableColumn) sqlNode)
                                                           .getComment()
                                                           .map(SqlCharStringLiteral.class::cast)
                                                           .map(c -> c.getValueAs(String.class))
                                                           .orElse(null))));
   ```
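
The suggestion above relies on an `Optional` chain that yields the comment text when a comment is present and `null` otherwise. Below is a minimal, self-contained sketch of that pattern. `SketchNode`, `StringLiteral`, `ParsedColumn`, and `CommentExtractionSketch` are hypothetical stand-ins invented for illustration; they are not the Flink or Calcite classes used in the PR.

```java
import java.util.Optional;

/** Hypothetical stand-in for a parsed SQL node; not a Calcite or Flink type. */
interface SketchNode {}

/** Hypothetical string literal node holding the raw comment text. */
final class StringLiteral implements SketchNode {
    private final String value;

    StringLiteral(String value) {
        this.value = value;
    }

    /** Returns the literal's value cast to the requested type. */
    <T> T getValueAs(Class<T> type) {
        return type.cast(value);
    }
}

/** Hypothetical parsed column whose COMMENT clause is optional. */
final class ParsedColumn {
    private final SketchNode comment; // null when the DDL had no COMMENT clause

    ParsedColumn(SketchNode comment) {
        this.comment = comment;
    }

    Optional<SketchNode> getComment() {
        return Optional.ofNullable(comment);
    }
}

final class CommentExtractionSketch {
    /** Returns the comment text, or null when the column has no comment. */
    static String commentOrNull(ParsedColumn column) {
        return column.getComment()
                .map(StringLiteral.class::cast) // narrow the node to a string literal
                .map(literal -> literal.getValueAs(String.class))
                .orElse(null); // no comment: fall back to null
    }

    public static void main(String[] args) {
        System.out.println(commentOrNull(new ParsedColumn(new StringLiteral("user id"))));
        System.out.println(commentOrNull(new ParsedColumn(null)));
    }
}
```

In this sketch the cast throws a `ClassCastException` if the comment node is not a string literal, so it assumes the parser only ever produces string literals for comments.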



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##########
@@ -161,4 +377,70 @@ public String toString() {
             return "ResetOption{" + "key='" + key + '\'' + '}';
         }
     }
+
+    // --------------------------------------------------------------------------------------------
+
+    /** The position of the modified or added column. */
+    @PublicEvolving
+    interface ColumnPosition {
+
+        /** Get the position to place the column at the first. */
+        static ColumnPosition first() {
+            return First.INSTANCE;
+        }
+
+        /** Get the position to place the column after the specified column. */
+        static ColumnPosition after(String column) {
+            return new After(column);
+        }
+    }
+
+    /** Column position FIRST means the specified column should be the first column. */
+    @PublicEvolving
+    final class First implements ColumnPosition {
+        private static final First INSTANCE = new First();
+
+        private First() {}
+
+        @Override
+        public String toString() {
+            return "FIRST";
+        }
+    }
+
+    /** Column position AFTER means the specified column should be put after the given `column`. */
+    @PublicEvolving
+    final class After implements ColumnPosition {
+        private final String column;
+
+        private After(String column) {
+            this.column = column;
+        }
+
+        public String column() {
+            return column;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof After)) {
+                return false;
+            }
+            After after = (After) o;
+            return Objects.equals(column, after.column);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(column);
+        }
+
+        @Override
+        public String toString() {
+            return "AFTER " + column;

Review Comment:
   Nit: quote the column name with backticks.
   ```suggestion
               return String.format("AFTER `%s`", column);
   ```
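
To illustrate why the quoting matters, here is a small sketch with hypothetical names (`PositionSketch`, `AfterSketch`), not the `TableChange` classes from the PR. Wrapping the name in backticks keeps the printed position unambiguous when a column name contains spaces or reads like a keyword. The sketch assumes the name itself contains no backtick and does no escaping.

```java
import java.util.Objects;

/** Hypothetical stand-in for a column position; not Flink's TableChange.ColumnPosition. */
interface PositionSketch {
    /** Creates a position that places a column after the given column. */
    static PositionSketch after(String column) {
        return new AfterSketch(column);
    }
}

/** Hypothetical AFTER position that quotes the column name in toString(). */
final class AfterSketch implements PositionSketch {
    private final String column;

    AfterSketch(String column) {
        this.column = Objects.requireNonNull(column, "column");
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof AfterSketch)) {
            return false;
        }
        return column.equals(((AfterSketch) o).column);
    }

    @Override
    public int hashCode() {
        return Objects.hash(column);
    }

    @Override
    public String toString() {
        // Backticks mark where the identifier starts and ends.
        // Assumption: the name contains no backtick; no escaping is done here.
        return String.format("AFTER `%s`", column);
    }
}

final class PositionDemo {
    public static void main(String[] args) {
        // Without quoting, "AFTER order id" could be read as two separate tokens.
        System.out.println(PositionSketch.after("order id"));
    }
}
```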



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -106,13 +117,17 @@ public static Operation convertAddReplaceColumns(
         Map<String, String> newProperties = new HashMap<>(catalogTable.getOptions());
         newProperties.putAll(extractProperties(addReplaceColumns.getProperties()));
 
-        return new AlterTableSchemaOperation(
-                tableIdentifier,
+        CatalogTableImpl newTable =
                 new CatalogTableImpl(
                         builder.build(),
                         catalogTable.getPartitionKeys(),
                         newProperties,
-                        catalogTable.getComment()));
+                        catalogTable.getComment());
+        if (addReplaceColumns.isReplace()) {

Review Comment:
   Could you explain why the add and replace operations are handled differently here?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -270,7 +283,7 @@ private void updateColumn(List<SqlNode> alterColumnPositions) {
         }
 
         private void updatePrimaryKey(SqlTableConstraint alterPrimaryKey) {
-            checkPrimaryKeyExists();
+            checkAndGeneratePrimaryKeyChange();

Review Comment:
   Nit: `checkAndCollectPrimaryKeyChange`?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -301,7 +314,7 @@ private void updatePrimaryKeyNullability(String columnName) {
         }
 
         private void updateWatermark(SqlWatermark alterWatermarkSpec) {
-            checkWatermarkExists();
+            checkAndGenerateWatermarkChange();

Review Comment:
   Nit: `checkAndCollectWatermarkChange`?


