Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/01/27 05:52:13 UTC

[GitHub] [iceberg] hililiwei opened a new pull request #3991: Flink: Support nested projection

hililiwei opened a new pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991


   Supports nested projections in Flink.
   Adds a transform step in RowDataFileScanTaskReader that flattens the nested data read from the file according to the field index paths.
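A minimal sketch of what "flattening by field index paths" means, assuming rows are modeled as plain `Object[]` arrays with nested structs as nested `Object[]`. `FlattenByPathSketch`, `valueAt` and `flatten` are hypothetical names used only for illustration; this is not the PR's `RowDataNestProjection`, which works on Flink `RowData`.

```java
import java.util.Arrays;

class FlattenByPathSketch {
  // Hypothetical model: rows are Object[]; nested structs are nested Object[].

  /** Follows one index path into a nested row; returns null if any struct on the way is null. */
  static Object valueAt(Object[] row, int[] path) {
    Object current = row;
    for (int index : path) {
      if (current == null) {
        return null;
      }
      current = ((Object[]) current)[index];
    }
    return current;
  }

  /** Produces a flat row with one top-level column per projected path. */
  static Object[] flatten(Object[] row, int[][] paths) {
    Object[] out = new Object[paths.length];
    for (int i = 0; i < paths.length; i++) {
      out[i] = valueAt(row, paths[i]);
    }
    return out;
  }

  public static void main(String[] args) {
    // Schema: [id: int, st: struct<a: string, b: string>]
    Object[] row = {1, new Object[] {"x", "y"}};
    // Projection [id, st.a] expressed as index paths [[0], [1, 0]]
    int[][] paths = {{0}, {1, 0}};
    System.out.println(Arrays.toString(flatten(row, paths))); // [1, x]
  }
}
```

With the paths `[[0], [1, 0]]`, the nested `st.a` becomes an independent top-level column next to `id`.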


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hililiwei commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817649340



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -179,4 +171,20 @@ public DynamicTableSource copy() {
   public String asSummaryString() {
     return "Iceberg table source";
   }
+
+  private static TableSchema projectSchema(TableSchema tableSchema, int[] projectedFields) {

Review comment:
       That's why I've added both `topProjectedFields` and `projectedFields`.
   




[GitHub] [iceberg] hililiwei commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r821685070



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
##########
@@ -83,6 +95,12 @@ public RowDataFileScanTaskReader(Schema tableSchema, Schema projectedSchema,
       iterable = CloseableIterable.transform(iterable, rowDataProjection::wrap);
     }
 
+    if (projectedFields != null) {
+      RowDataNestProjection rowDataProjection = RowDataNestProjection.create(tableSchema, projectedSchema,
+          projectedFields);
+      iterable = CloseableIterable.transform(iterable, rowDataProjection::wrap);
+    }

Review comment:
       Line 95 only strips the extra columns that were read to apply the delete files; it does not extract the nested fields.
   
   
   ![image](https://user-images.githubusercontent.com/59213263/157242838-69ff434c-b6ea-4b6c-b982-658bb98ed514.png)
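A hedged illustration of why a second transform is needed, assuming `Object[]` rows and a made-up read schema `[id, st<a, b>, _pos]`, where `_pos` stands in for a column read only to apply deletes. `keepTopLevel` plays the role of the existing top-level projection and `extractChild` the role of the new nested one; both names are hypothetical, and the composition only mirrors the two chained `CloseableIterable.transform` calls in spirit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class TwoStageProjectionSketch {
  // Hypothetical model: rows are Object[]; nested structs are nested Object[].

  /** Stage 1: keep only the requested top-level columns (drops delete-only columns). */
  static Function<Object[], Object[]> keepTopLevel(int[] columns) {
    return row -> {
      Object[] out = new Object[columns.length];
      for (int i = 0; i < columns.length; i++) {
        out[i] = row[columns[i]];
      }
      return out;
    };
  }

  /** Stage 2: replace a struct column by one of its children; a null struct yields null. */
  static Function<Object[], Object[]> extractChild(int structColumn, int childIndex) {
    return row -> {
      Object[] out = row.clone();
      Object[] struct = (Object[]) row[structColumn];
      out[structColumn] = struct == null ? null : struct[childIndex];
      return out;
    };
  }

  public static void main(String[] args) {
    // Read schema: [id, st<a, b>, _pos]; _pos was only needed to apply deletes.
    List<Object[]> rows = List.of(
        new Object[] {1, new Object[] {"x", "y"}, 0L},
        new Object[] {2, null, 1L});

    // Stage 1 alone would still return the whole struct; stage 2 pulls st.a out of it.
    Function<Object[], Object[]> pipeline = keepTopLevel(new int[] {0, 1}).andThen(extractChild(1, 0));

    List<String> projected = rows.stream()
        .map(pipeline)
        .map(Arrays::toString)
        .collect(Collectors.toList());
    System.out.println(projected); // [[1, x], [2, null]]
  }
}
```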
   




[GitHub] [iceberg] hililiwei commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r793275020



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Projects a (possibly nested) row data type, converts the original field obtained from the file to the flink output.
+ *
+ * <p>For example, original data with fields: {@code [id:Int,st:Struct[a:String,b:String] ...]}, Output
+ * projection Field: {@code [id,st.a]} with path: [[0],[1,0]], For the {@code st}(nested type) field, {@code a} is
+ * projected by taking it out of the nested body as an independent field.
+ */
+public class RowDataNestProjection implements RowData {
+  private final FieldGetter[] getters;
+  private final boolean nestFlat;
+  private final int[][] projectedFields;
+  private RowData rowData;
+
+  private RowDataNestProjection(RowType rowType,
+                                Types.StructType schema,
+                                Types.StructType rowStruct,
+                                Types.StructType projectType,
+                                int[][] projectedFields,
+                                boolean nestFlat) {
+    this.projectedFields = projectedFields;
+    this.nestFlat = nestFlat;
+
+    this.getters = new FieldGetter[projectedFields.length];
+    for (int i = 0; i < getters.length; i++) {
+      int[] projectedFieldOne = projectedFields[i];
+      int fieldIndex = projectedFieldOne[0];
+
+      Types.NestedField projectField = rowStruct.fields().get(fieldIndex);
+      Types.NestedField rowField = schema.field(projectField.fieldId());
+
+      getters[i] = createFieldGetter(rowType, rowField, fieldIndex, projectType, projectField, i, projectedFields,
+          projectedFieldOne);
+    }
+  }
+
+  private RowDataNestProjection(RowType rowType,
+                                Types.StructType schema,
+                                Types.StructType projectType,
+                                int[][] projectedFields,
+                                boolean nestFlat) {
+    this.projectedFields = projectedFields;
+    this.nestFlat = nestFlat;
+
+    this.getters = new FieldGetter[projectType.fields().size()];
+    for (int i = 0; i < getters.length; i++) {
+      int[] projectedFieldOne = projectedFields[0];
+      int fieldIndex = projectedFieldOne[0];
+      if (i == fieldIndex) {
+        Types.NestedField projectField = projectType.fields().get(i);
+        Types.NestedField rowField = schema.field(projectField.fieldId());
+
+        getters[i] = createFieldGetter(rowType, rowField, i, projectType, projectField, i, projectedFields,
+            projectedFieldOne);
+      }
+    }
+  }
+
+  private static FieldGetter createFieldGetter(RowType rowType,
+                                               Types.NestedField rowField,
+                                               int rowTypePosition,
+                                               Types.StructType projectFieldsType,
+                                               Types.NestedField projectField,
+                                               int projectFieldPosition,
+                                               int[][] projectedFields,
+                                               int[] projectedFieldOne) {
+    switch (projectField.type().typeId()) {
+      case STRUCT:
+        if (projectedFields == null || projectedFields[projectFieldPosition].length <= 1) {
+          return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition),
+              projectFieldsType.fields().indexOf(projectField));
+        }
+        RowType nestedRowType = (RowType) rowType.getTypeAt(rowTypePosition);
+        return row -> {
+          int[] target = new int[projectedFieldOne.length - 1];
+          System.arraycopy(projectedFieldOne, 1, target, 0, target.length);
+          int[][] temp = {target};
+          int rowIndex = projectFieldsType.fields().indexOf(projectField);
+          RowData nestedRow = rowIndex < 0 ? null : row.getRow(rowIndex,  nestedRowType.getFieldCount());
+
+          return RowDataNestProjection
+              .create(nestedRowType, rowField.type().asStructType(), projectField.type().asStructType(), temp, true)
+              .wrap(nestedRow);
+        };
+      case MAP:
+        Types.MapType projectedMap = projectField.type().asMapType();
+        Types.MapType originalMap = rowField.type().asMapType();
+
+        boolean keyProjectable = !projectedMap.keyType().isNestedType() ||
+            projectedMap.keyType().equals(originalMap.keyType());
+        boolean valueProjectable = !projectedMap.valueType().isNestedType() ||
+            projectedMap.valueType().equals(originalMap.valueType());
+        Preconditions.checkArgument(keyProjectable && valueProjectable,
+            "Cannot project a partial map key or value with non-primitive type. Trying to project <%s> out of <%s>",
+            projectField, rowField);
+
+        return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition),
+            projectFieldsType.fields().indexOf(projectField));
+
+      case LIST:
+        Types.ListType projectedList = projectField.type().asListType();
+        Types.ListType originalList = rowField.type().asListType();
+
+        boolean elementProjectable = !projectedList.elementType().isNestedType() ||
+            projectedList.elementType().equals(originalList.elementType());
+        Preconditions.checkArgument(elementProjectable,
+            "Cannot project a partial list element with non-primitive type. Trying to project <%s> out of <%s>",
+            projectField, rowField);
+
+        return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition),
+            projectFieldsType.fields().indexOf(projectField));
+      default:
+        return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition),
+            projectFieldsType.fields().indexOf(projectField));
+    }
+  }
+
+  public static RowDataNestProjection create(Schema schema, Schema projectedSchema, int[][] projectedFields) {
+    return RowDataNestProjection.create(FlinkSchemaUtil.convert(schema), schema.asStruct(), schema.asStruct(),
+        projectedSchema.asStruct(),
+        projectedFields, true);
+  }
+
+  public static RowDataNestProjection create(RowType rowType,
+                                             Types.StructType schema,
+                                             Types.StructType rowStructType,
+                                             Types.StructType projectedSchema,
+                                             int[][] projectedFields,
+                                             boolean nestFlat) {
+    return new RowDataNestProjection(rowType, schema, rowStructType, projectedSchema, projectedFields, nestFlat);
+  }
+
+  public static RowDataNestProjection create(RowType rowType, Types.StructType schema, Types.StructType projectedSchema,
+                                             int[][] projectedFields, boolean nestFlat) {
+    return new RowDataNestProjection(rowType, schema, projectedSchema, projectedFields, nestFlat);
+  }
+
+  public RowData wrap(RowData row) {
+    this.rowData = row;
+    return this;
+  }
+
+  private Object getValue(int pos) {
+    Object fieldValue = getters[pos].getFieldOrNull(rowData);
+    while (nestFlat && fieldValue != null && fieldValue.getClass().equals(RowDataNestProjection.class)) {
+      RowDataNestProjection rowDataNest = (RowDataNestProjection) fieldValue;
+      fieldValue = rowDataNest.getters[rowDataNest.projectedFields[0][0]].getFieldOrNull(rowDataNest);
+    }
+    return fieldValue;

Review comment:
       Here, we drill down through the nested fields to extract the values that actually need to be returned.
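A simplified sketch of the drill-down pattern in `getValue`, assuming `Object[]` rows: each step returns either a leaf value or a view over the next nested level with the remaining path, and the caller keeps unwrapping while it gets a view back. `NestedViewSketch` and its members are hypothetical; unlike the PR, it tests with `instanceof` rather than `getClass().equals(...)`, and it handles only a single path.

```java
import java.util.Arrays;

class NestedViewSketch {
  // Hypothetical model: rows are Object[]; nested structs are nested Object[].
  private final Object[] row;  // struct at this level, may be null
  private final int[] path;    // remaining index path; path[0] is the column at this level

  NestedViewSketch(Object[] row, int[] path) {
    this.row = row;
    this.path = path;
  }

  /** One level of drill-down: a leaf value, or a view over the next nested struct. */
  Object step() {
    if (row == null) {
      return null; // a null struct makes everything below it null
    }
    Object child = row[path[0]];
    if (path.length == 1) {
      return child;
    }
    return new NestedViewSketch((Object[]) child, Arrays.copyOfRange(path, 1, path.length));
  }

  /** Keeps unwrapping views until a plain value comes back, like the while loop in getValue. */
  static Object getValue(Object[] row, int[] path) {
    Object value = new NestedViewSketch(row, path).step();
    while (value instanceof NestedViewSketch) {
      value = ((NestedViewSketch) value).step();
    }
    return value;
  }

  public static void main(String[] args) {
    // Schema: [id, st<a, inner<x, y>>]
    Object[] row = {1, new Object[] {"a", new Object[] {"deep", 7}}};
    System.out.println(getValue(row, new int[] {1, 1, 0})); // deep
    System.out.println(getValue(new Object[] {2, null}, new int[] {1, 1, 0})); // null
  }
}
```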




[GitHub] [iceberg] hililiwei commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817645081



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -49,7 +51,8 @@
 public class IcebergTableSource
     implements ScanTableSource, SupportsProjectionPushDown, SupportsFilterPushDown, SupportsLimitPushDown {
 
-  private int[] projectedFields;
+  private int[][] projectedFields;
+  private int[] topProjectedFields;

Review comment:
       `topProjectedFields` holds the indexes of the root-level projected fields. For example, if the projected fields are `[[5, 1],[5, 2],[3, 1]]`, the top-level projected fields are `[5, 3]`.
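A small sketch of how `topProjectedFields` could be derived from `projectedFields`, assuming duplicates are dropped and first-seen order is kept (the example suggests this, since `[5, 3]` is not sorted). The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

class TopProjectedFieldsSketch {
  /** Returns the distinct root-level indexes of the projected paths, in first-seen order. */
  static int[] topProjectedFields(int[][] projectedFields) {
    Set<Integer> tops = new LinkedHashSet<>(); // preserves insertion order, drops duplicates
    for (int[] path : projectedFields) {
      tops.add(path[0]);
    }
    return tops.stream().mapToInt(Integer::intValue).toArray();
  }

  public static void main(String[] args) {
    int[][] projectedFields = {{5, 1}, {5, 2}, {3, 1}};
    System.out.println(Arrays.toString(topProjectedFields(projectedFields))); // [5, 3]
  }
}
```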

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -179,4 +171,20 @@ public DynamicTableSource copy() {
   public String asSummaryString() {
     return "Iceberg table source";
   }
+
+  private static TableSchema projectSchema(TableSchema tableSchema, int[] projectedFields) {
+    Preconditions.checkArgument(
+        org.apache.flink.table.utils.TableSchemaUtils.containsPhysicalColumnsOnly(tableSchema),

Review comment:
       Nice. I moved that into `FlinkCompatibilityUtil`.




[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817299454



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Projects a (possibly nested) row data type, converts the original field obtained from the file to the flink output.
+ *
+ * <p>For example, original data with fields: {@code [id:Int,st:Struct[a:String,b:String] ...]}, Output
+ * projection Field: {@code [id,st.a]} with path: [[0],[1,0]], For the {@code st}(nested type) field, {@code a} is
+ * projected by taking it out of the nested body as an independent field.
+ */
+public class RowDataNestProjection implements RowData {
+  private final FieldGetter[] getters;
+  private final boolean nestFlat;
+  private final int[][] projectedFields;
+  private RowData rowData;
+
+  private RowDataNestProjection(RowType rowType,
+                                Types.StructType schema,
+                                Types.StructType rowStruct,
+                                Types.StructType projectType,
+                                int[][] projectedFields,
+                                boolean nestFlat) {

Review comment:
       Why do we need this flag? Shouldn't it always be `true`?




[GitHub] [iceberg] hililiwei commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r825230845



##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSqlNestProjection.java
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Test;
+
+/**
+ * Test Flink SQL Nest Projection.
+ */
+public class TestFlinkSqlNestProjection extends TestFlinkSource {

Review comment:
       I refined the unit test cases and moved them into a separate class. It covers SQL nested projection for each data type, though the focus is still on the Struct type.
   @openinx 




[GitHub] [iceberg] hililiwei commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817647092



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Projects a (possibly nested) row data type, converts the original field obtained from the file to the flink output.
+ *
+ * <p>For example, original data with fields: {@code [id:Int,st:Struct[a:String,b:String] ...]}, Output
+ * projection Field: {@code [id,st.a]} with path: [[0],[1,0]], For the {@code st}(nested type) field, {@code a} is
+ * projected by taking it out of the nested body as an independent field.
+ */
+public class RowDataNestProjection implements RowData {
+  private final FieldGetter[] getters;
+  private final boolean nestFlat;
+  private final int[][] projectedFields;
+  private RowData rowData;
+
+  private RowDataNestProjection(RowType rowType,
+                                Types.StructType schema,
+                                Types.StructType rowStruct,
+                                Types.StructType projectType,
+                                int[][] projectedFields,
+                                boolean nestFlat) {
+    this.projectedFields = projectedFields;
+    this.nestFlat = nestFlat;
+
+    this.getters = new FieldGetter[projectedFields.length];
+    for (int i = 0; i < getters.length; i++) {
+      int[] projectedFieldOne = projectedFields[i];
+      int fieldIndex = projectedFieldOne[0];
+
+      Types.NestedField projectField = rowStruct.fields().get(fieldIndex);
+      Types.NestedField rowField = schema.field(projectField.fieldId());
+
+      getters[i] = createFieldGetter(rowType, rowField, fieldIndex, projectType, projectField, i, projectedFields,
+          projectedFieldOne);
+    }
+  }
+
+  private RowDataNestProjection(RowType rowType,
+                                Types.StructType schema,
+                                Types.StructType projectType,
+                                int[][] projectedFields,
+                                boolean nestFlat) {
+    this.projectedFields = projectedFields;
+    this.nestFlat = nestFlat;
+
+    this.getters = new FieldGetter[projectType.fields().size()];
+    for (int i = 0; i < getters.length; i++) {
+      int[] projectedFieldOne = projectedFields[0];
+      int fieldIndex = projectedFieldOne[0];
+      if (i == fieldIndex) {
+        Types.NestedField projectField = projectType.fields().get(i);
+        Types.NestedField rowField = schema.field(projectField.fieldId());
+
+        getters[i] = createFieldGetter(rowType, rowField, i, projectType, projectField, i, projectedFields,
+            projectedFieldOne);
+      }
+    }
+  }
+
+  private static FieldGetter createFieldGetter(RowType rowType,
+                                               Types.NestedField rowField,
+                                               int rowTypePosition,
+                                               Types.StructType projectFieldsType,
+                                               Types.NestedField projectField,
+                                               int projectFieldPosition,
+                                               int[][] projectedFields,
+                                               int[] projectedFieldOne) {
+    switch (projectField.type().typeId()) {
+      case STRUCT:
+        if (projectedFields == null || projectedFields[projectFieldPosition].length <= 1) {
+          return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition),
+              projectFieldsType.fields().indexOf(projectField));
+        }
+        RowType nestedRowType = (RowType) rowType.getTypeAt(rowTypePosition);
+        return row -> {
+          int[] target = new int[projectedFieldOne.length - 1];
+          System.arraycopy(projectedFieldOne, 1, target, 0, target.length);
+          int[][] temp = {target};
+          int rowIndex = projectFieldsType.fields().indexOf(projectField);
+          RowData nestedRow = rowIndex < 0 ? null : row.getRow(rowIndex,  nestedRowType.getFieldCount());
+
+          return RowDataNestProjection
+              .create(nestedRowType, rowField.type().asStructType(), projectField.type().asStructType(), temp, true)

Review comment:
       `projectField` is not static; it is pruned in:
   https://github.com/apache/iceberg/blob/cf6545e0d997775e3d9b7701eafa683b1cd5876b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java#L71-L91
   At each level, `projectField` drills down according to the field index.
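The per-level pruning can be pictured with a tiny, hypothetical schema model: at each level only the child at the next index is kept, and the rest of the path is applied to that child. `PruneAlongPathSketch` and its `Field` class are illustrative stand-ins, not Iceberg's `Types.StructType` or `Types.NestedField` API; in the PR, the equivalent step copies the path suffix with `System.arraycopy`.

```java
import java.util.List;

class PruneAlongPathSketch {

  /** Hypothetical minimal schema node: a leaf when children is empty, a struct otherwise. */
  static final class Field {
    final String name;
    final List<Field> children;

    Field(String name, List<Field> children) {
      this.name = name;
      this.children = children;
    }

    @Override
    public String toString() {
      return children.isEmpty() ? name : name + children;
    }
  }

  /**
   * Keeps only the branch selected by the path: at each level the struct retains the single
   * child at path[level], and the remaining suffix of the path is applied to that child.
   */
  static Field prune(Field field, int[] path, int level) {
    if (level == path.length) {
      return field; // end of path: keep this field (and anything below it) whole
    }
    Field child = field.children.get(path[level]);
    return new Field(field.name, List.of(prune(child, path, level + 1)));
  }

  public static void main(String[] args) {
    Field root = new Field("root", List.of(
        new Field("id", List.of()),
        new Field("st", List.of(new Field("a", List.of()), new Field("b", List.of())))));
    System.out.println(prune(root, new int[] {1, 0}, 0)); // root[st[a]]
  }
}
```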
   




[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r821460096



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java
##########
@@ -39,7 +41,9 @@ private FlinkCompatibilityUtil() {
     return InternalTypeInfo.of(rowType);
   }
 
-  public static boolean isPhysicalColumn(TableColumn column) {
-    return column.isPhysical();
+  /** Returns true if there are only physical columns in the given {@link TableSchema}. */

Review comment:
       Nit: Please format the Javadoc like this:
   
   ```
     /**
      * Returns true if there are only physical columns in the given {@link TableSchema}.
      */
   ```




[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r839156022



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
##########
@@ -55,11 +56,18 @@
   private final Schema projectedSchema;
   private final String nameMapping;
   private final boolean caseSensitive;
+  private int[][] projectedFields;

Review comment:
       We already have a `projectedSchema`, so why do we still need `projectedFields` here? Shouldn't we just convert `projectedFields` into a `projectedSchema`, and then use the projected schema throughout the whole read path?
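One way to picture this suggestion, as a hedged sketch: merge all index paths into a single tree, which has the same shape as a pruned nested schema. `MergePathsSketch` and its trie are hypothetical stand-ins for a real schema type, and the sketch assumes no path is a prefix of another. The tree lists children in ascending index (table) order, so if the Flink projection orders fields differently, a top-level reordering step would presumably still be needed after reading with the projected schema.

```java
import java.util.Map;
import java.util.TreeMap;

class MergePathsSketch {

  /** Hypothetical trie node: child index -> node; an empty map means "keep this field whole". */
  static final class Node {
    final TreeMap<Integer, Node> children = new TreeMap<>();
  }

  /** Merges index paths into one tree, i.e. a single nested projection (no path may prefix another). */
  static Node merge(int[][] paths) {
    Node root = new Node();
    for (int[] path : paths) {
      Node current = root;
      for (int index : path) {
        current = current.children.computeIfAbsent(index, k -> new Node());
      }
    }
    return root;
  }

  /** Renders the tree as e.g. "{0, 1{0, 2}}", with children in ascending (table) order. */
  static String render(Node node) {
    StringBuilder sb = new StringBuilder("{");
    boolean first = true;
    for (Map.Entry<Integer, Node> e : node.children.entrySet()) {
      if (!first) {
        sb.append(", ");
      }
      first = false;
      sb.append(e.getKey());
      if (!e.getValue().children.isEmpty()) {
        sb.append(render(e.getValue()));
      }
    }
    return sb.append('}').toString();
  }

  public static void main(String[] args) {
    int[][] projectedFields = {{1, 2}, {0}, {1, 0}};
    System.out.println(render(merge(projectedFields))); // {0, 1{0, 2}}
  }
}
```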




[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r821497630



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Projects a (possibly nested) row data type, converts the original field obtained from the file to the flink output.
+ *
+ * <p>For example, original data with fields: {@code [id:Int,st:Struct[a:String,b:String] ...]}, Output
+ * projection Field: {@code [id,st.a]} with path: [[0],[1,0]], For the {@code st}(nested type) field, {@code a} is
+ * projected by taking it out of the nested body as an independent field.
+ */
+public class RowDataNestProjection implements RowData {

Review comment:
       Could we unify this class with `RowDataProjection` into a single class? Using two different projection classes to handle the nested and non-nested cases does not make sense to me.






[GitHub] [iceberg] hililiwei commented on pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei commented on pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#issuecomment-1081545116


   Hi @openinx, based on feedback from our team, I have pushed a new version. When you are free, would you please take a look? Thank you. 😃 




[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r821459191



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -469,11 +469,9 @@ private static void validateFlinkTable(CatalogBaseTable table) {
     Preconditions.checkArgument(table instanceof CatalogTable, "The Table should be a CatalogTable.");
 
     TableSchema schema = table.getSchema();
-    schema.getTableColumns().forEach(column -> {
-      if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
-        throw new UnsupportedOperationException("Creating table with computed columns is not supported yet.");
-      }
-    });
+    if (!FlinkCompatibilityUtil.containsPhysicalColumnsOnly(schema)) {

Review comment:
       How about naming it `allPhysicalColumns`?






[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817302457



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Projects a (possibly nested) row data type, converts the original field obtained from the file to the flink output.
+ *
+ * <p>For example, original data with fields: {@code [id:Int,st:Struct[a:String,b:String] ...]}, Output
+ * projection Field: {@code [id,st.a]} with path: [[0],[1,0]], For the {@code st}(nested type) field, {@code a} is
+ * projected by taking it out of the nested body as an independent field.
+ */
+public class RowDataNestProjection implements RowData {
+  private final FieldGetter[] getters;
+  private final boolean nestFlat;
+  private final int[][] projectedFields;
+  private RowData rowData;
+
+  private RowDataNestProjection(RowType rowType,
+                                Types.StructType schema,
+                                Types.StructType rowStruct,
+                                Types.StructType projectType,
+                                int[][] projectedFields,
+                                boolean nestFlat) {
+    this.projectedFields = projectedFields;
+    this.nestFlat = nestFlat;
+
+    this.getters = new FieldGetter[projectedFields.length];
+    for (int i = 0; i < getters.length; i++) {
+      int[] projectedFieldOne = projectedFields[i];
+      int fieldIndex = projectedFieldOne[0];
+
+      Types.NestedField projectField = rowStruct.fields().get(fieldIndex);
+      Types.NestedField rowField = schema.field(projectField.fieldId());
+
+      getters[i] = createFieldGetter(rowType, rowField, fieldIndex, projectType, projectField, i, projectedFields,
+          projectedFieldOne);
+    }
+  }
+
+  private RowDataNestProjection(RowType rowType,
+                                Types.StructType schema,
+                                Types.StructType projectType,
+                                int[][] projectedFields,
+                                boolean nestFlat) {
+    this.projectedFields = projectedFields;
+    this.nestFlat = nestFlat;
+
+    this.getters = new FieldGetter[projectType.fields().size()];
+    for (int i = 0; i < getters.length; i++) {
+      int[] projectedFieldOne = projectedFields[0];
+      int fieldIndex = projectedFieldOne[0];
+      if (i == fieldIndex) {
+        Types.NestedField projectField = projectType.fields().get(i);
+        Types.NestedField rowField = schema.field(projectField.fieldId());
+
+        getters[i] = createFieldGetter(rowType, rowField, i, projectType, projectField, i, projectedFields,
+            projectedFieldOne);
+      }
+    }
+  }
+
+  private static FieldGetter createFieldGetter(RowType rowType,
+                                               Types.NestedField rowField,
+                                               int rowTypePosition,
+                                               Types.StructType projectFieldsType,
+                                               Types.NestedField projectField,
+                                               int projectFieldPosition,
+                                               int[][] projectedFields,
+                                               int[] projectedFieldOne) {
+    switch (projectField.type().typeId()) {
+      case STRUCT:
+        if (projectedFields == null || projectedFields[projectFieldPosition].length <= 1) {
+          return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition),
+              projectFieldsType.fields().indexOf(projectField));
+        }
+        RowType nestedRowType = (RowType) rowType.getTypeAt(rowTypePosition);
+        return row -> {
+          int[] target = new int[projectedFieldOne.length - 1];
+          System.arraycopy(projectedFieldOne, 1, target, 0, target.length);
+          int[][] temp = {target};
+          int rowIndex = projectFieldsType.fields().indexOf(projectField);
+          RowData nestedRow = rowIndex < 0 ? null : row.getRow(rowIndex,  nestedRowType.getFieldCount());
+
+          return RowDataNestProjection
+              .create(nestedRowType, rowField.type().asStructType(), projectField.type().asStructType(), temp, true)

Review comment:
       Should we also prune `projectField` to match `nestedRowType`? I see that `projectField` is always the original one; is anything wrong here?






[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817301086



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Projects a (possibly nested) row data type, converts the original field obtained from the file to the flink output.
+ *
+ * <p>For example, original data with fields: {@code [id:Int,st:Struct[a:String,b:String] ...]}, Output
+ * projection Field: {@code [id,st.a]} with path: [[0],[1,0]], For the {@code st}(nested type) field, {@code a} is
+ * projected by taking it out of the nested body as an independent field.
+ */
+public class RowDataNestProjection implements RowData {
+  private final FieldGetter[] getters;
+  private final boolean nestFlat;
+  private final int[][] projectedFields;
+  private RowData rowData;
+
+  private RowDataNestProjection(RowType rowType,
+                                Types.StructType schema,
+                                Types.StructType rowStruct,
+                                Types.StructType projectType,
+                                int[][] projectedFields,
+                                boolean nestFlat) {
+    this.projectedFields = projectedFields;
+    this.nestFlat = nestFlat;
+
+    this.getters = new FieldGetter[projectedFields.length];
+    for (int i = 0; i < getters.length; i++) {
+      int[] projectedFieldOne = projectedFields[i];
+      int fieldIndex = projectedFieldOne[0];
+
+      Types.NestedField projectField = rowStruct.fields().get(fieldIndex);
+      Types.NestedField rowField = schema.field(projectField.fieldId());
+
+      getters[i] = createFieldGetter(rowType, rowField, fieldIndex, projectType, projectField, i, projectedFields,
+          projectedFieldOne);
+    }
+  }
+
+  private RowDataNestProjection(RowType rowType,
+                                Types.StructType schema,
+                                Types.StructType projectType,
+                                int[][] projectedFields,
+                                boolean nestFlat) {
+    this.projectedFields = projectedFields;
+    this.nestFlat = nestFlat;
+
+    this.getters = new FieldGetter[projectType.fields().size()];
+    for (int i = 0; i < getters.length; i++) {
+      int[] projectedFieldOne = projectedFields[0];
+      int fieldIndex = projectedFieldOne[0];
+      if (i == fieldIndex) {
+        Types.NestedField projectField = projectType.fields().get(i);
+        Types.NestedField rowField = schema.field(projectField.fieldId());
+
+        getters[i] = createFieldGetter(rowType, rowField, i, projectType, projectField, i, projectedFields,
+            projectedFieldOne);
+      }
+    }
+  }
+
+  private static FieldGetter createFieldGetter(RowType rowType,
+                                               Types.NestedField rowField,
+                                               int rowTypePosition,
+                                               Types.StructType projectFieldsType,
+                                               Types.NestedField projectField,
+                                               int projectFieldPosition,
+                                               int[][] projectedFields,
+                                               int[] projectedFieldOne) {
+    switch (projectField.type().typeId()) {
+      case STRUCT:
+        if (projectedFields == null || projectedFields[projectFieldPosition].length <= 1) {
+          return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition),
+              projectFieldsType.fields().indexOf(projectField));
+        }
+        RowType nestedRowType = (RowType) rowType.getTypeAt(rowTypePosition);
+        return row -> {
+          int[] target = new int[projectedFieldOne.length - 1];
+          System.arraycopy(projectedFieldOne, 1, target, 0, target.length);

Review comment:
       If we want to access a nested field from the original row, do we need to copy the projected field index array each time? That does not sound acceptable to me. Could we choose another approach to track the nested projected field index without cloning the index array on every row?
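One way to avoid the per-row `System.arraycopy`, sketched with hypothetical types (`PathGetter` and `PathGetters` are not Flink or Iceberg APIs, and rows are modeled as `Object[]` rather than `RowData`): resolve the path recursively with a depth offset when the getter is built, so each level captures its own index and child getter once, and the per-row code only walks the prebuilt chain.

```java
// Hypothetical stand-in for RowData.FieldGetter; rows and nested structs are both modeled as Object[].
interface PathGetter {
  Object get(Object[] row);
}

final class PathGetters {
  private PathGetters() {}

  // Builds the getter chain once; `depth` is an offset into `path`, so no sub-array is ever copied.
  static PathGetter forPath(int[] path, int depth) {
    int index = path[depth];
    if (depth == path.length - 1) {
      return row -> row[index];
    }
    PathGetter child = forPath(path, depth + 1); // resolved at construction time, not per row
    return row -> {
      Object nested = row[index];
      // A null struct yields a null value for the nested field.
      return nested == null ? null : child.get((Object[]) nested);
    };
  }
}
```

With the path `[1, 0]`, `forPath(path, 0)` reads `row[1][0]` without allocating anything per row; in the real reader the same idea would apply to `RowData.getRow(pos, numFields)` instead of array indexing.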






[GitHub] [iceberg] hililiwei commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r822522657



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Projects a (possibly nested) row data type, converts the original field obtained from the file to the flink output.
+ *
+ * <p>For example, original data with fields: {@code [id:Int,st:Struct[a:String,b:String] ...]}, Output
+ * projection Field: {@code [id,st.a]} with path: [[0],[1,0]], For the {@code st}(nested type) field, {@code a} is
+ * projected by taking it out of the nested body as an independent field.
+ */
+public class RowDataNestProjection implements RowData {

Review comment:
       The initial unification has been done, and I'm going to keep working toward a deeper integration.






[GitHub] [iceberg] hililiwei edited a comment on pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei edited a comment on pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#issuecomment-1081545116


   Hi @openinx, based on feedback from our team's internal trial, I have pushed a new version. When you are free, would you please take a look? Thank you. 😃 




[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817295029



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -179,4 +171,20 @@ public DynamicTableSource copy() {
   public String asSummaryString() {
     return "Iceberg table source";
   }
+
+  private static TableSchema projectSchema(TableSchema tableSchema, int[] projectedFields) {

Review comment:
       In this method, we only project the first-level fields of the nested schema. In theory, shouldn't we also project the deeper-level fields to implement nested schema projection?
   
   In my view, if we treat `tableSchema` as a tree, then the nested projected schema should be a subtree pruned from the whole `tableSchema` tree, and that subtree should have the same root as the original tree. 
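A minimal sketch of that tree view, assuming a hypothetical `Field` tree in place of Flink's `TableSchema` or Iceberg's `Types.StructType` (`Field` and `SubtreePruner` are illustrative names only): each `int[]` path selects one node, and the result keeps the original root plus every node on a selected path, so `st.a` and `st.b` share one pruned `st` rather than producing two separate structs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical schema tree node; not an Iceberg or Flink type.
final class Field {
  final String name;
  final List<Field> children;

  Field(String name, List<Field> children) {
    this.name = name;
    this.children = children;
  }

  static Field of(String name, Field... children) {
    return new Field(name, Arrays.asList(children));
  }

  @Override
  public String toString() {
    return children.isEmpty() ? name : name + children;
  }
}

final class SubtreePruner {
  private SubtreePruner() {}

  // Keeps the root and every node on a selected path; selecting a struct itself keeps its whole subtree.
  static Field prune(Field root, int[][] paths) {
    return prune(root, Arrays.asList(paths), 0);
  }

  private static Field prune(Field node, List<int[]> paths, int depth) {
    List<Field> kept = new ArrayList<>();
    for (int i = 0; i < node.children.size(); i++) {
      List<int[]> deeper = new ArrayList<>(); // paths that continue below child i
      boolean wholeChild = false;             // true if some path ends exactly at child i
      for (int[] path : paths) {
        if (path[depth] == i) {
          if (path.length == depth + 1) {
            wholeChild = true;
          } else {
            deeper.add(path);
          }
        }
      }
      Field child = node.children.get(i);
      if (wholeChild) {
        kept.add(child);
      } else if (!deeper.isEmpty()) {
        kept.add(prune(child, deeper, depth + 1));
      }
    }
    return new Field(node.name, kept);
  }
}
```

Pruning `root[id, st[a, b]]` with `[[0], [1, 0]]` gives `root[id, st[a]]`. Note that the pruned tree keeps schema order, so a mapping back to the column order Flink requested would still be needed; that step is not shown here.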






[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817289152



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -49,7 +51,8 @@
 public class IcebergTableSource
     implements ScanTableSource, SupportsProjectionPushDown, SupportsFilterPushDown, SupportsLimitPushDown {
 
-  private int[] projectedFields;
+  private int[][] projectedFields;
+  private int[] topProjectedFields;

Review comment:
       What does this `topProjectedFields` variable mean?






[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817292254



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -179,4 +171,20 @@ public DynamicTableSource copy() {
   public String asSummaryString() {
     return "Iceberg table source";
   }
+
+  private static TableSchema projectSchema(TableSchema tableSchema, int[] projectedFields) {
+    Preconditions.checkArgument(
+        org.apache.flink.table.utils.TableSchemaUtils.containsPhysicalColumnsOnly(tableSchema),

Review comment:
       I see that `org.apache.flink.table.utils.TableSchemaUtils` is annotated as `@Internal`. I would suggest adding a utility method to our own Iceberg utility class to avoid depending on any Flink private API.






[GitHub] [iceberg] hililiwei commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r793274140



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -90,12 +94,8 @@ private IcebergTableSource(TableLoader loader, TableSchema schema, Map<String, S
 
   @Override
   public void applyProjection(int[][] projectFields) {
-    this.projectedFields = new int[projectFields.length];
-    for (int i = 0; i < projectFields.length; i++) {
-      Preconditions.checkArgument(projectFields[i].length == 1,
-          "Don't support nested projection in iceberg source now.");
-      this.projectedFields[i] = projectFields[i][0];
-    }
+    this.projectedFields = projectFields;
+    this.topProjectedFields = Arrays.stream(projectFields).mapToInt(array -> array[0]).distinct().toArray();

Review comment:
       `topProjectedFields` is used to read data from the original file; at this stage, the nested fields in the data are kept as they are.
   `projectedFields` is used to flatten that data; at this stage, the nested fields are extracted into top-level columns.
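The two roles can be exercised in isolation. A small sketch (`TopLevelFields` and `remapToTopLevel` are hypothetical, not part of this PR): `topLevel` repeats the `distinct()` expression from `applyProjection`, and `remapToTopLevel` shows the extra step the flattening side would need if, as assumed here, the reader emits only the top-level columns in `topProjectedFields` order, so the first index of each path must be rewritten to its position in that narrower row.

```java
import java.util.Arrays;

final class TopLevelFields {
  private TopLevelFields() {}

  // Same expression as applyProjection in this PR: the distinct first index of every path, in encounter order.
  static int[] topLevel(int[][] projectFields) {
    return Arrays.stream(projectFields).mapToInt(path -> path[0]).distinct().toArray();
  }

  // Hypothetical helper: rewrites each path's first index to its position within the top-level-only row,
  // assuming the reader emits columns in the order given by topLevel(...).
  static int[][] remapToTopLevel(int[][] projectFields, int[] topLevel) {
    int[][] remapped = new int[projectFields.length][];
    for (int i = 0; i < projectFields.length; i++) {
      int[] path = projectFields[i].clone(); // copied once, up front, not per row
      path[0] = indexOf(topLevel, path[0]);
      remapped[i] = path;
    }
    return remapped;
  }

  private static int indexOf(int[] values, int target) {
    for (int i = 0; i < values.length; i++) {
      if (values[i] == target) {
        return i;
      }
    }
    throw new IllegalArgumentException("Not a top-level projected field: " + target);
  }
}
```

For the javadoc example `[[0],[1,0]]`, `topLevel` returns `[0, 1]` and the remapped paths are unchanged. The remapping only matters when the requested order differs from schema order: `[[2],[0,1]]` gives `[2, 0]` and remapped paths `[[0],[1,1]]`.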






[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r839152573



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
##########
@@ -63,16 +109,48 @@ public static RowDataProjection create(RowType rowType, Types.StructType schema,
     return new RowDataProjection(rowType, schema, projectedSchema);
   }
 
-  private final RowData.FieldGetter[] getters;
-  private RowData rowData;
+  private RowDataProjection(RowType rowType, Types.StructType rowStruct, Types.StructType projectType,

Review comment:
       I see that the previous `RowDataProjection` already supports accessing the values of a nested projected schema. I think the only thing we need to do is convert the `int[][] projectedFieldIndexes` into a projected Iceberg `Schema`; then we can just call `RowDataProjection#create(RowType rowType, Types.StructType schema, Types.StructType projectedSchema)` to access those projected values.

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -104,22 +105,15 @@ public void applyProjection(int[][] projectFields) {
         .tableLoader(loader)
         .properties(properties)
         .project(getProjectedSchema())
+        .projectedFields(projectedFields)
         .limit(limit)
         .filters(filters)
         .flinkConf(readableConfig)
         .build();
   }
 
   private TableSchema getProjectedSchema() {
-    if (projectedFields == null) {
-      return schema;
-    } else {
-      String[] fullNames = schema.getFieldNames();
-      DataType[] fullTypes = schema.getFieldDataTypes();
-      return TableSchema.builder().fields(
-          Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new),
-          Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
-    }
+    return projectedFields == null ? schema : projectSchema(schema, topProjectedFields);

Review comment:
       I still don't understand why the projected schema is the projected top-level schema rather than the projected nested schema. If the original schema is a big tree and every field is a node in it, then the projected schema should be a subtree of that tree.

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
##########
@@ -84,9 +162,51 @@ private RowDataProjection(RowType rowType, Types.StructType rowStruct, Types.Str
     }
   }
 
-  private static RowData.FieldGetter createFieldGetter(RowType rowType,
-                                                       int position,
-                                                       Types.NestedField rowField,
+  private static FieldGetter createFieldGetter(RowType rowType,

Review comment:
       Shall we just revert those changes, per the comments above?

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -104,22 +105,15 @@ public void applyProjection(int[][] projectFields) {
         .tableLoader(loader)
         .properties(properties)
         .project(getProjectedSchema())
+        .projectedFields(projectedFields)
         .limit(limit)
         .filters(filters)
         .flinkConf(readableConfig)
         .build();
   }
 
   private TableSchema getProjectedSchema() {
-    if (projectedFields == null) {
-      return schema;
-    } else {
-      String[] fullNames = schema.getFieldNames();
-      DataType[] fullTypes = schema.getFieldDataTypes();
-      return TableSchema.builder().fields(
-          Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new),
-          Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
-    }
+    return projectedFields == null ? schema : projectSchema(schema, topProjectedFields);

Review comment:
      So why must we use the top-level projected schema? Won't it introduce extra, unnecessary fields when we do the actual row data projection?
   
   For example, suppose the original schema is:
   
   ```
   id: bigint
   data: struct {
       f0: bigint,
       f1: bigint
   }
   ```
   
   The required projected fields are `<id, data.f0>`. Here we get `<id, data>` as the projected schema, but we should actually get `<id, data.f0>`.
   
   When we encounter a `RowData`, will we also emit the value of `data.f1` to end users?
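To make the `data.f1` question concrete, here is a plain-Java sketch that models rows as `Object[]`, with a nested `Object[]` for each struct (a hypothetical stand-in for Flink's `RowData`, not the PR's code). Projecting only the top-level indices of `[[0], [1, 0]]` carries the whole `data` struct, including `f1`. Following the full paths yields just `id` and `data.f0`, pulled out as independent columns, as the `RowDataNestProjection` Javadoc in this PR describes.

```java
import java.util.Arrays;

// Rows are modeled as Object[]; a struct value is a nested Object[].
final class RowPathProjection {
  private RowPathProjection() {
  }

  // Follows one index path, e.g. [1, 0] reads row[1][0].
  static Object getByPath(Object[] row, int[] path) {
    Object value = row;
    for (int index : path) {
      if (value == null) {
        return null; // a null struct yields a null nested field
      }
      value = ((Object[]) value)[index];
    }
    return value;
  }

  // Nested projection: one output column per path, with nested fields
  // pulled out as independent top-level columns.
  static Object[] projectNested(Object[] row, int[][] paths) {
    Object[] out = new Object[paths.length];
    for (int i = 0; i < paths.length; i++) {
      out[i] = getByPath(row, paths[i]);
    }
    return out;
  }

  // Top-level projection: only the first index of each path is honored,
  // so a projected struct is passed through whole.
  static Object[] projectTopLevel(Object[] row, int[][] paths) {
    return Arrays.stream(paths).mapToInt(p -> p[0]).distinct().mapToObj(i -> row[i]).toArray();
  }
}
```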
   
   

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -76,7 +81,7 @@ public IcebergTableSource(TableLoader loader, TableSchema schema, Map<String, St
   }
 
   private IcebergTableSource(TableLoader loader, TableSchema schema, Map<String, String> properties,
-                             int[] projectedFields, boolean isLimitPushDown,
+                             int[][] projectedFields, boolean isLimitPushDown,

Review comment:
      What is the value of `topProjectedFields` in this constructor?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817289425



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -49,7 +51,8 @@
 public class IcebergTableSource
     implements ScanTableSource, SupportsProjectionPushDown, SupportsFilterPushDown, SupportsLimitPushDown {
 
-  private int[] projectedFields;
+  private int[][] projectedFields;
+  private int[] topProjectedFields;

Review comment:
      I can't work out what this variable means from its name.






[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817296545



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Projects a (possibly nested) row data type, converts the original field obtained from the file to the flink output.
+ *
+ * <p>For example, original data with fields: {@code [id:Int,st:Struct[a:String,b:String] ...]}, Output
+ * projection Field: {@code [id,st.a]} with path: [[0],[1,0]], For the {@code st}(nested type) field, {@code a} is
+ * projected by taking it out of the nested body as an independent field.
+ */
+public class RowDataNestProjection implements RowData {
+  private final FieldGetter[] getters;
+  private final boolean nestFlat;
+  private final int[][] projectedFields;
+  private RowData rowData;
+
+  private RowDataNestProjection(RowType rowType,

Review comment:
      Nit: are you formatting the code with the latest checkstyle.xml? The formatting changes after I apply my Iceberg IDEA checkstyle settings.






[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r839156172



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
##########
@@ -77,9 +85,19 @@ public RowDataFileScanTaskReader(Schema tableSchema, Schema projectedSchema,
     );
 
     // Project the RowData to remove the extra meta columns.
-    if (!projectedSchema.sameSchema(deletes.requiredSchema())) {
-      RowDataProjection rowDataProjection = RowDataProjection.create(
-          deletes.requiredRowType(), deletes.requiredSchema().asStruct(), projectedSchema.asStruct());
+    if (!projectedSchema.sameSchema(deletes.requiredSchema()) && projectedFields == null) {
+      Map<Integer, Integer> fieldIdToPosition = Maps.newHashMapWithExpectedSize(tableSchema.asStruct().fields().size());
+      for (int i = 0; i < tableSchema.asStruct().fields().size(); i++) {
+        fieldIdToPosition.put(tableSchema.asStruct().fields().get(i).fieldId(), i);
+      }
+      projectedFields = projectedSchema.columns().stream()
+          .map(field -> new int[] {fieldIdToPosition.get(field.fieldId())})
+          .toArray(int[][]::new);
+    }
+
+    if (projectedFields != null) {
+      RowDataProjection rowDataProjection = RowDataProjection.create(tableSchema, projectedSchema,
+          projectedFields);

Review comment:
      I don't think these changes are needed, given my comment above.
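For reference, the `fieldIdToPosition` loop in this hunk turns each projected top-level column into a one-element index path. A plain-Java sketch of the same mapping, assuming field IDs are plain `int`s and using the hypothetical helper name `toPaths`. Unlike an unboxed map lookup, it rejects an unknown field ID explicitly instead of failing with a `NullPointerException`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class FieldIdPaths {
  private FieldIdPaths() {
  }

  // Maps each projected top-level field ID to a one-element index path
  // into the table schema's top-level columns.
  static int[][] toPaths(List<Integer> tableFieldIds, List<Integer> projectedFieldIds) {
    Map<Integer, Integer> fieldIdToPosition = new HashMap<>();
    for (int i = 0; i < tableFieldIds.size(); i++) {
      fieldIdToPosition.put(tableFieldIds.get(i), i);
    }
    int[][] paths = new int[projectedFieldIds.size()][];
    for (int i = 0; i < paths.length; i++) {
      Integer position = fieldIdToPosition.get(projectedFieldIds.get(i));
      if (position == null) {
        throw new IllegalArgumentException("Field ID not in table schema: " + projectedFieldIds.get(i));
      }
      paths[i] = new int[] {position};
    }
    return paths;
  }
}
```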






[GitHub] [iceberg] hililiwei commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817652571



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Projects a (possibly nested) row data type, converts the original field obtained from the file to the flink output.
+ *
+ * <p>For example, original data with fields: {@code [id:Int,st:Struct[a:String,b:String] ...]}, Output
+ * projection Field: {@code [id,st.a]} with path: [[0],[1,0]], For the {@code st}(nested type) field, {@code a} is
+ * projected by taking it out of the nested body as an independent field.
+ */
+public class RowDataNestProjection implements RowData {
+  private final FieldGetter[] getters;
+  private final boolean nestFlat;
+  private final int[][] projectedFields;
+  private RowData rowData;
+
+  private RowDataNestProjection(RowType rowType,

Review comment:
      Fixed. Please try again.






[GitHub] [iceberg] hililiwei commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817648602



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -179,4 +171,20 @@ public DynamicTableSource copy() {
   public String asSummaryString() {
     return "Iceberg table source";
   }
+
+  private static TableSchema projectSchema(TableSchema tableSchema, int[] projectedFields) {

Review comment:
      I don't use nested projection up front. Instead, I first read the data using the top-level projection (the `topProjectedFields` mentioned above), and then prune it based on the projected field index paths (`int[][]`).
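One detail of this two-step approach: once rows are read with the top-level projection, the first index of each path must refer to a position in the narrower row rather than in the table schema. A sketch of that remapping, assuming `topFields` lists the kept table positions in read order (the class and method names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

final class PathRemapper {
  private PathRemapper() {
  }

  // Rewrites paths written against the table schema so that they index into
  // a row read with the top-level projection `topFields`.
  // E.g. table [a, id, data{f0, f1}], topFields = [1, 2]: [2, 0] (data.f0) becomes [1, 0].
  static int[][] remap(int[][] paths, int[] topFields) {
    Map<Integer, Integer> tableToTop = new HashMap<>();
    for (int i = 0; i < topFields.length; i++) {
      tableToTop.put(topFields[i], i);
    }
    int[][] remapped = new int[paths.length][];
    for (int i = 0; i < paths.length; i++) {
      Integer top = tableToTop.get(paths[i][0]);
      if (top == null) {
        throw new IllegalArgumentException("Path root not in top-level projection: " + paths[i][0]);
      }
      remapped[i] = paths[i].clone(); // leave the caller's paths untouched
      remapped[i][0] = top;
    }
    return remapped;
  }
}
```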
   






[GitHub] [iceberg] hililiwei commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r822521866



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
##########
@@ -83,6 +95,12 @@ public RowDataFileScanTaskReader(Schema tableSchema, Schema projectedSchema,
       iterable = CloseableIterable.transform(iterable, rowDataProjection::wrap);
     }
 
+    if (projectedFields != null) {
+      RowDataNestProjection rowDataProjection = RowDataNestProjection.create(tableSchema, projectedSchema,
+          projectedFields);
+      iterable = CloseableIterable.transform(iterable, rowDataProjection::wrap);
+    }

Review comment:
       I optimized this part and unified it. Please take a look. Thank you.






[GitHub] [iceberg] hililiwei commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r793302719



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
##########
@@ -83,6 +95,12 @@ public RowDataFileScanTaskReader(Schema tableSchema, Schema projectedSchema,
       iterable = CloseableIterable.transform(iterable, rowDataProjection::wrap);
     }
 
+    if (projectedFields != null) {
+      RowDataNestProjection rowDataProjection = RowDataNestProjection.create(tableSchema, projectedSchema,
+          projectedFields);
+      iterable = CloseableIterable.transform(iterable, rowDataProjection::wrap);
+    }
+

Review comment:
      Added a transform step that extracts the projected fields from the original data one by one.
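A plain-Java sketch of what such a transform does, assuming, as the mutable `rowData` field in `RowDataNestProjection` suggests, that `wrap` stores the incoming row and returns the same projection object. `LazyTransform` and `FirstFieldView` are hypothetical stand-ins, not Iceberg's `CloseableIterable` or the PR's classes.

```java
import java.util.Iterator;
import java.util.function.Function;

final class LazyTransform {
  private LazyTransform() {
  }

  // Lazily applies `fn` to each element as it is iterated.
  static <F, T> Iterable<T> transform(Iterable<F> source, Function<F, T> fn) {
    return () -> new Iterator<T>() {
      private final Iterator<F> it = source.iterator();

      @Override
      public boolean hasNext() {
        return it.hasNext();
      }

      @Override
      public T next() {
        return fn.apply(it.next());
      }
    };
  }

  // A reusable view: wrap() points it at a new row and returns the same object.
  static final class FirstFieldView {
    private Object[] row;

    FirstFieldView wrap(Object[] newRow) {
      this.row = newRow;
      return this;
    }

    Object first() {
      return row[0];
    }
  }
}
```

Because the view is reused, a consumer that keeps references without copying ends up seeing only the last row; values have to be read or copied before advancing the iterator.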
   
   






[GitHub] [iceberg] hililiwei commented on pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei commented on pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#issuecomment-1041177678


   ping @stevenzwu, could you please take a look?




[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817291446



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -179,4 +171,20 @@ public DynamicTableSource copy() {
   public String asSummaryString() {
     return "Iceberg table source";
   }
+
+  private static TableSchema projectSchema(TableSchema tableSchema, int[] projectedFields) {
+    Preconditions.checkArgument(
+        org.apache.flink.table.utils.TableSchemaUtils.containsPhysicalColumnsOnly(tableSchema),

Review comment:
      Nit: it's better to import `org.apache.flink.table.utils.TableSchemaUtils` and call `TableSchemaUtils.containsPhysicalColumnsOnly` here, instead of using the fully qualified name.






[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817292920



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -179,4 +171,20 @@ public DynamicTableSource copy() {
   public String asSummaryString() {
     return "Iceberg table source";
   }
+
+  private static TableSchema projectSchema(TableSchema tableSchema, int[] projectedFields) {
+    Preconditions.checkArgument(
+        org.apache.flink.table.utils.TableSchemaUtils.containsPhysicalColumnsOnly(tableSchema),

Review comment:
      [Here](https://github.com/apache/iceberg/blob/2d4b0ddc76fd47aa27ca4972d4f3a6f256921c58/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java#L472-L476) is another place that checks for physical columns; I think we can extract both checks into a single common method.






[GitHub] [iceberg] hililiwei commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
hililiwei commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r817646487



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Projects a (possibly nested) row data type, converts the original field obtained from the file to the flink output.
+ *
+ * <p>For example, original data with fields: {@code [id:Int,st:Struct[a:String,b:String] ...]}, Output
+ * projection Field: {@code [id,st.a]} with path: [[0],[1,0]], For the {@code st}(nested type) field, {@code a} is
+ * projected by taking it out of the nested body as an independent field.
+ */
+public class RowDataNestProjection implements RowData {
+  private final FieldGetter[] getters;
+  private final boolean nestFlat;
+  private final int[][] projectedFields;
+  private RowData rowData;
+
+  private RowDataNestProjection(RowType rowType,
+                                Types.StructType schema,
+                                Types.StructType rowStruct,
+                                Types.StructType projectType,
+                                int[][] projectedFields,
+                                boolean nestFlat) {

Review comment:
       Yes, no need for it. Removed.

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Projects a (possibly nested) row data type, converts the original field obtained from the file to the flink output.
+ *
+ * <p>For example, original data with fields: {@code [id:Int,st:Struct[a:String,b:String] ...]}, Output
+ * projection Field: {@code [id,st.a]} with path: [[0],[1,0]], For the {@code st}(nested type) field, {@code a} is
+ * projected by taking it out of the nested body as an independent field.
+ */
+public class RowDataNestProjection implements RowData {
+  private final FieldGetter[] getters;
+  private final boolean nestFlat;
+  private final int[][] projectedFields;
+  private RowData rowData;
+
+  private RowDataNestProjection(RowType rowType,
+                                Types.StructType schema,
+                                Types.StructType rowStruct,
+                                Types.StructType projectType,
+                                int[][] projectedFields,
+                                boolean nestFlat) {
+    this.projectedFields = projectedFields;
+    this.nestFlat = nestFlat;
+
+    this.getters = new FieldGetter[projectedFields.length];
+    for (int i = 0; i < getters.length; i++) {
+      int[] projectedFieldOne = projectedFields[i];
+      int fieldIndex = projectedFieldOne[0];
+
+      Types.NestedField projectField = rowStruct.fields().get(fieldIndex);
+      Types.NestedField rowField = schema.field(projectField.fieldId());
+
+      getters[i] = createFieldGetter(rowType, rowField, fieldIndex, projectType, projectField, i, projectedFields,
+          projectedFieldOne);
+    }
+  }
+
+  private RowDataNestProjection(RowType rowType,
+                                Types.StructType schema,
+                                Types.StructType projectType,
+                                int[][] projectedFields,
+                                boolean nestFlat) {
+    this.projectedFields = projectedFields;
+    this.nestFlat = nestFlat;
+
+    this.getters = new FieldGetter[projectType.fields().size()];
+    for (int i = 0; i < getters.length; i++) {
+      int[] projectedFieldOne = projectedFields[0];
+      int fieldIndex = projectedFieldOne[0];
+      if (i == fieldIndex) {
+        Types.NestedField projectField = projectType.fields().get(i);
+        Types.NestedField rowField = schema.field(projectField.fieldId());
+
+        getters[i] = createFieldGetter(rowType, rowField, i, projectType, projectField, i, projectedFields,
+            projectedFieldOne);
+      }
+    }
+  }
+
+  private static FieldGetter createFieldGetter(RowType rowType,
+                                               Types.NestedField rowField,
+                                               int rowTypePosition,
+                                               Types.StructType projectFieldsType,
+                                               Types.NestedField projectField,
+                                               int projectFieldPosition,
+                                               int[][] projectedFields,
+                                               int[] projectedFieldOne) {
+    switch (projectField.type().typeId()) {
+      case STRUCT:
+        if (projectedFields == null || projectedFields[projectFieldPosition].length <= 1) {
+          return RowData.createFieldGetter(rowType.getTypeAt(rowTypePosition),
+              projectFieldsType.fields().indexOf(projectField));
+        }
+        RowType nestedRowType = (RowType) rowType.getTypeAt(rowTypePosition);
+        return row -> {
+          int[] target = new int[projectedFieldOne.length - 1];
+          System.arraycopy(projectedFieldOne, 1, target, 0, target.length);

Review comment:
      Thanks, this does look like a bottleneck. I've moved the array copy outside the inner class so that it runs only during the planning phase instead of once per row, which greatly reduces the number of copies.
   For example, it now runs three times for `SELECT id, st.a, st.b, st2.a FROM t` and twice for `SELECT id, st.a, st.b FROM t`.
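The fix described here can be sketched in plain Java, with rows as `Object[]` and a hypothetical `FieldGetter` interface standing in for the PR's. The tail of each index path is sliced once while the getter is built, so the per-row lambda only indexes into the row.

```java
import java.util.Arrays;

final class NestedGetters {
  private NestedGetters() {
  }

  interface FieldGetter {
    Object get(Object[] row);
  }

  // Builds a getter for one index path. The remaining path is copied here,
  // once per nesting level at creation time, not inside the lambda on every row.
  static FieldGetter forPath(int[] path) {
    int top = path[0];
    if (path.length == 1) {
      return row -> row[top];
    }
    FieldGetter nested = forPath(Arrays.copyOfRange(path, 1, path.length));
    return row -> {
      Object struct = row[top];
      return struct == null ? null : nested.get((Object[]) struct);
    };
  }
}
```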






[GitHub] [iceberg] openinx commented on a change in pull request #3991: Flink: Support nested projection

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r821476799



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
##########
@@ -83,6 +95,12 @@ public RowDataFileScanTaskReader(Schema tableSchema, Schema projectedSchema,
       iterable = CloseableIterable.transform(iterable, rowDataProjection::wrap);
     }
 
+    if (projectedFields != null) {
+      RowDataNestProjection rowDataProjection = RowDataNestProjection.create(tableSchema, projectedSchema,
+          projectedFields);
+      iterable = CloseableIterable.transform(iterable, rowDataProjection::wrap);
+    }

Review comment:
      Why do we need to project the `Iterable<RowData>` twice? We already project the fields at line 95, so why is this extra projection needed?
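If the two passes are kept conceptually, they can be folded into one: the first index of each nested path, written against the top-level projected row, can be mapped back through the top-level positions to obtain paths against the original row. A minimal sketch with hypothetical names:

```java
final class ProjectionComposer {
  private ProjectionComposer() {
  }

  // Composes a top-level projection (`topFields`: table positions kept by the
  // first pass) with paths written against that projected row, producing
  // paths against the original row so a single projection pass suffices.
  static int[][] compose(int[] topFields, int[][] pathsOnProjected) {
    int[][] composed = new int[pathsOnProjected.length][];
    for (int i = 0; i < pathsOnProjected.length; i++) {
      composed[i] = pathsOnProjected[i].clone();
      composed[i][0] = topFields[pathsOnProjected[i][0]];
    }
    return composed;
  }
}
```

With the composed paths, one projection over the row read from the file produces the final output directly.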



