You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/20 20:48:40 UTC
[incubator-iceberg] branch master updated: Collect lower/upper bounds for nested struct fields in ParquetMetrics (#136)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c383dd8 Collect lower/upper bounds for nested struct fields in ParquetMetrics (#136)
c383dd8 is described below
commit c383dd87a89e35d622e9c458fd711931cbc5e96f
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Mar 20 20:48:35 2019 +0000
Collect lower/upper bounds for nested struct fields in ParquetMetrics (#136)
---
.../netflix/iceberg/parquet/ParquetMetrics.java | 28 ++++++++++++---
.../iceberg/parquet/TestParquetMetrics.java | 42 +++++++++++++++++++++-
2 files changed, 64 insertions(+), 6 deletions(-)
diff --git a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java
index 17ec12d..137b9ab 100644
--- a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java
+++ b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java
@@ -27,16 +27,19 @@ import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.expressions.Literal;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.types.Conversions;
+import com.netflix.iceberg.types.Type;
import com.netflix.iceberg.types.Types;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -71,7 +74,8 @@ public class ParquetMetrics implements Serializable {
for (BlockMetaData block : blocks) {
rowCount += block.getRowCount();
for (ColumnChunkMetaData column : block.getColumns()) {
- int fieldId = fileSchema.aliasToId(column.getPath().toDotString());
+ ColumnPath path = column.getPath();
+ int fieldId = fileSchema.aliasToId(path.toDotString());
increment(columnSizes, fieldId, column.getTotalSize());
increment(valueCounts, fieldId, column.getValueCount());
@@ -81,10 +85,8 @@ public class ParquetMetrics implements Serializable {
} else if (!stats.isEmpty()) {
increment(nullValueCounts, fieldId, stats.getNumNulls());
- // only add min/max stats for top-level fields
- // TODO: allow struct nesting, but not maps or arrays
- Types.NestedField field = fileSchema.asStruct().field(fieldId);
- if (field != null && stats.hasNonNullValue()) {
+ Types.NestedField field = fileSchema.findField(fieldId);
+ if (field != null && stats.hasNonNullValue() && shouldStoreBounds(path, fileSchema)) {
updateMin(lowerBounds, fieldId,
fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMin()));
updateMax(upperBounds, fieldId,
@@ -105,6 +107,22 @@ public class ParquetMetrics implements Serializable {
toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema, upperBounds));
}
+ // we allow struct nesting, but not maps or arrays
+ private static boolean shouldStoreBounds(ColumnPath columnPath, Schema schema) {
+ Iterator<String> pathIterator = columnPath.iterator();
+ Type currentType = schema.asStruct();
+
+ while (pathIterator.hasNext()) {
+ if (currentType == null || !currentType.isStructType()) {
+ return false;
+ }
+ String fieldName = pathIterator.next();
+ currentType = currentType.asStructType().fieldType(fieldName);
+ }
+
+ return currentType != null && currentType.isPrimitiveType();
+ }
+
private static void increment(Map<Integer, Long> columns, int fieldId, long amount) {
if (columns != null) {
if (columns.containsKey(fieldId)) {
diff --git a/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java b/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java
index 1e18ce1..3997a29 100644
--- a/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java
+++ b/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java
@@ -152,7 +152,7 @@ public class TestParquetMetrics {
assertBounds(12, FixedType.ofLength(4),
ByteBuffer.wrap(fixed.bytes()), ByteBuffer.wrap(fixed.bytes()), metrics);
assertCounts(13, 2L, 0L, metrics);
- assertBounds(13, FixedType.ofLength(4),
+ assertBounds(13, BinaryType.get(),
ByteBuffer.wrap("S".getBytes()), ByteBuffer.wrap("W".getBytes()), metrics);
}
@@ -182,6 +182,46 @@ public class TestParquetMetrics {
}
@Test
+ public void testMetricsForNestedStructFields() throws IOException {
+ StructType leafStructType = StructType.of(
+ optional(5, "leafLongCol", LongType.get()),
+ optional(6, "leafBinaryCol", BinaryType.get())
+ );
+ StructType nestedStructType = StructType.of(
+ required(3, "longCol", LongType.get()),
+ required(4, "leafStructCol", leafStructType)
+ );
+ Schema schema = new Schema(
+ required(1, "intCol", IntegerType.get()),
+ required(2, "nestedStructCol", nestedStructType)
+ );
+
+ Record leafStruct = new Record(AvroSchemaUtil.convert(leafStructType));
+ leafStruct.put("leafLongCol", 20L);
+ leafStruct.put("leafBinaryCol", "A".getBytes());
+ Record nestedStruct = new Record(AvroSchemaUtil.convert(nestedStructType));
+ nestedStruct.put("longCol", 100L);
+ nestedStruct.put("leafStructCol", leafStruct);
+ Record record = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+ record.put("intCol", Integer.MAX_VALUE);
+ record.put("nestedStructCol", nestedStruct);
+
+ File parquetFile = writeRecords(schema, record);
+
+ Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+ Assert.assertEquals(1L, (long) metrics.recordCount());
+ assertCounts(1, 1L, 0L, metrics);
+ assertBounds(1, IntegerType.get(), Integer.MAX_VALUE, Integer.MAX_VALUE, metrics);
+ assertCounts(3, 1L, 0L, metrics);
+ assertBounds(3, LongType.get(), 100L, 100L, metrics);
+ assertCounts(5, 1L, 0L, metrics);
+ assertBounds(5, LongType.get(), 20L, 20L, metrics);
+ assertCounts(6, 1L, 0L, metrics);
+ assertBounds(6, BinaryType.get(),
+ ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics);
+ }
+
+ @Test
public void testMetricsForListAndMapElements() throws IOException {
StructType structType = StructType.of(
required(1, "leafIntCol", IntegerType.get()),