You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/20 20:48:40 UTC

[incubator-iceberg] branch master updated: Collect lower/upper bounds for nested struct fields in ParquetMetrics (#136)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c383dd8  Collect lower/upper bounds for nested struct fields in ParquetMetrics (#136)
c383dd8 is described below

commit c383dd87a89e35d622e9c458fd711931cbc5e96f
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Mar 20 20:48:35 2019 +0000

    Collect lower/upper bounds for nested struct fields in ParquetMetrics (#136)
---
 .../netflix/iceberg/parquet/ParquetMetrics.java    | 28 ++++++++++++---
 .../iceberg/parquet/TestParquetMetrics.java        | 42 +++++++++++++++++++++-
 2 files changed, 64 insertions(+), 6 deletions(-)

diff --git a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java
index 17ec12d..137b9ab 100644
--- a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java
+++ b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java
@@ -27,16 +27,19 @@ import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.expressions.Literal;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.types.Conversions;
+import com.netflix.iceberg.types.Type;
 import com.netflix.iceberg.types.Types;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -71,7 +74,8 @@ public class ParquetMetrics implements Serializable {
     for (BlockMetaData block : blocks) {
       rowCount += block.getRowCount();
       for (ColumnChunkMetaData column : block.getColumns()) {
-        int fieldId = fileSchema.aliasToId(column.getPath().toDotString());
+        ColumnPath path = column.getPath();
+        int fieldId = fileSchema.aliasToId(path.toDotString());
         increment(columnSizes, fieldId, column.getTotalSize());
         increment(valueCounts, fieldId, column.getValueCount());
 
@@ -81,10 +85,8 @@ public class ParquetMetrics implements Serializable {
         } else if (!stats.isEmpty()) {
           increment(nullValueCounts, fieldId, stats.getNumNulls());
 
-          // only add min/max stats for top-level fields
-          // TODO: allow struct nesting, but not maps or arrays
-          Types.NestedField field = fileSchema.asStruct().field(fieldId);
-          if (field != null && stats.hasNonNullValue()) {
+          Types.NestedField field = fileSchema.findField(fieldId);
+          if (field != null && stats.hasNonNullValue() && shouldStoreBounds(path, fileSchema)) {
             updateMin(lowerBounds, fieldId,
                 fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMin()));
             updateMax(upperBounds, fieldId,
@@ -105,6 +107,22 @@ public class ParquetMetrics implements Serializable {
         toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema, upperBounds));
   }
 
+  // bounds are kept only for primitive leaves reached solely through structs (no maps or arrays)
+  private static boolean shouldStoreBounds(ColumnPath columnPath, Schema schema) {
+    Iterator<String> pathIterator = columnPath.iterator();
+    Type currentType = schema.asStruct();
+
+    while (pathIterator.hasNext()) {
+      if (currentType == null || !currentType.isStructType()) {
+        return false; // name lookup failed, or the path descends through a non-struct (map/list)
+      }
+      String fieldName = pathIterator.next();
+      currentType = currentType.asStructType().fieldType(fieldName);
+    }
+
+    return currentType != null && currentType.isPrimitiveType(); // only primitive leaves get bounds
+  }
+
   private static void increment(Map<Integer, Long> columns, int fieldId, long amount) {
     if (columns != null) {
       if (columns.containsKey(fieldId)) {
diff --git a/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java b/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java
index 1e18ce1..3997a29 100644
--- a/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java
+++ b/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java
@@ -152,7 +152,7 @@ public class TestParquetMetrics {
     assertBounds(12, FixedType.ofLength(4),
         ByteBuffer.wrap(fixed.bytes()), ByteBuffer.wrap(fixed.bytes()), metrics);
     assertCounts(13, 2L, 0L, metrics);
-    assertBounds(13, FixedType.ofLength(4),
+    assertBounds(13, BinaryType.get(),
         ByteBuffer.wrap("S".getBytes()), ByteBuffer.wrap("W".getBytes()), metrics);
   }
 
@@ -182,6 +182,46 @@ public class TestParquetMetrics {
   }
 
   @Test
+  public void testMetricsForNestedStructFields() throws IOException {
+    StructType leafStructType = StructType.of(
+        optional(5, "leafLongCol", LongType.get()),
+        optional(6, "leafBinaryCol", BinaryType.get())
+    );
+    StructType nestedStructType = StructType.of(
+        required(3, "longCol", LongType.get()),
+        required(4, "leafStructCol", leafStructType)
+    );
+    Schema schema = new Schema( // 1: int, 2: struct<3: long, 4: struct<5: long, 6: binary>>
+        required(1, "intCol", IntegerType.get()),
+        required(2, "nestedStructCol", nestedStructType)
+    );
+
+    Record leafStruct = new Record(AvroSchemaUtil.convert(leafStructType));
+    leafStruct.put("leafLongCol", 20L);
+    leafStruct.put("leafBinaryCol", "A".getBytes());
+    Record nestedStruct = new Record(AvroSchemaUtil.convert(nestedStructType));
+    nestedStruct.put("longCol", 100L);
+    nestedStruct.put("leafStructCol", leafStruct);
+    Record record = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+    record.put("intCol", Integer.MAX_VALUE);
+    record.put("nestedStructCol", nestedStruct);
+
+    File parquetFile = writeRecords(schema, record);
+
+    Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+    Assert.assertEquals(1L, (long) metrics.recordCount());
+    assertCounts(1, 1L, 0L, metrics); // top-level field
+    assertBounds(1, IntegerType.get(), Integer.MAX_VALUE, Integer.MAX_VALUE, metrics);
+    assertCounts(3, 1L, 0L, metrics); // nested one struct level deep
+    assertBounds(3, LongType.get(), 100L, 100L, metrics);
+    assertCounts(5, 1L, 0L, metrics); // fields 5 and 6 are nested two struct levels deep
+    assertBounds(5, LongType.get(), 20L, 20L, metrics);
+    assertCounts(6, 1L, 0L, metrics);
+    assertBounds(6, BinaryType.get(),
+        ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics);
+  }
+
+  @Test
   public void testMetricsForListAndMapElements() throws IOException {
     StructType structType = StructType.of(
         required(1, "leafIntCol", IntegerType.get()),