Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/25 08:17:16 UTC

[GitHub] [flink-table-store] zjureel commented on a diff in pull request #396: [FLINK-27846] Schema evolution for reading data file

zjureel commented on code in PR #396:
URL: https://github.com/apache/flink-table-store/pull/396#discussion_r1032107447


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java:
##########
@@ -62,4 +72,193 @@ public static int[] createIndexMapping(
         }
         return null;
     }
+
+    /**
+     * Create index mapping from table projection to underlying data projection.
+     *
+     * @param tableProjection the table projection
+     * @param tableFields the fields in table
+     * @param dataProjection the underlying data projection
+     * @param dataFields the fields in underlying data
+     * @return the index mapping
+     */
+    @Nullable
+    public static int[] createIndexMapping(
+            int[] tableProjection,
+            List<DataField> tableFields,
+            int[] dataProjection,
+            List<DataField> dataFields) {
+        return createIndexMapping(
+                tableProjection,
+                tableProjection.length,
+                tableFields,
+                Collections.emptyList(),
+                dataProjection,
+                dataProjection.length,
+                dataFields,
+                Collections.emptyList());
+    }
+
+    /**
+     * Create index mapping from table projection to underlying data projection for key value.
+     *
+     * @param tableProjection the table projection
+     * @param tableKeyCount the key count in table
+     * @param tableKeyFields the key fields in table
+     * @param tableValueFields the value fields in table
+     * @param dataProjection the data projection
+     * @param dataKeyCount the data key count
+     * @param dataKeyFields the fields in underlying data
+     * @param dataValueFields the fields in underlying data
+     * @return the index mapping
+     */
+    @Nullable
+    public static int[] createIndexMapping(
+            int[] tableProjection,
+            int tableKeyCount,
+            List<DataField> tableKeyFields,
+            List<DataField> tableValueFields,
+            int[] dataProjection,
+            int dataKeyCount,
+            List<DataField> dataKeyFields,
+            List<DataField> dataValueFields) {
+        List<Integer> tableKeyFieldIdList =
+                tableKeyFields.stream().map(DataField::id).collect(Collectors.toList());
+        List<Integer> dataKeyFieldIdList =
+                dataKeyFields.stream().map(DataField::id).collect(Collectors.toList());
+        int[] indexMapping = new int[tableProjection.length];
+
+        int[] dataKeyProjection = Arrays.copyOf(dataProjection, dataKeyCount);
+        for (int i = 0; i < tableKeyCount; i++) {
+            int fieldId = tableKeyFieldIdList.get(tableProjection[i]);
+            int dataFieldIndex = dataKeyFieldIdList.indexOf(fieldId);
+            indexMapping[i] = Ints.indexOf(dataKeyProjection, dataFieldIndex);
+        }
+        if (tableProjection.length >= tableKeyCount + 2) {
+            // seq and value kind
+            for (int i = tableKeyCount; i < tableKeyCount + 2; i++) {
+                indexMapping[i] = i + dataKeyCount - tableKeyCount;
+            }
+
+            int[] dataValueProjection =
+                    Arrays.copyOfRange(dataProjection, dataKeyCount + 2, dataProjection.length);
+            for (int i = 0; i < dataValueProjection.length; i++) {
+                dataValueProjection[i] = dataValueProjection[i] - dataKeyFields.size() - 2;
+            }
+            List<Integer> tableValueFieldIdList =
+                    tableValueFields.stream().map(DataField::id).collect(Collectors.toList());
+            List<Integer> dataValueFieldIdList =
+                    dataValueFields.stream().map(DataField::id).collect(Collectors.toList());
+            for (int i = tableKeyCount + 2; i < tableProjection.length; i++) {
+                int fieldId =
+                        tableValueFieldIdList.get(tableProjection[i] - tableKeyFields.size() - 2);
+                int dataFieldIndex = dataValueFieldIdList.indexOf(fieldId);
+                int dataValueIndex = Ints.indexOf(dataValueProjection, dataFieldIndex);
+                indexMapping[i] =
+                        dataValueIndex < 0 ? dataValueIndex : dataValueIndex + dataKeyCount + 2;
+            }
+        }
+
+        for (int i = 0; i < indexMapping.length; i++) {
+            if (indexMapping[i] != i) {
+                return indexMapping;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Create data projection from table projection.
+     *
+     * @param tableFields the fields of table
+     * @param dataFields the fields of underlying data
+     * @param tableProjection the projection of table
+     * @return the projection of data
+     */
+    public static int[][] createDataProjection(
+            List<DataField> tableFields, List<DataField> dataFields, int[][] tableProjection) {
+        List<Integer> tableFieldIdList =

Review Comment:
   @JingsongLi Table Store doesn't support nested projection at the moment, right? If so, maybe we don't need to check for nested projection here. WDYT?
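
   For context, a minimal sketch of what such a check could look like, assuming a projection is passed as an `int[][]` in which each inner array is the path to one field, so a top-level-only projection has paths of length 1. The class and method names below (`ProjectionCheck`, `isTopLevelOnly`, `toTopLevelIndexes`) are hypothetical and are not part of `SchemaEvolutionUtil` or this PR:

```java
import java.util.Arrays;

/** Hypothetical helper, for illustration only; not part of flink-table-store. */
final class ProjectionCheck {

    private ProjectionCheck() {}

    /** Returns true if every projection path selects a top-level field (path length 1). */
    static boolean isTopLevelOnly(int[][] projection) {
        return Arrays.stream(projection).allMatch(path -> path.length == 1);
    }

    /**
     * Flattens a top-level-only projection into a plain index array.
     * Rejects any path that is empty or reaches into a nested field.
     */
    static int[] toTopLevelIndexes(int[][] projection) {
        if (!isTopLevelOnly(projection)) {
            throw new IllegalArgumentException("Nested projection is not supported");
        }
        return Arrays.stream(projection).mapToInt(path -> path[0]).toArray();
    }
}
```

   If nested projection can never reach this code path, the check could be dropped and only the flattening step kept. That depends on whether callers guarantee top-level-only projections, which this sketch does not assume.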


