Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/18 22:26:36 UTC

[incubator-iceberg] branch master updated: Fix collection of bounds for small decimals in ParquetMetrics (#131)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7169e21  Fix collection of bounds for small decimals in ParquetMetrics (#131)
7169e21 is described below

commit 7169e21fc4948ee079191a23dc7a4ce271d1224d
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Mar 18 22:26:31 2019 +0000

    Fix collection of bounds for small decimals in ParquetMetrics (#131)
---
 .../iceberg/parquet/ParquetConversions.java        |  66 +++--
 .../netflix/iceberg/parquet/ParquetMetrics.java    |   4 +-
 .../iceberg/parquet/TestParquetMetrics.java        | 266 +++++++++++++++++++++
 3 files changed, 299 insertions(+), 37 deletions(-)
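
Background on the fix: the previous fromParquetPrimitive dispatched on the
Java class of the statistics value. Parquet stores decimals with small
precision as int32 or int64 unscaled values, so min/max statistics for those
columns arrived as Integer or Long, fell into the integer branches, and the
unscaled value (e.g. 255 for 2.55 at scale 2) was treated as the decimal
value itself. The rewrite below dispatches on the Iceberg type and uses the
Parquet PrimitiveType to pick the right converter. The core rescaling step,
as a minimal standalone sketch rather than the patched code:

    import java.math.BigDecimal;

    class UnscaledDecimals {
      // A DECIMAL(4,2) column stored as int32 keeps 2.55 as the raw value 255;
      // rescaling by the column's scale recovers the logical value.
      static BigDecimal fromUnscaled(long unscaled, int scale) {
        return BigDecimal.valueOf(unscaled, scale);  // (255, 2) -> 2.55
      }
    }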

diff --git a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetConversions.java b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetConversions.java
index e7a8024..b7d4d2d 100644
--- a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetConversions.java
+++ b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetConversions.java
@@ -21,14 +21,12 @@ package com.netflix.iceberg.parquet;
 
 import com.netflix.iceberg.expressions.Literal;
 import com.netflix.iceberg.types.Type;
-import com.netflix.iceberg.types.Types;
 import org.apache.commons.io.Charsets;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveType;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.util.UUID;
 import java.util.function.Function;
 
@@ -36,39 +34,37 @@ class ParquetConversions {
   private ParquetConversions() {
   }
 
-  static <T> Literal<T> fromParquetPrimitive(Type type, Object value) {
-    if (value instanceof Boolean) {
-      return Literal.of((Boolean) value).to(type);
-    } else if (value instanceof Integer) {
-      return Literal.of((Integer) value).to(type);
-    } else if (value instanceof Long) {
-      return Literal.of((Long) value).to(type);
-    } else if (value instanceof Float) {
-      return Literal.of((Float) value).to(type);
-    } else if (value instanceof Double) {
-      return Literal.of((Double) value).to(type);
-    } else if (value instanceof Binary) {
-      switch (type.typeId()) {
-        case STRING:
-          return Literal.of(Charsets.UTF_8.decode(((Binary) value).toByteBuffer())).to(type);
-        case UUID:
-          ByteBuffer buffer = ((Binary) value).toByteBuffer().order(ByteOrder.BIG_ENDIAN);
-          long mostSigBits = buffer.getLong();
-          long leastSigBits = buffer.getLong();
-          return Literal.of(new UUID(mostSigBits, leastSigBits)).to(type);
-        case FIXED:
-        case BINARY:
-          return Literal.of(((Binary) value).toByteBuffer()).to(type);
-        case DECIMAL:
-          Types.DecimalType decimal = (Types.DecimalType) type;
-          return Literal.of(
-              new BigDecimal(new BigInteger(((Binary) value).getBytes()), decimal.scale())
-          ).to(type);
-        default:
-          throw new IllegalArgumentException("Unsupported primitive type: " + type);
-      }
-    } else {
-      throw new IllegalArgumentException("Unsupported primitive value: " + value);
+  @SuppressWarnings("unchecked")
+  static <T> Literal<T> fromParquetPrimitive(Type type, PrimitiveType parquetType, Object value) {
+    switch (type.typeId()) {
+      case BOOLEAN:
+        return (Literal<T>) Literal.of((Boolean) value);
+      case INTEGER:
+      case DATE:
+        return (Literal<T>) Literal.of((Integer) value);
+      case LONG:
+      case TIME:
+      case TIMESTAMP:
+        return (Literal<T>) Literal.of((Long) value);
+      case FLOAT:
+        return (Literal<T>) Literal.of((Float) value);
+      case DOUBLE:
+        return (Literal<T>) Literal.of((Double) value);
+      case STRING:
+        Function<Object, Object> stringConversion = converterFromParquet(parquetType);
+        return (Literal<T>) Literal.of((CharSequence) stringConversion.apply(value));
+      case UUID:
+        Function<Object, Object> uuidConversion = converterFromParquet(parquetType);
+        return (Literal<T>) Literal.of((UUID) uuidConversion.apply(value));
+      case FIXED:
+      case BINARY:
+        Function<Object, Object> binaryConversion = converterFromParquet(parquetType);
+        return (Literal<T>) Literal.of((ByteBuffer) binaryConversion.apply(value));
+      case DECIMAL:
+        Function<Object, Object> decimalConversion = converterFromParquet(parquetType);
+        return (Literal<T>) Literal.of((BigDecimal) decimalConversion.apply(value));
+      default:
+        throw new IllegalArgumentException("Unsupported primitive type: " + type);
     }
   }
 
diff --git a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java
index 850588b..17ec12d 100644
--- a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java
+++ b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java
@@ -86,9 +86,9 @@ public class ParquetMetrics implements Serializable {
           Types.NestedField field = fileSchema.asStruct().field(fieldId);
           if (field != null && stats.hasNonNullValue()) {
             updateMin(lowerBounds, fieldId,
-                fromParquetPrimitive(field.type(), stats.genericGetMin()));
+                fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMin()));
             updateMax(upperBounds, fieldId,
-                fromParquetPrimitive(field.type(), stats.genericGetMax()));
+                fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMax()));
           }
         }
       }
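
The call-site change above is the other half of the fix: stats.genericGetMin()
and stats.genericGetMax() return whatever the physical type dictates (an
Integer for an int32-backed decimal, a Binary for a fixed-backed one), while
field.type() is DECIMAL in both cases, so only the column's Parquet
PrimitiveType can disambiguate. Reading a decoded bound back out follows the
same pattern as the new tests below (a usage sketch; parquetFile and fieldId
are assumed to be defined, with the same static imports the test uses):

    // Bounds are stored as single-value serialized buffers; decode them with
    // the Iceberg type, as the assertions in TestParquetMetrics do.
    Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
    ByteBuffer raw = metrics.lowerBounds().get(fieldId);
    BigDecimal lower = fromByteBuffer(DecimalType.of(4, 2), raw);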
diff --git a/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java b/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java
new file mode 100644
index 0000000..1e18ce1
--- /dev/null
+++ b/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.parquet;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.netflix.iceberg.FileFormat;
+import com.netflix.iceberg.Metrics;
+import com.netflix.iceberg.Schema;
+import com.netflix.iceberg.avro.AvroSchemaUtil;
+import com.netflix.iceberg.io.FileAppender;
+import com.netflix.iceberg.types.Type;
+import com.netflix.iceberg.types.Types.BinaryType;
+import com.netflix.iceberg.types.Types.BooleanType;
+import com.netflix.iceberg.types.Types.DateType;
+import com.netflix.iceberg.types.Types.DecimalType;
+import com.netflix.iceberg.types.Types.DoubleType;
+import com.netflix.iceberg.types.Types.FixedType;
+import com.netflix.iceberg.types.Types.FloatType;
+import com.netflix.iceberg.types.Types.IntegerType;
+import com.netflix.iceberg.types.Types.ListType;
+import com.netflix.iceberg.types.Types.LongType;
+import com.netflix.iceberg.types.Types.MapType;
+import com.netflix.iceberg.types.Types.StringType;
+import com.netflix.iceberg.types.Types.StructType;
+import com.netflix.iceberg.types.Types.TimeType;
+import com.netflix.iceberg.types.Types.TimestampType;
+import com.netflix.iceberg.types.Types.UUIDType;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.commons.io.Charsets;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.Map;
+import java.util.UUID;
+
+import static com.netflix.iceberg.Files.localInput;
+import static com.netflix.iceberg.Files.localOutput;
+import static com.netflix.iceberg.types.Conversions.fromByteBuffer;
+import static com.netflix.iceberg.types.Types.NestedField.optional;
+import static com.netflix.iceberg.types.Types.NestedField.required;
+
+public class TestParquetMetrics {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private final UUID uuid = UUID.randomUUID();
+  private final GenericFixed fixed = new GenericData.Fixed(
+      org.apache.avro.Schema.createFixed("fixedCol", null, null, 4),
+      "abcd".getBytes(Charsets.UTF_8));
+
+  @Test
+  public void testMetricsForTopLevelFields() throws IOException {
+    Schema schema = new Schema(
+        optional(1, "booleanCol", BooleanType.get()),
+        required(2, "intCol", IntegerType.get()),
+        optional(3, "longCol", LongType.get()),
+        required(4, "floatCol", FloatType.get()),
+        optional(5, "doubleCol", DoubleType.get()),
+        optional(6, "decimalCol", DecimalType.of(10, 2)),
+        required(7, "stringCol", StringType.get()),
+        optional(8, "dateCol", DateType.get()),
+        required(9, "timeCol", TimeType.get()),
+        required(10, "timestampCol", TimestampType.withoutZone()),
+        optional(11, "uuidCol", UUIDType.get()),
+        required(12, "fixedCol", FixedType.ofLength(4)),
+        required(13, "binaryCol", BinaryType.get())
+    );
+
+    Record firstRecord = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+    firstRecord.put("booleanCol", true);
+    firstRecord.put("intCol", 3);
+    firstRecord.put("longCol", 5L);
+    firstRecord.put("floatCol", 2.0F);
+    firstRecord.put("doubleCol", 2.0D);
+    firstRecord.put("decimalCol", new BigDecimal("3.50"));
+    firstRecord.put("stringCol", "AAA");
+    firstRecord.put("dateCol", 1500);
+    firstRecord.put("timeCol", 2000L);
+    firstRecord.put("timestampCol", 0L);
+    firstRecord.put("uuidCol", uuid);
+    firstRecord.put("fixedCol", fixed);
+    firstRecord.put("binaryCol", "S".getBytes());
+    Record secondRecord = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+    secondRecord.put("booleanCol", false);
+    secondRecord.put("intCol", Integer.MIN_VALUE);
+    secondRecord.put("longCol", null);
+    secondRecord.put("floatCol", 1.0F);
+    secondRecord.put("doubleCol", null);
+    secondRecord.put("decimalCol", null);
+    secondRecord.put("stringCol", "ZZZ");
+    secondRecord.put("dateCol", null);
+    secondRecord.put("timeCol", 3000L);
+    secondRecord.put("timestampCol", 1000L);
+    secondRecord.put("uuidCol", null);
+    secondRecord.put("fixedCol", fixed);
+    secondRecord.put("binaryCol", "W".getBytes());
+
+    File parquetFile = writeRecords(schema, firstRecord, secondRecord);
+
+    Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+    Assert.assertEquals(2L, (long) metrics.recordCount());
+    assertCounts(1, 2L, 0L, metrics);
+    assertBounds(1, BooleanType.get(), false, true, metrics);
+    assertCounts(2, 2L, 0L, metrics);
+    assertBounds(2, IntegerType.get(), Integer.MIN_VALUE, 3, metrics);
+    assertCounts(3, 2L, 1L, metrics);
+    assertBounds(3, LongType.get(), 5L, 5L, metrics);
+    assertCounts(4, 2L, 0L, metrics);
+    assertBounds(4, FloatType.get(), 1.0F, 2.0F, metrics);
+    assertCounts(5, 2L, 1L, metrics);
+    assertBounds(5, DoubleType.get(), 2.0D, 2.0D, metrics);
+    assertCounts(6, 2L, 1L, metrics);
+    assertBounds(6, DecimalType.of(10, 2), new BigDecimal("3.50"), new BigDecimal("3.50"), metrics);
+    assertCounts(7, 2L, 0L, metrics);
+    assertBounds(7, StringType.get(), CharBuffer.wrap("AAA"), CharBuffer.wrap("ZZZ"), metrics);
+    assertCounts(8, 2L, 1L, metrics);
+    assertBounds(8, DateType.get(), 1500, 1500, metrics);
+    assertCounts(9, 2L, 0L, metrics);
+    assertBounds(9, TimeType.get(), 2000L, 3000L, metrics);
+    assertCounts(10, 2L, 0L, metrics);
+    assertBounds(10, TimestampType.withoutZone(), 0L, 1000L, metrics);
+    // TODO: enable once issue#126 is resolved
+    // assertCounts(11, 2L, 1L, metrics);
+    // assertBounds(11, UUIDType.get(), uuid, uuid, metrics);
+    assertCounts(12, 2L, 0L, metrics);
+    assertBounds(12, FixedType.ofLength(4),
+        ByteBuffer.wrap(fixed.bytes()), ByteBuffer.wrap(fixed.bytes()), metrics);
+    assertCounts(13, 2L, 0L, metrics);
+    assertBounds(13, FixedType.ofLength(4),
+        ByteBuffer.wrap("S".getBytes()), ByteBuffer.wrap("W".getBytes()), metrics);
+  }
+
+  @Test
+  public void testMetricsForDecimals() throws IOException {
+    Schema schema = new Schema(
+        required(1, "decimalAsInt32", DecimalType.of(4, 2)),
+        required(2, "decimalAsInt64", DecimalType.of(14, 2)),
+        required(3, "decimalAsFixed", DecimalType.of(22, 2))
+    );
+
+    Record record = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+    record.put("decimalAsInt32", new BigDecimal("2.55"));
+    record.put("decimalAsInt64", new BigDecimal("4.75"));
+    record.put("decimalAsFixed", new BigDecimal("5.80"));
+
+    File parquetFile = writeRecords(schema, record);
+
+    Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+    Assert.assertEquals(1L, (long) metrics.recordCount());
+    assertCounts(1, 1L, 0L, metrics);
+    assertBounds(1, DecimalType.of(4, 2), new BigDecimal("2.55"), new BigDecimal("2.55"), metrics);
+    assertCounts(2, 1L, 0L, metrics);
+    assertBounds(2, DecimalType.of(14, 2), new BigDecimal("4.75"), new BigDecimal("4.75"), metrics);
+    assertCounts(3, 1L, 0L, metrics);
+    assertBounds(3, DecimalType.of(22, 2), new BigDecimal("5.80"), new BigDecimal("5.80"), metrics);
+  }
+
+  @Test
+  public void testMetricsForListAndMapElements() throws IOException {
+    StructType structType = StructType.of(
+        required(1, "leafIntCol", IntegerType.get()),
+        optional(2, "leafStringCol", StringType.get())
+    );
+    Schema schema = new Schema(
+        optional(3, "intListCol", ListType.ofRequired(4, IntegerType.get())),
+        optional(5, "mapCol", MapType.ofRequired(6, 7, StringType.get(), structType))
+    );
+
+    Record record = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+    record.put("intListCol", Lists.newArrayList(10, 11, 12));
+    Record struct = new Record(AvroSchemaUtil.convert(structType));
+    struct.put("leafIntCol", 1);
+    struct.put("leafStringCol", "BBB");
+    Map<String, Record> map = Maps.newHashMap();
+    map.put("4", struct);
+    record.put(1, map);
+
+    File parquetFile = writeRecords(schema, record);
+
+    Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+    Assert.assertEquals(1L, (long) metrics.recordCount());
+    assertCounts(1, 1, 0, metrics);
+    assertBounds(1, IntegerType.get(), null, null, metrics);
+    assertCounts(2, 1, 0, metrics);
+    assertBounds(2, StringType.get(), null, null, metrics);
+    assertCounts(4, 3, 0, metrics);
+    assertBounds(4, IntegerType.get(), null, null, metrics);
+    assertCounts(6, 1, 0, metrics);
+    assertBounds(6, StringType.get(), null, null, metrics);
+  }
+
+  @Test
+  public void testMetricsForNullColumns() throws IOException {
+    Schema schema = new Schema(
+        optional(1, "intCol", IntegerType.get())
+    );
+    Record firstRecord = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+    firstRecord.put("intCol", null);
+    Record secondRecord = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+    secondRecord.put("intCol", null);
+
+    File parquetFile = writeRecords(schema, firstRecord, secondRecord);
+
+    Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+    Assert.assertEquals(2L, (long) metrics.recordCount());
+    assertCounts(1, 2, 2, metrics);
+    assertBounds(1, IntegerType.get(), null, null, metrics);
+  }
+
+  private void assertCounts(int fieldId, long valueCount, long nullValueCount, Metrics metrics) {
+    Map<Integer, Long> valueCounts = metrics.valueCounts();
+    Map<Integer, Long> nullValueCounts = metrics.nullValueCounts();
+    Assert.assertEquals(valueCount, (long) valueCounts.get(fieldId));
+    Assert.assertEquals(nullValueCount, (long) nullValueCounts.get(fieldId));
+  }
+
+  private <T> void assertBounds(int fieldId, Type type, T lowerBound, T upperBound, Metrics metrics) {
+    Map<Integer, ByteBuffer> lowerBounds = metrics.lowerBounds();
+    Map<Integer, ByteBuffer> upperBounds = metrics.upperBounds();
+
+    Assert.assertEquals(
+        lowerBound,
+        lowerBounds.containsKey(fieldId) ? fromByteBuffer(type, lowerBounds.get(fieldId)) : null);
+    Assert.assertEquals(
+        upperBound,
+        upperBounds.containsKey(fieldId) ? fromByteBuffer(type, upperBounds.get(fieldId)) : null);
+  }
+
+  private File writeRecords(Schema schema, Record... records) throws IOException {
+    File tmpFolder = temp.newFolder("parquet");
+    String filename = UUID.randomUUID().toString();
+    File file = new File(tmpFolder, FileFormat.PARQUET.addExtension(filename));
+    try (FileAppender<Record> writer = Parquet.write(localOutput(file))
+        .schema(schema)
+        .build()) {
+      writer.addAll(Lists.newArrayList(records));
+    }
+    return file;
+  }
+}
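
The three columns in testMetricsForDecimals exercise all three physical
encodings the Parquet format allows for decimals: the spec permits int32 for
precision 1..9, int64 for precision 1..18, and binary/fixed_len_byte_array for
any precision, which is why DECIMAL(4,2), DECIMAL(14,2), and DECIMAL(22,2)
take different code paths. The precision rule, sketched for reference (not
Iceberg code):

    class DecimalStorage {
      // Smallest physical type a Parquet writer may pick for a decimal column.
      static String physicalFor(int precision) {
        if (precision <= 9) {
          return "INT32";                 // <= 9 digits fit in 4 bytes
        } else if (precision <= 18) {
          return "INT64";                 // <= 18 digits fit in 8 bytes
        } else {
          return "FIXED_LEN_BYTE_ARRAY";  // unscaled big-endian bytes
        }
      }
    }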