Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/08/13 17:24:00 UTC
[incubator-iceberg] branch master updated: Test metrics in files with multiple row groups (#365)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a736453 Test metrics in files with multiple row groups (#365)
a736453 is described below
commit a7364536c71d9e1f794f47b72eded64b140f4f0d
Author: manishmalhotrawork <ma...@gmail.com>
AuthorDate: Tue Aug 13 10:23:55 2019 -0700
Test metrics in files with multiple row groups (#365)
---
.../test/java/org/apache/iceberg/TestMetrics.java | 176 ++++++++++++++++-----
.../iceberg/parquet/ParquetWritingTestUtils.java | 4 +
.../apache/iceberg/parquet/TestParquetMetrics.java | 16 ++
3 files changed, 160 insertions(+), 36 deletions(-)
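In short, the new tests write 201 records with a tiny (1600-byte) row group size so each Parquet file spans multiple row groups, then assert that metrics are aggregated across all of them. The splitCount() helper added to TestParquetMetrics reads the row group count from the file footer. A minimal standalone sketch of that footer check, assuming parquet-hadoop is on the classpath (the class name and path argument here are illustrative, not part of the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class RowGroupCount {
  public static void main(String[] args) throws Exception {
    // Open the Parquet footer and count row groups (blocks); the tests in this
    // commit expect 3 row groups for 201 records at a 1600-byte row group size.
    try (ParquetFileReader reader = ParquetFileReader.open(
        HadoopInputFile.fromPath(new Path(args[0]), new Configuration()))) {
      System.out.println("row groups: " + reader.getRowGroups().size());
    }
  }
}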
diff --git a/core/src/test/java/org/apache/iceberg/TestMetrics.java b/core/src/test/java/org/apache/iceberg/TestMetrics.java
index 379d297..f7decb9 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetrics.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetrics.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.File;
@@ -27,6 +28,8 @@ import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.avro.generic.GenericData;
@@ -35,6 +38,7 @@ import org.apache.avro.generic.GenericFixed;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.BinaryType;
import org.apache.iceberg.types.Types.BooleanType;
import org.apache.iceberg.types.Types.DateType;
@@ -54,15 +58,48 @@ import org.apache.iceberg.types.Types.UUIDType;
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.iceberg.Files.localInput;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.types.Conversions.fromByteBuffer;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
-
/**
* Tests for Metrics.
*/
public abstract class TestMetrics {
+ public static final int ROW_GROUP_SIZE = 1600;
+ private static final StructType LEAF_STRUCT_TYPE = StructType.of(
+ optional(5, "leafLongCol", LongType.get()),
+ optional(6, "leafBinaryCol", BinaryType.get())
+ );
+
+ private static final StructType NESTED_STRUCT_TYPE = StructType.of(
+ required(3, "longCol", LongType.get()),
+ required(4, "leafStructCol", LEAF_STRUCT_TYPE)
+ );
+
+ private static final Schema NESTED_SCHEMA = new Schema(
+ required(1, "intCol", IntegerType.get()),
+ required(2, "nestedStructCol", NESTED_STRUCT_TYPE)
+ );
+
+ private static final Schema SIMPLE_SCHEMA = new Schema(
+ optional(1, "booleanCol", BooleanType.get()),
+ required(2, "intCol", IntegerType.get()),
+ optional(3, "longCol", LongType.get()),
+ required(4, "floatCol", FloatType.get()),
+ optional(5, "doubleCol", DoubleType.get()),
+ optional(6, "decimalCol", DecimalType.of(10, 2)),
+ required(7, "stringCol", StringType.get()),
+ optional(8, "dateCol", DateType.get()),
+ required(9, "timeCol", TimeType.get()),
+ required(10, "timestampCol", TimestampType.withoutZone()),
+ optional(11, "uuidCol", UUIDType.get()),
+ required(12, "fixedCol", FixedType.ofLength(4)),
+ required(13, "binaryCol", BinaryType.get())
+ );
+
private final UUID uuid = UUID.randomUUID();
private final GenericFixed fixed = new GenericData.Fixed(
org.apache.avro.Schema.createFixed("fixedCol", null, null, 4),
@@ -72,25 +109,14 @@ public abstract class TestMetrics {
public abstract File writeRecords(Schema schema, Record... records) throws IOException;
+ public abstract File writeRecords(Schema schema, Map<String, String> properties, GenericData.Record... records)
+ throws IOException;
+
+ public abstract int splitCount(File parquetFile) throws IOException;
+
@Test
public void testMetricsForTopLevelFields() throws IOException {
- Schema schema = new Schema(
- optional(1, "booleanCol", BooleanType.get()),
- required(2, "intCol", IntegerType.get()),
- optional(3, "longCol", LongType.get()),
- required(4, "floatCol", FloatType.get()),
- optional(5, "doubleCol", DoubleType.get()),
- optional(6, "decimalCol", DecimalType.of(10, 2)),
- required(7, "stringCol", StringType.get()),
- optional(8, "dateCol", DateType.get()),
- required(9, "timeCol", TimeType.get()),
- required(10, "timestampCol", TimestampType.withoutZone()),
- optional(11, "uuidCol", UUIDType.get()),
- required(12, "fixedCol", FixedType.ofLength(4)),
- required(13, "binaryCol", BinaryType.get())
- );
-
- Record firstRecord = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+ Record firstRecord = new Record(AvroSchemaUtil.convert(SIMPLE_SCHEMA.asStruct()));
firstRecord.put("booleanCol", true);
firstRecord.put("intCol", 3);
firstRecord.put("longCol", 5L);
@@ -104,7 +130,7 @@ public abstract class TestMetrics {
firstRecord.put("uuidCol", uuid);
firstRecord.put("fixedCol", fixed);
firstRecord.put("binaryCol", "S".getBytes());
- Record secondRecord = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+ Record secondRecord = new Record(AvroSchemaUtil.convert(SIMPLE_SCHEMA.asStruct()));
secondRecord.put("booleanCol", false);
secondRecord.put("intCol", Integer.MIN_VALUE);
secondRecord.put("longCol", null);
@@ -119,7 +145,7 @@ public abstract class TestMetrics {
secondRecord.put("fixedCol", fixed);
secondRecord.put("binaryCol", "W".getBytes());
- File recordsFile = writeRecords(schema, firstRecord, secondRecord);
+ File recordsFile = writeRecords(SIMPLE_SCHEMA, firstRecord, secondRecord);
Metrics metrics = getMetrics(Files.localInput(recordsFile));
Assert.assertEquals(2L, (long) metrics.recordCount());
@@ -180,30 +206,18 @@ public abstract class TestMetrics {
@Test
public void testMetricsForNestedStructFields() throws IOException {
- StructType leafStructType = StructType.of(
- optional(5, "leafLongCol", LongType.get()),
- optional(6, "leafBinaryCol", BinaryType.get())
- );
- StructType nestedStructType = StructType.of(
- required(3, "longCol", LongType.get()),
- required(4, "leafStructCol", leafStructType)
- );
- Schema schema = new Schema(
- required(1, "intCol", IntegerType.get()),
- required(2, "nestedStructCol", nestedStructType)
- );
- Record leafStruct = new Record(AvroSchemaUtil.convert(leafStructType));
+ Record leafStruct = new Record(AvroSchemaUtil.convert(LEAF_STRUCT_TYPE));
leafStruct.put("leafLongCol", 20L);
leafStruct.put("leafBinaryCol", "A".getBytes());
- Record nestedStruct = new Record(AvroSchemaUtil.convert(nestedStructType));
+ Record nestedStruct = new Record(AvroSchemaUtil.convert(NESTED_STRUCT_TYPE));
nestedStruct.put("longCol", 100L);
nestedStruct.put("leafStructCol", leafStruct);
- Record record = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+ Record record = new Record(AvroSchemaUtil.convert(NESTED_SCHEMA.asStruct()));
record.put("intCol", Integer.MAX_VALUE);
record.put("nestedStructCol", nestedStruct);
- File recordsFile = writeRecords(schema, record);
+ File recordsFile = writeRecords(NESTED_SCHEMA, record);
Metrics metrics = getMetrics(Files.localInput(recordsFile));
Assert.assertEquals(1L, (long) metrics.recordCount());
@@ -270,6 +284,96 @@ public abstract class TestMetrics {
assertBounds(1, IntegerType.get(), null, null, metrics);
}
+ @Test
+ public void testMetricsForTopLevelWithMultipleRowGroup() throws Exception {
+ int recordCount = 201;
+ List<GenericData.Record> records = new ArrayList<>(recordCount);
+
+ for (int i = 0; i < recordCount; i++) {
+ GenericData.Record newRecord = new GenericData.Record(AvroSchemaUtil.convert(SIMPLE_SCHEMA.asStruct()));
+ newRecord.put("booleanCol", i == 0 ? false : true);
+ newRecord.put("intCol", i + 1);
+ newRecord.put("longCol", i == 0 ? null : i + 1L);
+ newRecord.put("floatCol", i + 1.0F);
+ newRecord.put("doubleCol", i == 0 ? null : i + 1.0D);
+ newRecord.put("decimalCol", i == 0 ? null : new BigDecimal(i + "").add(new BigDecimal("1.00")));
+ newRecord.put("stringCol", "AAA");
+ newRecord.put("dateCol", i + 1);
+ newRecord.put("timeCol", i + 1L);
+ newRecord.put("timestampCol", i + 1L);
+ newRecord.put("uuidCol", uuid);
+ newRecord.put("fixedCol", fixed);
+ newRecord.put("binaryCol", "S".getBytes());
+ records.add(newRecord);
+ }
+
+ // create a Parquet file with multiple row groups by using a small row group size
+ File parquetFile = writeRecords(
+ SIMPLE_SCHEMA,
+ ImmutableMap.of(PARQUET_ROW_GROUP_SIZE_BYTES, Integer.toString(ROW_GROUP_SIZE)),
+ records.toArray(new GenericData.Record[] {}));
+
+ Assert.assertNotNull(parquetFile);
+ // row group count should be > 1
+ Assert.assertEquals(3, splitCount(parquetFile));
+
+ Metrics metrics = getMetrics(localInput(parquetFile));
+ Assert.assertEquals(201L, (long) metrics.recordCount());
+ assertCounts(1, 201L, 0L, metrics);
+ assertBounds(1, Types.BooleanType.get(), false, true, metrics);
+ assertBounds(2, Types.IntegerType.get(), 1, 201, metrics);
+ assertCounts(3, 201L, 1L, metrics);
+ assertBounds(3, Types.LongType.get(), 2L, 201L, metrics);
+ assertCounts(4, 201L, 0L, metrics);
+ assertBounds(4, Types.FloatType.get(), 1.0F, 201.0F, metrics);
+ assertCounts(5, 201L, 1L, metrics);
+ assertBounds(5, Types.DoubleType.get(), 2.0D, 201.0D, metrics);
+ assertCounts(6, 201L, 1L, metrics);
+ assertBounds(6, Types.DecimalType.of(10, 2), new BigDecimal("2.00"),
+ new BigDecimal("201.00"), metrics);
+ }
+
+ @Test
+ public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOException {
+ int recordCount = 201;
+ List<Record> records = new ArrayList<>(recordCount);
+
+ for (int i = 0; i < recordCount; i++) {
+ Record newLeafStruct = new Record(AvroSchemaUtil.convert(LEAF_STRUCT_TYPE));
+ newLeafStruct.put("leafLongCol", i + 1L);
+ newLeafStruct.put("leafBinaryCol", "A".getBytes());
+ Record newNestedStruct = new Record(AvroSchemaUtil.convert(NESTED_STRUCT_TYPE));
+ newNestedStruct.put("longCol", i + 1L);
+ newNestedStruct.put("leafStructCol", newLeafStruct);
+ Record newRecord = new Record(AvroSchemaUtil.convert(NESTED_SCHEMA.asStruct()));
+ newRecord.put("intCol", i + 1);
+ newRecord.put("nestedStructCol", newNestedStruct);
+ records.add(newRecord);
+ }
+
+ // create a Parquet file with multiple row groups by using a small row group size
+ File parquetFile = writeRecords(
+ NESTED_SCHEMA,
+ ImmutableMap.of(PARQUET_ROW_GROUP_SIZE_BYTES, Integer.toString(ROW_GROUP_SIZE)),
+ records.toArray(new GenericData.Record[] {}));
+
+ Assert.assertNotNull(parquetFile);
+ // row group count should be > 1
+ Assert.assertEquals(3, splitCount(parquetFile));
+
+ Metrics metrics = getMetrics(localInput(parquetFile));
+ Assert.assertEquals(201L, (long) metrics.recordCount());
+ assertCounts(1, 201L, 0L, metrics);
+ assertBounds(1, IntegerType.get(), 1, 201, metrics);
+ assertCounts(3, 201L, 0L, metrics);
+ assertBounds(3, LongType.get(), 1L, 201L, metrics);
+ assertCounts(5, 201L, 0L, metrics);
+ assertBounds(5, LongType.get(), 1L, 201L, metrics);
+ assertCounts(6, 201L, 0L, metrics);
+ assertBounds(6, BinaryType.get(),
+ ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics);
+ }
+
private void assertCounts(int fieldId, long valueCount, long nullValueCount, Metrics metrics) {
Map<Integer, Long> valueCounts = metrics.valueCounts();
Map<Integer, Long> nullValueCounts = metrics.nullValueCounts();
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
index f42bdbd..e964ea9 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
@@ -46,6 +46,10 @@ class ParquetWritingTestUtils {
return writeRecords(temp, schema, Collections.emptyMap(), null, records);
}
+ static File writeRecords(TemporaryFolder temp, Schema schema, Map<String, String> properties, GenericData.Record... records) throws IOException {
+ return writeRecords(temp, schema, properties, null, records);
+ }
+
static File writeRecords(
TemporaryFolder temp,
Schema schema, Map<String, String> properties,
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
index a838c92..fb68404 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
@@ -21,14 +21,18 @@ package org.apache.iceberg.parquet;
import java.io.File;
import java.io.IOException;
+import java.util.Map;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TestMetrics;
import org.apache.iceberg.io.InputFile;
+import org.apache.parquet.hadoop.ParquetFileReader;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
+import static org.apache.iceberg.Files.localInput;
+
/**
* Test Metrics for Parquet.
*/
@@ -46,4 +50,16 @@ public class TestParquetMetrics extends TestMetrics {
public File writeRecords(Schema schema, GenericData.Record... records) throws IOException {
return ParquetWritingTestUtils.writeRecords(temp, schema, records);
}
+
+ @Override
+ public File writeRecords(Schema schema, Map<String, String> properties, GenericData.Record... records) throws IOException {
+ return ParquetWritingTestUtils.writeRecords(temp, schema, properties, records);
+ }
+
+ @Override
+ public int splitCount(File parquetFile) throws IOException {
+ try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
+ return reader.getRowGroups().size();
+ }
+ }
}
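
For context, the new writeRecords(schema, properties, records) overload just threads the write properties through to Iceberg's Parquet writer. A rough sketch of that path, assuming the Parquet.write() builder applies PARQUET_ROW_GROUP_SIZE_BYTES (this mirrors ParquetWritingTestUtils but is not its exact code):

import java.io.File;
import java.io.IOException;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;

class SmallRowGroupWriteSketch {
  static File write(File file, Schema schema, GenericData.Record... records) throws IOException {
    // A 1600-byte target is far below the 128 MB default, so a couple hundred
    // records are flushed into several row groups instead of one.
    try (FileAppender<GenericData.Record> appender = Parquet.write(Files.localOutput(file))
        .schema(schema)
        .named("test")
        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "1600")
        .build()) {
      for (GenericData.Record record : records) {
        appender.add(record);
      }
    }
    return file;
  }
}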