You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/18 22:26:36 UTC
[incubator-iceberg] branch master updated: Fix collection of bounds
for small decimals in ParquetMetrics (#131)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7169e21 Fix collection of bounds for small decimals in ParquetMetrics (#131)
7169e21 is described below
commit 7169e21fc4948ee079191a23dc7a4ce271d1224d
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Mar 18 22:26:31 2019 +0000
Fix collection of bounds for small decimals in ParquetMetrics (#131)
---
.../iceberg/parquet/ParquetConversions.java | 66 +++--
.../netflix/iceberg/parquet/ParquetMetrics.java | 4 +-
.../iceberg/parquet/TestParquetMetrics.java | 266 +++++++++++++++++++++
3 files changed, 299 insertions(+), 37 deletions(-)
diff --git a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetConversions.java b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetConversions.java
index e7a8024..b7d4d2d 100644
--- a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetConversions.java
+++ b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetConversions.java
@@ -21,14 +21,12 @@ package com.netflix.iceberg.parquet;
import com.netflix.iceberg.expressions.Literal;
import com.netflix.iceberg.types.Type;
-import com.netflix.iceberg.types.Types;
import org.apache.commons.io.Charsets;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveType;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.util.UUID;
import java.util.function.Function;
@@ -36,39 +34,37 @@ class ParquetConversions {
private ParquetConversions() {
}
- static <T> Literal<T> fromParquetPrimitive(Type type, Object value) {
- if (value instanceof Boolean) {
- return Literal.of((Boolean) value).to(type);
- } else if (value instanceof Integer) {
- return Literal.of((Integer) value).to(type);
- } else if (value instanceof Long) {
- return Literal.of((Long) value).to(type);
- } else if (value instanceof Float) {
- return Literal.of((Float) value).to(type);
- } else if (value instanceof Double) {
- return Literal.of((Double) value).to(type);
- } else if (value instanceof Binary) {
- switch (type.typeId()) {
- case STRING:
- return Literal.of(Charsets.UTF_8.decode(((Binary) value).toByteBuffer())).to(type);
- case UUID:
- ByteBuffer buffer = ((Binary) value).toByteBuffer().order(ByteOrder.BIG_ENDIAN);
- long mostSigBits = buffer.getLong();
- long leastSigBits = buffer.getLong();
- return Literal.of(new UUID(mostSigBits, leastSigBits)).to(type);
- case FIXED:
- case BINARY:
- return Literal.of(((Binary) value).toByteBuffer()).to(type);
- case DECIMAL:
- Types.DecimalType decimal = (Types.DecimalType) type;
- return Literal.of(
- new BigDecimal(new BigInteger(((Binary) value).getBytes()), decimal.scale())
- ).to(type);
- default:
- throw new IllegalArgumentException("Unsupported primitive type: " + type);
- }
- } else {
- throw new IllegalArgumentException("Unsupported primitive value: " + value);
+ @SuppressWarnings("unchecked") // every return below casts a concrete Literal to Literal<T>
+ static <T> Literal<T> fromParquetPrimitive(Type type, PrimitiveType parquetType, Object value) { // builds a Literal from a Parquet value, dispatching on the Iceberg type id
+ switch (type.typeId()) {
+ case BOOLEAN:
+ return (Literal<T>) Literal.of((Boolean) value);
+ case INTEGER:
+ case DATE: // DATE shares the Integer literal path with INTEGER
+ return (Literal<T>) Literal.of((Integer) value);
+ case LONG:
+ case TIME:
+ case TIMESTAMP: // TIME and TIMESTAMP share the Long literal path with LONG
+ return (Literal<T>) Literal.of((Long) value);
+ case FLOAT:
+ return (Literal<T>) Literal.of((Float) value);
+ case DOUBLE:
+ return (Literal<T>) Literal.of((Double) value);
+ case STRING:
+ Function<Object, Object> stringConversion = converterFromParquet(parquetType); // converterFromParquet is defined outside this hunk
+ return (Literal<T>) Literal.of((CharSequence) stringConversion.apply(value));
+ case UUID:
+ Function<Object, Object> uuidConversion = converterFromParquet(parquetType);
+ return (Literal<T>) Literal.of((UUID) uuidConversion.apply(value));
+ case FIXED:
+ case BINARY: // both are turned into a ByteBuffer literal
+ Function<Object, Object> binaryConversion = converterFromParquet(parquetType);
+ return (Literal<T>) Literal.of((ByteBuffer) binaryConversion.apply(value));
+ case DECIMAL:
+ Function<Object, Object> decimalConversion = converterFromParquet(parquetType); // conversion is chosen from the Parquet type; presumably this is what handles int32/int64-backed decimals — confirm in converterFromParquet
+ return (Literal<T>) Literal.of((BigDecimal) decimalConversion.apply(value));
+ default:
+ throw new IllegalArgumentException("Unsupported primitive type: " + type);
}
}
diff --git a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java
index 850588b..17ec12d 100644
--- a/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java
+++ b/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetrics.java
@@ -86,9 +86,9 @@ public class ParquetMetrics implements Serializable {
Types.NestedField field = fileSchema.asStruct().field(fieldId);
if (field != null && stats.hasNonNullValue()) {
updateMin(lowerBounds, fieldId,
- fromParquetPrimitive(field.type(), stats.genericGetMin()));
+ fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMin()));
updateMax(upperBounds, fieldId,
- fromParquetPrimitive(field.type(), stats.genericGetMax()));
+ fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMax()));
}
}
}
diff --git a/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java b/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java
new file mode 100644
index 0000000..1e18ce1
--- /dev/null
+++ b/parquet/src/test/java/com/netflix/iceberg/parquet/TestParquetMetrics.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.parquet;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.netflix.iceberg.FileFormat;
+import com.netflix.iceberg.Metrics;
+import com.netflix.iceberg.Schema;
+import com.netflix.iceberg.avro.AvroSchemaUtil;
+import com.netflix.iceberg.io.FileAppender;
+import com.netflix.iceberg.types.Type;
+import com.netflix.iceberg.types.Types.BinaryType;
+import com.netflix.iceberg.types.Types.BooleanType;
+import com.netflix.iceberg.types.Types.DateType;
+import com.netflix.iceberg.types.Types.DecimalType;
+import com.netflix.iceberg.types.Types.DoubleType;
+import com.netflix.iceberg.types.Types.FixedType;
+import com.netflix.iceberg.types.Types.FloatType;
+import com.netflix.iceberg.types.Types.IntegerType;
+import com.netflix.iceberg.types.Types.ListType;
+import com.netflix.iceberg.types.Types.LongType;
+import com.netflix.iceberg.types.Types.MapType;
+import com.netflix.iceberg.types.Types.StringType;
+import com.netflix.iceberg.types.Types.StructType;
+import com.netflix.iceberg.types.Types.TimeType;
+import com.netflix.iceberg.types.Types.TimestampType;
+import com.netflix.iceberg.types.Types.UUIDType;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.commons.io.Charsets;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.Map;
+import java.util.UUID;
+
+import static com.netflix.iceberg.Files.localInput;
+import static com.netflix.iceberg.Files.localOutput;
+import static com.netflix.iceberg.types.Conversions.fromByteBuffer;
+import static com.netflix.iceberg.types.Types.NestedField.optional;
+import static com.netflix.iceberg.types.Types.NestedField.required;
+
+public class TestParquetMetrics {
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+ private final UUID uuid = UUID.randomUUID();
+ private final GenericFixed fixed = new GenericData.Fixed(
+ org.apache.avro.Schema.createFixed("fixedCol", null, null, 4),
+ "abcd".getBytes(Charsets.UTF_8));
+
+ @Test // writes two records over 13 primitive-typed columns, then checks per-column counts and min/max bounds
+ public void testMetricsForTopLevelFields() throws IOException {
+ Schema schema = new Schema(
+ optional(1, "booleanCol", BooleanType.get()),
+ required(2, "intCol", IntegerType.get()),
+ optional(3, "longCol", LongType.get()),
+ required(4, "floatCol", FloatType.get()),
+ optional(5, "doubleCol", DoubleType.get()),
+ optional(6, "decimalCol", DecimalType.of(10, 2)),
+ required(7, "stringCol", StringType.get()),
+ optional(8, "dateCol", DateType.get()),
+ required(9, "timeCol", TimeType.get()),
+ required(10, "timestampCol", TimestampType.withoutZone()),
+ optional(11, "uuidCol", UUIDType.get()),
+ required(12, "fixedCol", FixedType.ofLength(4)),
+ required(13, "binaryCol", BinaryType.get())
+ );
+
+ Record firstRecord = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+ firstRecord.put("booleanCol", true);
+ firstRecord.put("intCol", 3);
+ firstRecord.put("longCol", 5L);
+ firstRecord.put("floatCol", 2.0F);
+ firstRecord.put("doubleCol", 2.0D);
+ firstRecord.put("decimalCol", new BigDecimal("3.50"));
+ firstRecord.put("stringCol", "AAA");
+ firstRecord.put("dateCol", 1500);
+ firstRecord.put("timeCol", 2000L);
+ firstRecord.put("timestampCol", 0L);
+ firstRecord.put("uuidCol", uuid);
+ firstRecord.put("fixedCol", fixed);
+ firstRecord.put("binaryCol", "S".getBytes(Charsets.UTF_8)); // explicit charset, matching the fixed-value setup in this class
+ Record secondRecord = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+ secondRecord.put("booleanCol", false);
+ secondRecord.put("intCol", Integer.MIN_VALUE);
+ secondRecord.put("longCol", null); // nulls in optional columns: expected null count 1, bounds come from firstRecord only
+ secondRecord.put("floatCol", 1.0F);
+ secondRecord.put("doubleCol", null);
+ secondRecord.put("decimalCol", null);
+ secondRecord.put("stringCol", "ZZZ");
+ secondRecord.put("dateCol", null);
+ secondRecord.put("timeCol", 3000L);
+ secondRecord.put("timestampCol", 1000L);
+ secondRecord.put("uuidCol", null);
+ secondRecord.put("fixedCol", fixed);
+ secondRecord.put("binaryCol", "W".getBytes(Charsets.UTF_8));
+
+ File parquetFile = writeRecords(schema, firstRecord, secondRecord);
+
+ Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+ Assert.assertEquals(2L, (long) metrics.recordCount());
+ assertCounts(1, 2L, 0L, metrics);
+ assertBounds(1, BooleanType.get(), false, true, metrics);
+ assertCounts(2, 2L, 0L, metrics);
+ assertBounds(2, IntegerType.get(), Integer.MIN_VALUE, 3, metrics);
+ assertCounts(3, 2L, 1L, metrics);
+ assertBounds(3, LongType.get(), 5L, 5L, metrics);
+ assertCounts(4, 2L, 0L, metrics);
+ assertBounds(4, FloatType.get(), 1.0F, 2.0F, metrics);
+ assertCounts(5, 2L, 1L, metrics);
+ assertBounds(5, DoubleType.get(), 2.0D, 2.0D, metrics);
+ assertCounts(6, 2L, 1L, metrics);
+ assertBounds(6, DecimalType.of(10, 2), new BigDecimal("3.50"), new BigDecimal("3.50"), metrics);
+ assertCounts(7, 2L, 0L, metrics);
+ assertBounds(7, StringType.get(), CharBuffer.wrap("AAA"), CharBuffer.wrap("ZZZ"), metrics);
+ assertCounts(8, 2L, 1L, metrics);
+ assertBounds(8, DateType.get(), 1500, 1500, metrics);
+ assertCounts(9, 2L, 0L, metrics);
+ assertBounds(9, TimeType.get(), 2000L, 3000L, metrics);
+ assertCounts(10, 2L, 0L, metrics);
+ assertBounds(10, TimestampType.withoutZone(), 0L, 1000L, metrics);
+ // TODO: enable once issue#126 is resolved
+ // assertCounts(11, 2L, 1L, metrics);
+ // assertBounds(11, UUIDType.get(), uuid, uuid, metrics);
+ assertCounts(12, 2L, 0L, metrics);
+ assertBounds(12, FixedType.ofLength(4),
+ ByteBuffer.wrap(fixed.bytes()), ByteBuffer.wrap(fixed.bytes()), metrics);
+ assertCounts(13, 2L, 0L, metrics);
+ assertBounds(13, BinaryType.get(), // binaryCol is declared as BinaryType in the schema, so decode its bounds with that type
+ ByteBuffer.wrap("S".getBytes(Charsets.UTF_8)), ByteBuffer.wrap("W".getBytes(Charsets.UTF_8)), metrics);
+ }
+
+ @Test // checks bounds for decimals of precision 4, 14 and 22; the column names suggest INT32 / INT64 / fixed-length Parquet storage — TODO confirm against the writer
+ public void testMetricsForDecimals() throws IOException {
+ Schema schema = new Schema(
+ required(1, "decimalAsInt32", DecimalType.of(4, 2)),
+ required(2, "decimalAsInt64", DecimalType.of(14, 2)),
+ required(3, "decimalAsFixed", DecimalType.of(22, 2))
+ );
+
+ Record record = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+ record.put("decimalAsInt32", new BigDecimal("2.55"));
+ record.put("decimalAsInt64", new BigDecimal("4.75"));
+ record.put("decimalAsFixed", new BigDecimal("5.80"));
+
+ File parquetFile = writeRecords(schema, record);
+
+ Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+ Assert.assertEquals(1L, (long) metrics.recordCount());
+ assertCounts(1, 1L, 0L, metrics);
+ assertBounds(1, DecimalType.of(4, 2), new BigDecimal("2.55"), new BigDecimal("2.55"), metrics); // single record, so lower == upper
+ assertCounts(2, 1L, 0L, metrics);
+ assertBounds(2, DecimalType.of(14, 2), new BigDecimal("4.75"), new BigDecimal("4.75"), metrics);
+ assertCounts(3, 1L, 0L, metrics);
+ assertBounds(3, DecimalType.of(22, 2), new BigDecimal("5.80"), new BigDecimal("5.80"), metrics);
+ }
+
+ @Test // fields nested inside a list or map: counts are asserted, but no bounds are expected
+ public void testMetricsForListAndMapElements() throws IOException {
+ StructType structType = StructType.of(
+ required(1, "leafIntCol", IntegerType.get()),
+ optional(2, "leafStringCol", StringType.get())
+ );
+ Schema schema = new Schema(
+ optional(3, "intListCol", ListType.ofRequired(4, IntegerType.get())),
+ optional(5, "mapCol", MapType.ofRequired(6, 7, StringType.get(), structType))
+ );
+
+ Record record = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+ record.put("intListCol", Lists.newArrayList(10, 11, 12));
+ Record struct = new Record(AvroSchemaUtil.convert(structType));
+ struct.put("leafIntCol", 1);
+ struct.put("leafStringCol", "BBB");
+ Map<String, Record> map = Maps.newHashMap();
+ map.put("4", struct);
+ record.put(1, map); // positional put: index 1 is mapCol, the second field of the schema
+
+ File parquetFile = writeRecords(schema, record);
+
+ Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+ Assert.assertEquals(1L, (long) metrics.recordCount());
+ assertCounts(1, 1, 0, metrics);
+ assertBounds(1, IntegerType.get(), null, null, metrics); // null bounds: no entry expected for this nested field
+ assertCounts(2, 1, 0, metrics);
+ assertBounds(2, StringType.get(), null, null, metrics);
+ assertCounts(4, 3, 0, metrics); // three list elements were written
+ assertBounds(4, IntegerType.get(), null, null, metrics);
+ assertCounts(6, 1, 0, metrics); // one map key was written
+ assertBounds(6, StringType.get(), null, null, metrics);
+ }
+
+ @Test // a column holding only nulls: null count equals value count and no bounds are expected
+ public void testMetricsForNullColumns() throws IOException {
+ Schema schema = new Schema(
+ optional(1, "intCol", IntegerType.get())
+ );
+ Record firstRecord = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+ firstRecord.put("intCol", null);
+ Record secondRecord = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+ secondRecord.put("intCol", null);
+
+ File parquetFile = writeRecords(schema, firstRecord, secondRecord);
+
+ Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+ Assert.assertEquals(2L, (long) metrics.recordCount());
+ assertCounts(1, 2, 2, metrics); // two values, both null
+ assertBounds(1, IntegerType.get(), null, null, metrics);
+ }
+
+ private void assertCounts(int fieldId, long valueCount, long nullValueCount, Metrics metrics) { // asserts the value count and null count recorded for fieldId
+ Map<Integer, Long> valueCounts = metrics.valueCounts();
+ Map<Integer, Long> nullValueCounts = metrics.nullValueCounts();
+ Assert.assertEquals(valueCount, (long) valueCounts.get(fieldId)); // the (long) cast unboxes; a missing entry fails with NullPointerException rather than an assertion error
+ Assert.assertEquals(nullValueCount, (long) nullValueCounts.get(fieldId));
+ }
+
+ private <T> void assertBounds(int fieldId, Type type, T lowerBound, T upperBound, Metrics metrics) { // asserts the decoded lower/upper bound for fieldId; pass null to expect no bound
+ Map<Integer, ByteBuffer> lowerBounds = metrics.lowerBounds();
+ Map<Integer, ByteBuffer> upperBounds = metrics.upperBounds();
+
+ Assert.assertEquals(
+ lowerBound,
+ lowerBounds.containsKey(fieldId) ? fromByteBuffer(type, lowerBounds.get(fieldId)) : null); // stored bytes are decoded with the given type; an absent entry compares as null
+ Assert.assertEquals(
+ upperBound,
+ upperBounds.containsKey(fieldId) ? fromByteBuffer(type, upperBounds.get(fieldId)) : null);
+ }
+
+ private File writeRecords(Schema schema, Record... records) throws IOException { // writes the records to a new Parquet file under the temp folder and returns that file
+ File tmpFolder = temp.newFolder("parquet");
+ String filename = UUID.randomUUID().toString(); // random name for the output file
+ File file = new File(tmpFolder, FileFormat.PARQUET.addExtension(filename));
+ try (FileAppender<Record> writer = Parquet.write(localOutput(file)) // try-with-resources closes the appender before the file is returned
+ .schema(schema)
+ .build()) {
+ writer.addAll(Lists.newArrayList(records));
+ }
+ return file;
+ }
+}