You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2022/12/02 17:54:55 UTC

[iceberg] branch master updated: Flink: Port #4627 to Flink 1.14/1.15 (#6333)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bc465add69 Flink: Port #4627 to Flink 1.14/1.15 (#6333)
bc465add69 is described below

commit bc465add69cda31b5c6f32c9616a889d95be5471
Author: Xianyang Liu <li...@hotmail.com>
AuthorDate: Sat Dec 3 01:54:49 2022 +0800

    Flink: Port #4627 to Flink 1.14/1.15 (#6333)
---
 .../iceberg/flink/data/FlinkParquetReaders.java    | 11 ++++-
 .../iceberg/flink/source/TestFlinkInputFormat.java | 51 ++++++++++++++++++++++
 .../iceberg/flink/data/FlinkParquetReaders.java    | 11 ++++-
 .../iceberg/flink/source/TestFlinkInputFormat.java | 51 ++++++++++++++++++++++
 4 files changed, 122 insertions(+), 2 deletions(-)

diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 4189d0ae42..ab7b1174c9 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -92,6 +92,7 @@ public class FlinkParquetReaders {
       // match the expected struct's order
       Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
       Map<Integer, Type> typesById = Maps.newHashMap();
+      Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
       List<Type> fields = struct.getFields();
       for (int i = 0; i < fields.size(); i += 1) {
         Type fieldType = fields.get(i);
@@ -101,6 +102,9 @@ public class FlinkParquetReaders {
             int id = fieldType.getId().intValue();
             readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
             typesById.put(id, fieldType);
+            if (idToConstant.containsKey(id)) {
+              maxDefinitionLevelsById.put(id, fieldD);
+            }
           }
         }
       }
@@ -110,11 +114,16 @@ public class FlinkParquetReaders {
       List<ParquetValueReader<?>> reorderedFields =
           Lists.newArrayListWithExpectedSize(expectedFields.size());
       List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+      // Defaulting to parent max definition level
+      int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
         if (idToConstant.containsKey(id)) {
           // containsKey is used because the constant may be null
-          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          int fieldMaxDefinitionLevel =
+              maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
+          reorderedFields.add(
+              ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
           types.add(null);
         } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
           reorderedFields.add(ParquetValueReaders.position());
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
index d21b8fd384..73d03710d3 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
@@ -21,12 +21,15 @@ package org.apache.iceberg.flink.source;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -34,9 +37,11 @@ import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.junit.Assume;
 import org.junit.Test;
 
 /** Test {@link FlinkInputFormat}. */
@@ -135,6 +140,52 @@ public class TestFlinkInputFormat extends TestFlinkSource {
     TestHelpers.assertRows(result, expected);
   }
 
+  @Test
+  public void testReadPartitionColumn() throws Exception {
+    Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);
+
+    Schema nestedSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.LongType.get()),
+            Types.NestedField.optional(
+                2,
+                "struct",
+                Types.StructType.of(
+                    Types.NestedField.optional(3, "innerId", Types.LongType.get()),
+                    Types.NestedField.optional(4, "innerName", Types.StringType.get()))));
+    PartitionSpec spec =
+        PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();
+
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, nestedSchema, spec);
+    List<Record> records = RandomGenericData.generate(nestedSchema, 10, 0L);
+    GenericAppenderHelper appender = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    for (Record record : records) {
+      org.apache.iceberg.TestHelpers.Row partition =
+          org.apache.iceberg.TestHelpers.Row.of(record.get(1, Record.class).get(1));
+      appender.appendToTable(partition, Collections.singletonList(record));
+    }
+
+    TableSchema projectedSchema =
+        TableSchema.builder()
+            .field("struct", DataTypes.ROW(DataTypes.FIELD("innerName", DataTypes.STRING())))
+            .build();
+    List<Row> result =
+        runFormat(
+            FlinkSource.forRowData()
+                .tableLoader(tableLoader())
+                .project(projectedSchema)
+                .buildFormat());
+
+    List<Row> expected = Lists.newArrayList();
+    for (Record record : records) {
+      Row nested = Row.of(((Record) record.get(1)).get(1));
+      expected.add(Row.of(nested));
+    }
+
+    TestHelpers.assertRows(result, expected);
+  }
+
   private List<Row> runFormat(FlinkInputFormat inputFormat) throws IOException {
     RowType rowType = FlinkSchemaUtil.convert(inputFormat.projectedSchema());
     return TestHelpers.readRows(inputFormat, rowType);
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 4189d0ae42..ab7b1174c9 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -92,6 +92,7 @@ public class FlinkParquetReaders {
       // match the expected struct's order
       Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
       Map<Integer, Type> typesById = Maps.newHashMap();
+      Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
       List<Type> fields = struct.getFields();
       for (int i = 0; i < fields.size(); i += 1) {
         Type fieldType = fields.get(i);
@@ -101,6 +102,9 @@ public class FlinkParquetReaders {
             int id = fieldType.getId().intValue();
             readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
             typesById.put(id, fieldType);
+            if (idToConstant.containsKey(id)) {
+              maxDefinitionLevelsById.put(id, fieldD);
+            }
           }
         }
       }
@@ -110,11 +114,16 @@ public class FlinkParquetReaders {
       List<ParquetValueReader<?>> reorderedFields =
           Lists.newArrayListWithExpectedSize(expectedFields.size());
       List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+      // Defaulting to parent max definition level
+      int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
         if (idToConstant.containsKey(id)) {
           // containsKey is used because the constant may be null
-          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          int fieldMaxDefinitionLevel =
+              maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
+          reorderedFields.add(
+              ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
           types.add(null);
         } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
           reorderedFields.add(ParquetValueReaders.position());
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
index d21b8fd384..73d03710d3 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
@@ -21,12 +21,15 @@ package org.apache.iceberg.flink.source;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -34,9 +37,11 @@ import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.junit.Assume;
 import org.junit.Test;
 
 /** Test {@link FlinkInputFormat}. */
@@ -135,6 +140,52 @@ public class TestFlinkInputFormat extends TestFlinkSource {
     TestHelpers.assertRows(result, expected);
   }
 
+  @Test
+  public void testReadPartitionColumn() throws Exception {
+    Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);
+
+    Schema nestedSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.LongType.get()),
+            Types.NestedField.optional(
+                2,
+                "struct",
+                Types.StructType.of(
+                    Types.NestedField.optional(3, "innerId", Types.LongType.get()),
+                    Types.NestedField.optional(4, "innerName", Types.StringType.get()))));
+    PartitionSpec spec =
+        PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();
+
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, nestedSchema, spec);
+    List<Record> records = RandomGenericData.generate(nestedSchema, 10, 0L);
+    GenericAppenderHelper appender = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    for (Record record : records) {
+      org.apache.iceberg.TestHelpers.Row partition =
+          org.apache.iceberg.TestHelpers.Row.of(record.get(1, Record.class).get(1));
+      appender.appendToTable(partition, Collections.singletonList(record));
+    }
+
+    TableSchema projectedSchema =
+        TableSchema.builder()
+            .field("struct", DataTypes.ROW(DataTypes.FIELD("innerName", DataTypes.STRING())))
+            .build();
+    List<Row> result =
+        runFormat(
+            FlinkSource.forRowData()
+                .tableLoader(tableLoader())
+                .project(projectedSchema)
+                .buildFormat());
+
+    List<Row> expected = Lists.newArrayList();
+    for (Record record : records) {
+      Row nested = Row.of(((Record) record.get(1)).get(1));
+      expected.add(Row.of(nested));
+    }
+
+    TestHelpers.assertRows(result, expected);
+  }
+
   private List<Row> runFormat(FlinkInputFormat inputFormat) throws IOException {
     RowType rowType = FlinkSchemaUtil.convert(inputFormat.projectedSchema());
     return TestHelpers.readRows(inputFormat, rowType);