Posted to commits@iceberg.apache.org by pv...@apache.org on 2022/12/02 17:54:55 UTC
[iceberg] branch master updated: Flink: Port #4627 to Flink 1.14/1.15 (#6333)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new bc465add69 Flink: Port #4627 to Flink 1.14/1.15 (#6333)
bc465add69 is described below
commit bc465add69cda31b5c6f32c9616a889d95be5471
Author: Xianyang Liu <li...@hotmail.com>
AuthorDate: Sat Dec 3 01:54:49 2022 +0800
Flink: Port #4627 to Flink 1.14/1.15 (#6333)
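
The ported change records the Parquet max definition level of each field that is
replaced by a constant reader (for example an identity partition column) and passes
that level to ParquetValueReaders.constant(...), falling back to the parent struct's
level when the field is absent from the data file. A minimal sketch of the fallback,
using plain java.util maps and illustrative ids and levels (only the two-map shape
and the getOrDefault call mirror the diff below):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative values: field id 4 ("innerName") sits inside an optional
    // struct, so its max definition level is 2; its parent struct's is 1.
    Map<Integer, Integer> maxDefinitionLevelsById = new HashMap<>();
    maxDefinitionLevelsById.put(4, 2);

    int defaultMaxDefinitionLevel = 1; // parent level, used when the id is absent
    int fieldMaxDefinitionLevel =
        maxDefinitionLevelsById.getOrDefault(4, defaultMaxDefinitionLevel); // -> 2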
---
.../iceberg/flink/data/FlinkParquetReaders.java | 11 ++++-
.../iceberg/flink/source/TestFlinkInputFormat.java | 51 ++++++++++++++++++++++
.../iceberg/flink/data/FlinkParquetReaders.java | 11 ++++-
.../iceberg/flink/source/TestFlinkInputFormat.java | 51 ++++++++++++++++++++++
4 files changed, 122 insertions(+), 2 deletions(-)
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 4189d0ae42..ab7b1174c9 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -92,6 +92,7 @@ public class FlinkParquetReaders {
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Map<Integer, Type> typesById = Maps.newHashMap();
+ Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
@@ -101,6 +102,9 @@ public class FlinkParquetReaders {
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
typesById.put(id, fieldType);
+ if (idToConstant.containsKey(id)) {
+ maxDefinitionLevelsById.put(id, fieldD);
+ }
}
}
}
@@ -110,11 +114,16 @@ public class FlinkParquetReaders {
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+ // Defaulting to parent max definition level
+ int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
- reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+ int fieldMaxDefinitionLevel =
+ maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
+ reorderedFields.add(
+ ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
types.add(null);
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
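
Why the level matters for the new test below: in Parquet the definition level counts
how many optional ancestors of a value are present, so for the optional path
struct.innerName it runs 0 (struct is null), 1 (struct present, innerName null), and
2 (both present). A constant reader standing in for innerName presumably has to
report level 2 rather than the parent default of 1, or a present struct whose only
projected child is filled from partition metadata would look the same as a null
struct. A hypothetical decoding of those levels (illustrative only, not Iceberg API):

    // Hypothetical: interpreting a definition level d for struct.innerName,
    // where the max definition level is 2 and the struct's own level is 1.
    boolean structPresent = d >= 1; // d == 0 means the struct itself is null
    boolean valuePresent = d == 2;  // d == 1 means innerName is null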
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
index d21b8fd384..73d03710d3 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
@@ -21,12 +21,15 @@ package org.apache.iceberg.flink.source;
import static org.apache.iceberg.types.Types.NestedField.required;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -34,9 +37,11 @@ import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
+import org.junit.Assume;
import org.junit.Test;
/** Test {@link FlinkInputFormat}. */
@@ -135,6 +140,52 @@ public class TestFlinkInputFormat extends TestFlinkSource {
TestHelpers.assertRows(result, expected);
}
+ @Test
+ public void testReadPartitionColumn() throws Exception {
+ Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);
+
+ Schema nestedSchema =
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(
+ 2,
+ "struct",
+ Types.StructType.of(
+ Types.NestedField.optional(3, "innerId", Types.LongType.get()),
+ Types.NestedField.optional(4, "innerName", Types.StringType.get()))));
+ PartitionSpec spec =
+ PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();
+
+ Table table =
+ catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, nestedSchema, spec);
+ List<Record> records = RandomGenericData.generate(nestedSchema, 10, 0L);
+ GenericAppenderHelper appender = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+ for (Record record : records) {
+ org.apache.iceberg.TestHelpers.Row partition =
+ org.apache.iceberg.TestHelpers.Row.of(record.get(1, Record.class).get(1));
+ appender.appendToTable(partition, Collections.singletonList(record));
+ }
+
+ TableSchema projectedSchema =
+ TableSchema.builder()
+ .field("struct", DataTypes.ROW(DataTypes.FIELD("innerName", DataTypes.STRING())))
+ .build();
+ List<Row> result =
+ runFormat(
+ FlinkSource.forRowData()
+ .tableLoader(tableLoader())
+ .project(projectedSchema)
+ .buildFormat());
+
+ List<Row> expected = Lists.newArrayList();
+ for (Record record : records) {
+ Row nested = Row.of(((Record) record.get(1)).get(1));
+ expected.add(Row.of(nested));
+ }
+
+ TestHelpers.assertRows(result, expected);
+ }
+
private List<Row> runFormat(FlinkInputFormat inputFormat) throws IOException {
RowType rowType = FlinkSchemaUtil.convert(inputFormat.projectedSchema());
return TestHelpers.readRows(inputFormat, rowType);
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 4189d0ae42..ab7b1174c9 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -92,6 +92,7 @@ public class FlinkParquetReaders {
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Map<Integer, Type> typesById = Maps.newHashMap();
+ Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
@@ -101,6 +102,9 @@ public class FlinkParquetReaders {
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
typesById.put(id, fieldType);
+ if (idToConstant.containsKey(id)) {
+ maxDefinitionLevelsById.put(id, fieldD);
+ }
}
}
}
@@ -110,11 +114,16 @@ public class FlinkParquetReaders {
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+ // Defaulting to parent max definition level
+ int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
- reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+ int fieldMaxDefinitionLevel =
+ maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
+ reorderedFields.add(
+ ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
types.add(null);
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
index d21b8fd384..73d03710d3 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
@@ -21,12 +21,15 @@ package org.apache.iceberg.flink.source;
import static org.apache.iceberg.types.Types.NestedField.required;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -34,9 +37,11 @@ import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
+import org.junit.Assume;
import org.junit.Test;
/** Test {@link FlinkInputFormat}. */
@@ -135,6 +140,52 @@ public class TestFlinkInputFormat extends TestFlinkSource {
TestHelpers.assertRows(result, expected);
}
+ @Test
+ public void testReadPartitionColumn() throws Exception {
+ Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);
+
+ Schema nestedSchema =
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(
+ 2,
+ "struct",
+ Types.StructType.of(
+ Types.NestedField.optional(3, "innerId", Types.LongType.get()),
+ Types.NestedField.optional(4, "innerName", Types.StringType.get()))));
+ PartitionSpec spec =
+ PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();
+
+ Table table =
+ catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, nestedSchema, spec);
+ List<Record> records = RandomGenericData.generate(nestedSchema, 10, 0L);
+ GenericAppenderHelper appender = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+ for (Record record : records) {
+ org.apache.iceberg.TestHelpers.Row partition =
+ org.apache.iceberg.TestHelpers.Row.of(record.get(1, Record.class).get(1));
+ appender.appendToTable(partition, Collections.singletonList(record));
+ }
+
+ TableSchema projectedSchema =
+ TableSchema.builder()
+ .field("struct", DataTypes.ROW(DataTypes.FIELD("innerName", DataTypes.STRING())))
+ .build();
+ List<Row> result =
+ runFormat(
+ FlinkSource.forRowData()
+ .tableLoader(tableLoader())
+ .project(projectedSchema)
+ .buildFormat());
+
+ List<Row> expected = Lists.newArrayList();
+ for (Record record : records) {
+ Row nested = Row.of(((Record) record.get(1)).get(1));
+ expected.add(Row.of(nested));
+ }
+
+ TestHelpers.assertRows(result, expected);
+ }
+
private List<Row> runFormat(FlinkInputFormat inputFormat) throws IOException {
RowType rowType = FlinkSchemaUtil.convert(inputFormat.projectedSchema());
return TestHelpers.readRows(inputFormat, rowType);