Posted to issues@flink.apache.org by "zhangjun0x01 (via GitHub)" <gi...@apache.org> on 2023/03/15 01:46:02 UTC

[GitHub] [flink-table-store] zhangjun0x01 opened a new pull request, #603: [FLINK-31459]add UPDATE COLUMN POSITION for flink table store

zhangjun0x01 opened a new pull request, #603:
URL: https://github.com/apache/flink-table-store/pull/603

   add UPDATE COLUMN POSITION for flink table store


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] zhangjun0x01 commented on a diff in pull request #603: [FLINK-31459]add UPDATE COLUMN POSITION for flink table store

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on code in PR #603:
URL: https://github.com/apache/flink-table-store/pull/603#discussion_r1137247002


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java:
##########
@@ -310,6 +311,27 @@ public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
                                             field.name(),
                                             field.type(),
                                             update.newDescription()));
+                } else if (change instanceof UpdateColumnPosition) {
+                    UpdateColumnPosition update = (UpdateColumnPosition) change;
+                    SchemaChange.Move move = update.move();
+
+                    // key: name ; value : index
+                    Map<String, Integer> map = new HashMap<>();
+                    for (int i = 0; i < newFields.size(); i++) {
+                        map.put(newFields.get(i).name(), i);
+                    }
+
+                    int fieldIndex = map.get(move.fieldName());
+                    int refIndex = 0;
+                    if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
+                        checkMoveIndexEqual(move, fieldIndex, refIndex);
+                        newFields.add(refIndex, newFields.remove(fieldIndex));
+                    } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
+                        refIndex = map.get(move.referenceFieldName());
+                        checkMoveIndexEqual(move, fieldIndex, refIndex);
+                        newFields.add(refIndex + 1, newFields.remove(fieldIndex));

Review Comment:
   Yes, it is a bug. I have fixed it.





[GitHub] [flink-table-store] zhangjun0x01 commented on a diff in pull request #603: [FLINK-31459]add UPDATE COLUMN POSITION for flink table store

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on code in PR #603:
URL: https://github.com/apache/flink-table-store/pull/603#discussion_r1137247002


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java:
##########
@@ -310,6 +311,27 @@ public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
                                             field.name(),
                                             field.type(),
                                             update.newDescription()));
+                } else if (change instanceof UpdateColumnPosition) {
+                    UpdateColumnPosition update = (UpdateColumnPosition) change;
+                    SchemaChange.Move move = update.move();
+
+                    // key: name ; value : index
+                    Map<String, Integer> map = new HashMap<>();
+                    for (int i = 0; i < newFields.size(); i++) {
+                        map.put(newFields.get(i).name(), i);
+                    }
+
+                    int fieldIndex = map.get(move.fieldName());
+                    int refIndex = 0;
+                    if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
+                        checkMoveIndexEqual(move, fieldIndex, refIndex);
+                        newFields.add(refIndex, newFields.remove(fieldIndex));
+                    } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
+                        refIndex = map.get(move.referenceFieldName());
+                        checkMoveIndexEqual(move, fieldIndex, refIndex);
+                        newFields.add(refIndex + 1, newFields.remove(fieldIndex));

Review Comment:
   At first I overlooked the case where fieldIndex is smaller than refIndex. I have fixed it and added a unit test.
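
   For readers following the thread, the case described above can be handled by adjusting the insertion index when the moved field sits before the reference field. The following is only a minimal sketch under assumptions, not the code that was merged: the class name ColumnMoveSketch, the method moveAfter, and the use of plain field-name strings are all hypothetical stand-ins for the real SchemaManager logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of flink-table-store.
final class ColumnMoveSketch {

    // Returns a copy of 'fields' with 'field' placed directly after 'reference'.
    static List<String> moveAfter(List<String> fields, String field, String reference) {
        List<String> result = new ArrayList<>(fields);
        int fieldIndex = result.indexOf(field);
        int refIndex = result.indexOf(reference);
        if (fieldIndex < 0 || refIndex < 0) {
            throw new IllegalArgumentException("Unknown field: " + field + " or " + reference);
        }
        if (fieldIndex == refIndex) {
            throw new IllegalArgumentException("Cannot move a field after itself: " + field);
        }
        String moved = result.remove(fieldIndex);
        // If the moved field sat before the reference, the removal shifted the
        // reference one slot to the left, so refIndex itself is now the slot
        // directly after the reference. Otherwise the reference did not move.
        int target = fieldIndex < refIndex ? refIndex : refIndex + 1;
        result.add(target, moved);
        return result;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("f0", "f1", "f2", "f3");
        System.out.println(moveAfter(fields, "f1", "f2")); // prints [f0, f2, f1, f3]
        System.out.println(moveAfter(fields, "f3", "f0")); // prints [f0, f3, f1, f2]
    }
}
```

   An alternative with the same effect would be to look up the reference index again after the removal instead of adjusting it arithmetically.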





[GitHub] [incubator-paimon] JingsongLi merged pull request #603: [FLINK-31459]add UPDATE COLUMN POSITION for flink table store

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi merged PR #603:
URL: https://github.com/apache/incubator-paimon/pull/603







[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #603: [FLINK-31459]add UPDATE COLUMN POSITION for flink table store

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #603:
URL: https://github.com/apache/flink-table-store/pull/603#discussion_r1136894659


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java:
##########
@@ -310,6 +311,27 @@ public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
                                             field.name(),
                                             field.type(),
                                             update.newDescription()));
+                } else if (change instanceof UpdateColumnPosition) {
+                    UpdateColumnPosition update = (UpdateColumnPosition) change;
+                    SchemaChange.Move move = update.move();
+
+                    // key: name ; value : index
+                    Map<String, Integer> map = new HashMap<>();
+                    for (int i = 0; i < newFields.size(); i++) {
+                        map.put(newFields.get(i).name(), i);
+                    }
+
+                    int fieldIndex = map.get(move.fieldName());
+                    int refIndex = 0;
+                    if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
+                        checkMoveIndexEqual(move, fieldIndex, refIndex);
+                        newFields.add(refIndex, newFields.remove(fieldIndex));
+                    } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
+                        refIndex = map.get(move.referenceFieldName());
+                        checkMoveIndexEqual(move, fieldIndex, refIndex);
+                        newFields.add(refIndex + 1, newFields.remove(fieldIndex));

Review Comment:
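   Is there a bug here?
   Suppose the table has 4 fields: f0, f1, f2, f3,
   with fieldIndex = 1 and refIndex = 2 (move f1 after f2).
   After remove(fieldIndex) the list is: f0, f2, f3.
   Then add(refIndex + 1) is add(3), which gives f0, f2, f3, f1
   instead of the expected f0, f2, f1, f3?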
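
   The walkthrough above can be reproduced outside the project with a plain list of names. This is a minimal sketch under assumptions: the class MoveAfterRepro and the method moveAfterAsInDiff are hypothetical, and only the single add/remove expression is copied from the AFTER branch of the quoted diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical reproduction for illustration only; not part of flink-table-store.
final class MoveAfterRepro {

    // Same shape as the AFTER branch in the diff: the element is removed first
    // (arguments are evaluated left to right, so remove runs before add),
    // then inserted at refIndex + 1 computed from the pre-removal position.
    static List<String> moveAfterAsInDiff(List<String> fields, int fieldIndex, int refIndex) {
        List<String> result = new ArrayList<>(fields);
        result.add(refIndex + 1, result.remove(fieldIndex));
        return result;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("f0", "f1", "f2", "f3");
        // Move f1 after f2: the field lands one slot too far to the right.
        System.out.println(moveAfterAsInDiff(fields, 1, 2)); // prints [f0, f2, f3, f1]
        // Moving backwards (f3 after f0) is unaffected by the removal.
        System.out.println(moveAfterAsInDiff(fields, 3, 0)); // prints [f0, f3, f1, f2]
    }
}
```

   With the same logic, moving f0 after f3 would call add(4, ...) on a three-element list, which ArrayList rejects with an IndexOutOfBoundsException.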


