You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/29 21:06:11 UTC
[iceberg] 09/09: Parquet: Fix map projection after map to key_value rename (#3309)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch 0.12.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit a7859dc430a33568922047eb4fe8f4519b320715
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Tue Oct 19 08:05:48 2021 -0700
Parquet: Fix map projection after map to key_value rename (#3309)
---
.../org/apache/iceberg/parquet/PruneColumns.java | 15 +--
.../apache/iceberg/parquet/TestPruneColumns.java | 136 +++++++++++++++++++++
2 files changed, 138 insertions(+), 13 deletions(-)
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
index acdda78..f181875 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
@@ -104,12 +104,7 @@ class PruneColumns extends ParquetTypeVisitor<Type> {
return list;
} else if (element != null) {
if (!Objects.equal(element, originalElement)) {
- Integer listId = getId(list);
- // the element type was projected
- Type listType = Types.list(list.getRepetition())
- .element(element)
- .named(list.getName());
- return listId == null ? listType : listType.withId(listId);
+ return list.withNewFields(repeated.withNewFields(element));
}
return list;
}
@@ -129,14 +124,8 @@ class PruneColumns extends ParquetTypeVisitor<Type> {
if ((keyId != null && selectedIds.contains(keyId)) || (valueId != null && selectedIds.contains(valueId))) {
return map;
} else if (value != null) {
- Integer mapId = getId(map);
if (!Objects.equal(value, originalValue)) {
- Type mapType = Types.map(map.getRepetition())
- .key(originalKey)
- .value(value)
- .named(map.getName());
-
- return mapId == null ? mapType : mapType.withId(mapId);
+ return map.withNewFields(repeated.withNewFields(originalKey, value));
}
return map;
}
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestPruneColumns.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestPruneColumns.java
new file mode 100644
index 0000000..dfa7e64
--- /dev/null
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestPruneColumns.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestPruneColumns {
+ @Test
+ public void testMapKeyValueName() {
+ MessageType fileSchema = Types.buildMessage()
+ .addField(Types.buildGroup(Type.Repetition.OPTIONAL)
+ .addField(Types.buildGroup(Type.Repetition.REPEATED)
+ .addField(Types.primitive(PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
+ .as(LogicalTypeAnnotation.stringType())
+ .id(2)
+ .named("key"))
+ .addField(Types.buildGroup(Type.Repetition.OPTIONAL)
+ .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(4).named("x"))
+ .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(5).named("y"))
+ .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(6).named("z"))
+ .id(3)
+ .named("value"))
+ .named("custom_key_value_name"))
+ .as(LogicalTypeAnnotation.mapType())
+ .id(1)
+ .named("m"))
+ .named("table");
+
+ // project map.value.x and map.value.y; map.value.z (id 6) is not selected and should be pruned
+ Schema projection = new Schema(
+ NestedField.optional(1, "m", MapType.ofOptional(2, 3,
+ StringType.get(),
+ StructType.of(
+ NestedField.required(4, "x", DoubleType.get()),
+ NestedField.required(5, "y", DoubleType.get())
+ )
+ ))
+ );
+
+ MessageType expected = Types.buildMessage()
+ .addField(Types.buildGroup(Type.Repetition.OPTIONAL)
+ .addField(Types.buildGroup(Type.Repetition.REPEATED)
+ .addField(Types.primitive(PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
+ .as(LogicalTypeAnnotation.stringType())
+ .id(2)
+ .named("key"))
+ .addField(Types.buildGroup(Type.Repetition.OPTIONAL)
+ .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(4).named("x"))
+ .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(5).named("y"))
+ .id(3)
+ .named("value"))
+ .named("custom_key_value_name")) // the file's repeated group name must survive pruning
+ .as(LogicalTypeAnnotation.mapType())
+ .id(1)
+ .named("m"))
+ .named("table");
+
+ MessageType actual = ParquetSchemaUtil.pruneColumns(fileSchema, projection);
+ Assert.assertEquals("Pruned schema should not rename repeated struct", expected, actual);
+ }
+
+ @Test
+ public void testListElementName() {
+ MessageType fileSchema = Types.buildMessage()
+ .addField(Types.buildGroup(Type.Repetition.OPTIONAL)
+ .addField(Types.buildGroup(Type.Repetition.REPEATED)
+ .addField(Types.buildGroup(Type.Repetition.OPTIONAL)
+ .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(4).named("x"))
+ .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(5).named("y"))
+ .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(6).named("z"))
+ .id(3)
+ .named("custom_element_name"))
+ .named("custom_repeated_name"))
+ .as(LogicalTypeAnnotation.listType())
+ .id(1)
+ .named("m"))
+ .named("table");
+
+ // project list.element.x and list.element.y; list.element.z (id 6) is not selected and should be pruned
+ Schema projection = new Schema(
+ NestedField.optional(1, "m", ListType.ofOptional(3,
+ StructType.of(
+ NestedField.required(4, "x", DoubleType.get()),
+ NestedField.required(5, "y", DoubleType.get())
+ )
+ ))
+ );
+
+ MessageType expected = Types.buildMessage()
+ .addField(Types.buildGroup(Type.Repetition.OPTIONAL)
+ .addField(Types.buildGroup(Type.Repetition.REPEATED)
+ .addField(Types.buildGroup(Type.Repetition.OPTIONAL)
+ .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(4).named("x"))
+ .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(5).named("y"))
+ .id(3)
+ .named("custom_element_name"))
+ .named("custom_repeated_name")) // custom element and repeated group names must survive pruning
+ .as(LogicalTypeAnnotation.listType())
+ .id(1)
+ .named("m"))
+ .named("table");
+
+ MessageType actual = ParquetSchemaUtil.pruneColumns(fileSchema, projection);
+ Assert.assertEquals("Pruned schema should not rename repeated struct", expected, actual);
+ }
+}