Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/08/13 17:24:00 UTC

[incubator-iceberg] branch master updated: Test metrics in files with multiple row groups (#365)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a736453  Test metrics in files with multiple row groups (#365)
a736453 is described below

commit a7364536c71d9e1f794f47b72eded64b140f4f0d
Author: manishmalhotrawork <ma...@gmail.com>
AuthorDate: Tue Aug 13 10:23:55 2019 -0700

    Test metrics in files with multiple row groups (#365)
---
 .../test/java/org/apache/iceberg/TestMetrics.java  | 176 ++++++++++++++++-----
 .../iceberg/parquet/ParquetWritingTestUtils.java   |   4 +
 .../apache/iceberg/parquet/TestParquetMetrics.java |  16 ++
 3 files changed, 160 insertions(+), 36 deletions(-)
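
The new tests cover an aggregation gap: metrics must be combined across Parquet row group boundaries, not taken from the first row group alone. Each test writes 201 records with write.parquet.row-group-size-bytes turned down to 1600 bytes, forcing the writer to flush several row groups into one file, then asserts that record counts, null counts, and lower/upper bounds reflect every group. A minimal sketch of the write path under test, assuming the Parquet.write builder that ParquetWritingTestUtils wraps (the temp file name and surrounding scaffolding here are hypothetical):

    // Force small row groups through the table property, then read the
    // aggregated metrics back via TestMetrics' getMetrics(InputFile) hook.
    File file = new File(temp.newFolder(), "multi-row-group.parquet");
    try (FileAppender<GenericData.Record> appender =
            Parquet.write(Files.localOutput(file))
                .schema(SIMPLE_SCHEMA)
                .named("test")
                .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "1600")
                .build()) {
      appender.addAll(records);  // 201 small records span several 1600-byte groups
    }
    Metrics metrics = getMetrics(Files.localInput(file));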

diff --git a/core/src/test/java/org/apache/iceberg/TestMetrics.java b/core/src/test/java/org/apache/iceberg/TestMetrics.java
index 379d297..f7decb9 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetrics.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetrics.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.File;
@@ -27,6 +28,8 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.avro.generic.GenericData;
@@ -35,6 +38,7 @@ import org.apache.avro.generic.GenericFixed;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.BinaryType;
 import org.apache.iceberg.types.Types.BooleanType;
 import org.apache.iceberg.types.Types.DateType;
@@ -54,15 +58,48 @@ import org.apache.iceberg.types.Types.UUIDType;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.iceberg.Files.localInput;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
 import static org.apache.iceberg.types.Conversions.fromByteBuffer;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
-
 /**
  * Tests for Metrics.
  */
 public abstract class TestMetrics {
 
+  public static final int ROW_GROUP_SIZE = 1600;
+  private static final StructType LEAF_STRUCT_TYPE = StructType.of(
+      optional(5, "leafLongCol", LongType.get()),
+      optional(6, "leafBinaryCol", BinaryType.get())
+  );
+
+  private static final StructType NESTED_STRUCT_TYPE = StructType.of(
+      required(3, "longCol", LongType.get()),
+      required(4, "leafStructCol", LEAF_STRUCT_TYPE)
+  );
+
+  private static final Schema NESTED_SCHEMA = new Schema(
+      required(1, "intCol", IntegerType.get()),
+      required(2, "nestedStructCol", NESTED_STRUCT_TYPE)
+  );
+
+  private static final Schema SIMPLE_SCHEMA = new Schema(
+      optional(1, "booleanCol", BooleanType.get()),
+      required(2, "intCol", IntegerType.get()),
+      optional(3, "longCol", LongType.get()),
+      required(4, "floatCol", FloatType.get()),
+      optional(5, "doubleCol", DoubleType.get()),
+      optional(6, "decimalCol", DecimalType.of(10, 2)),
+      required(7, "stringCol", StringType.get()),
+      optional(8, "dateCol", DateType.get()),
+      required(9, "timeCol", TimeType.get()),
+      required(10, "timestampCol", TimestampType.withoutZone()),
+      optional(11, "uuidCol", UUIDType.get()),
+      required(12, "fixedCol", FixedType.ofLength(4)),
+      required(13, "binaryCol", BinaryType.get())
+  );
+
   private final UUID uuid = UUID.randomUUID();
   private final GenericFixed fixed = new GenericData.Fixed(
       org.apache.avro.Schema.createFixed("fixedCol", null, null, 4),
@@ -72,25 +109,14 @@ public abstract class TestMetrics {
 
   public abstract File writeRecords(Schema schema, Record... records) throws IOException;
 
+  public abstract File writeRecords(Schema schema, Map<String, String> properties, GenericData.Record... records)
+      throws IOException;
+
+  public abstract int splitCount(File parquetFile) throws IOException;
+
   @Test
   public void testMetricsForTopLevelFields() throws IOException {
-    Schema schema = new Schema(
-        optional(1, "booleanCol", BooleanType.get()),
-        required(2, "intCol", IntegerType.get()),
-        optional(3, "longCol", LongType.get()),
-        required(4, "floatCol", FloatType.get()),
-        optional(5, "doubleCol", DoubleType.get()),
-        optional(6, "decimalCol", DecimalType.of(10, 2)),
-        required(7, "stringCol", StringType.get()),
-        optional(8, "dateCol", DateType.get()),
-        required(9, "timeCol", TimeType.get()),
-        required(10, "timestampCol", TimestampType.withoutZone()),
-        optional(11, "uuidCol", UUIDType.get()),
-        required(12, "fixedCol", FixedType.ofLength(4)),
-        required(13, "binaryCol", BinaryType.get())
-    );
-
-    Record firstRecord = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+    Record firstRecord = new Record(AvroSchemaUtil.convert(SIMPLE_SCHEMA.asStruct()));
     firstRecord.put("booleanCol", true);
     firstRecord.put("intCol", 3);
     firstRecord.put("longCol", 5L);
@@ -104,7 +130,7 @@ public abstract class TestMetrics {
     firstRecord.put("uuidCol", uuid);
     firstRecord.put("fixedCol", fixed);
     firstRecord.put("binaryCol", "S".getBytes());
-    Record secondRecord = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+    Record secondRecord = new Record(AvroSchemaUtil.convert(SIMPLE_SCHEMA.asStruct()));
     secondRecord.put("booleanCol", false);
     secondRecord.put("intCol", Integer.MIN_VALUE);
     secondRecord.put("longCol", null);
@@ -119,7 +145,7 @@ public abstract class TestMetrics {
     secondRecord.put("fixedCol", fixed);
     secondRecord.put("binaryCol", "W".getBytes());
 
-    File recordsFile = writeRecords(schema, firstRecord, secondRecord);
+    File recordsFile = writeRecords(SIMPLE_SCHEMA, firstRecord, secondRecord);
 
     Metrics metrics = getMetrics(Files.localInput(recordsFile));
     Assert.assertEquals(2L, (long) metrics.recordCount());
@@ -180,30 +206,18 @@ public abstract class TestMetrics {
 
   @Test
   public void testMetricsForNestedStructFields() throws IOException {
-    StructType leafStructType = StructType.of(
-        optional(5, "leafLongCol", LongType.get()),
-        optional(6, "leafBinaryCol", BinaryType.get())
-    );
-    StructType nestedStructType = StructType.of(
-        required(3, "longCol", LongType.get()),
-        required(4, "leafStructCol", leafStructType)
-    );
-    Schema schema = new Schema(
-        required(1, "intCol", IntegerType.get()),
-        required(2, "nestedStructCol", nestedStructType)
-    );
 
-    Record leafStruct = new Record(AvroSchemaUtil.convert(leafStructType));
+    Record leafStruct = new Record(AvroSchemaUtil.convert(LEAF_STRUCT_TYPE));
     leafStruct.put("leafLongCol", 20L);
     leafStruct.put("leafBinaryCol", "A".getBytes());
-    Record nestedStruct = new Record(AvroSchemaUtil.convert(nestedStructType));
+    Record nestedStruct = new Record(AvroSchemaUtil.convert(NESTED_STRUCT_TYPE));
     nestedStruct.put("longCol", 100L);
     nestedStruct.put("leafStructCol", leafStruct);
-    Record record = new Record(AvroSchemaUtil.convert(schema.asStruct()));
+    Record record = new Record(AvroSchemaUtil.convert(NESTED_SCHEMA.asStruct()));
     record.put("intCol", Integer.MAX_VALUE);
     record.put("nestedStructCol", nestedStruct);
 
-    File recordsFile = writeRecords(schema, record);
+    File recordsFile = writeRecords(NESTED_SCHEMA, record);
 
     Metrics metrics = getMetrics(Files.localInput(recordsFile));
     Assert.assertEquals(1L, (long) metrics.recordCount());
@@ -270,6 +284,96 @@ public abstract class TestMetrics {
     assertBounds(1, IntegerType.get(), null, null, metrics);
   }
 
+  @Test
+  public void testMetricsForTopLevelWithMultipleRowGroup() throws IOException {
+    int recordCount = 201;
+    List<GenericData.Record> records = new ArrayList<>(recordCount);
+
+    for (int i = 0; i < recordCount; i++) {
+      GenericData.Record newRecord = new GenericData.Record(AvroSchemaUtil.convert(SIMPLE_SCHEMA.asStruct()));
+      newRecord.put("booleanCol", i == 0 ? false : true);
+      newRecord.put("intCol", i + 1);
+      newRecord.put("longCol", i == 0 ? null : i + 1L);
+      newRecord.put("floatCol", i + 1.0F);
+      newRecord.put("doubleCol", i == 0 ? null : i + 1.0D);
+      newRecord.put("decimalCol", i == 0 ? null : new BigDecimal(i + "").add(new BigDecimal("1.00")));
+      newRecord.put("stringCol", "AAA");
+      newRecord.put("dateCol", i + 1);
+      newRecord.put("timeCol", i + 1L);
+      newRecord.put("timestampCol", i + 1L);
+      newRecord.put("uuidCol", uuid);
+      newRecord.put("fixedCol", fixed);
+      newRecord.put("binaryCol", "S".getBytes());
+      records.add(newRecord);
+    }
+
+    // write the file with a small row group size so it contains multiple row groups
+    File parquetFile = writeRecords(
+        SIMPLE_SCHEMA,
+        ImmutableMap.of(PARQUET_ROW_GROUP_SIZE_BYTES, Integer.toString(ROW_GROUP_SIZE)),
+        records.toArray(new GenericData.Record[] {}));
+
+    Assert.assertNotNull(parquetFile);
+    // expect multiple row groups: the writer's size check runs every 100 records, so 201 records flush into groups of 100, 100, and 1
+    Assert.assertEquals(3, splitCount(parquetFile));
+
+    Metrics metrics = getMetrics(localInput(parquetFile));
+    Assert.assertEquals(201L, (long) metrics.recordCount());
+    assertCounts(1, 201L, 0L, metrics);
+    assertBounds(1, Types.BooleanType.get(), false, true, metrics);
+    assertBounds(2, Types.IntegerType.get(), 1, 201, metrics);
+    assertCounts(3, 201L, 1L, metrics);
+    assertBounds(3, Types.LongType.get(), 2L, 201L, metrics);
+    assertCounts(4, 201L, 0L, metrics);
+    assertBounds(4, Types.FloatType.get(), 1.0F, 201.0F, metrics);
+    assertCounts(5, 201L, 1L, metrics);
+    assertBounds(5, Types.DoubleType.get(), 2.0D, 201.0D, metrics);
+    assertCounts(6, 201L, 1L, metrics);
+    assertBounds(6, Types.DecimalType.of(10, 2), new BigDecimal("2.00"),
+        new BigDecimal("201.00"), metrics);
+  }
+
+  @Test
+  public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOException {
+    int recordCount = 201;
+    List<Record> records = new ArrayList<>(recordCount);
+
+    for (int i = 0; i < recordCount; i++) {
+      Record newLeafStruct = new Record(AvroSchemaUtil.convert(LEAF_STRUCT_TYPE));
+      newLeafStruct.put("leafLongCol", i + 1L);
+      newLeafStruct.put("leafBinaryCol", "A".getBytes());
+      Record newNestedStruct = new Record(AvroSchemaUtil.convert(NESTED_STRUCT_TYPE));
+      newNestedStruct.put("longCol", i + 1L);
+      newNestedStruct.put("leafStructCol", newLeafStruct);
+      Record newRecord = new Record(AvroSchemaUtil.convert(NESTED_SCHEMA.asStruct()));
+      newRecord.put("intCol", i + 1);
+      newRecord.put("nestedStructCol", newNestedStruct);
+      records.add(newRecord);
+    }
+
+    // write the file with a small row group size so it contains multiple row groups
+    File parquetFile = writeRecords(
+        NESTED_SCHEMA,
+        ImmutableMap.of(PARQUET_ROW_GROUP_SIZE_BYTES, Integer.toString(ROW_GROUP_SIZE)),
+        records.toArray(new GenericData.Record[] {}));
+
+    Assert.assertNotNull(parquetFile);
+    // expect multiple row groups: the writer's size check runs every 100 records, so 201 records flush into groups of 100, 100, and 1
+    Assert.assertEquals(3, splitCount(parquetFile));
+
+    Metrics metrics = getMetrics(localInput(parquetFile));
+    Assert.assertEquals(201L, (long) metrics.recordCount());
+    assertCounts(1, 201L, 0L, metrics);
+    assertBounds(1, IntegerType.get(), 1, 201, metrics);
+    assertCounts(3, 201L, 0L, metrics);
+    assertBounds(3, LongType.get(), 1L, 201L, metrics);
+    assertCounts(5, 201L, 0L, metrics);
+    assertBounds(5, LongType.get(), 1L, 201L, metrics);
+    assertCounts(6, 201L, 0L, metrics);
+    assertBounds(6, BinaryType.get(),
+        ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics);
+  }
+
   private void assertCounts(int fieldId, long valueCount, long nullValueCount, Metrics metrics) {
     Map<Integer, Long> valueCounts = metrics.valueCounts();
     Map<Integer, Long> nullValueCounts = metrics.nullValueCounts();
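
A note on the hard-coded 3 in the splitCount assertions above: parquet-mr's record writer does not check its buffered size on every record; it starts checking after a minimum of 100 records. With a 1600-byte row group target that threshold is always already exceeded, so the 201 test records flush after record 100 and record 200, and the last record becomes a third group when the file is closed. A hedged sketch for inspecting that layout (ParquetFileReader and BlockMetaData are parquet-mr classes; ParquetIO.file is the adapter TestParquetMetrics uses below):

    // Prints one line per row group; for the tests above this should show
    // three groups of 100, 100, and 1 rows under parquet-mr's defaults.
    try (ParquetFileReader reader =
            ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
      for (BlockMetaData rowGroup : reader.getRowGroups()) {
        System.out.printf("rows=%d totalBytes=%d%n",
            rowGroup.getRowCount(), rowGroup.getTotalByteSize());
      }
    }
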
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
index f42bdbd..e964ea9 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
@@ -46,6 +46,10 @@ class ParquetWritingTestUtils {
     return writeRecords(temp, schema, Collections.emptyMap(), null, records);
   }
 
+  static File writeRecords(TemporaryFolder temp, Schema schema, Map<String, String> properties, GenericData.Record... records) throws IOException {
+    return writeRecords(temp, schema, properties, null, records);
+  }
+
   static File writeRecords(
       TemporaryFolder temp,
       Schema schema, Map<String, String> properties,
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
index a838c92..fb68404 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
@@ -21,14 +21,18 @@ package org.apache.iceberg.parquet;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Map;
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TestMetrics;
 import org.apache.iceberg.io.InputFile;
+import org.apache.parquet.hadoop.ParquetFileReader;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
+import static org.apache.iceberg.Files.localInput;
+
 /**
  * Test Metrics for Parquet.
  */
@@ -46,4 +50,16 @@ public class TestParquetMetrics extends TestMetrics {
   public File writeRecords(Schema schema, GenericData.Record... records) throws IOException {
     return ParquetWritingTestUtils.writeRecords(temp, schema, records);
   }
+
+  @Override
+  public File writeRecords(Schema schema, Map<String, String> properties, GenericData.Record... records)
+      throws IOException {
+    return ParquetWritingTestUtils.writeRecords(temp, schema, properties, records);
+  }
+
+  @Override
+  public int splitCount(File parquetFile) throws IOException {
+    try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
+      return reader.getRowGroups().size();
+    }
+  }
 }
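
For completeness, the bounds assertions rest on Conversions.fromByteBuffer (statically imported in TestMetrics): Metrics keeps per-field lower and upper bounds as single-value ByteBuffers keyed by field id. A hedged sketch of such a check, with checkBounds as a hypothetical stand-in for the existing assertBounds helper:

    // Decode the serialized bounds for one field and compare to expectations.
    private <T> void checkBounds(int fieldId, Type type, T lower, T upper, Metrics metrics) {
      Assert.assertEquals(lower, fromByteBuffer(type, metrics.lowerBounds().get(fieldId)));
      Assert.assertEquals(upper, fromByteBuffer(type, metrics.upperBounds().get(fieldId)));
    }

The multi-row-group expectations then reduce to simple aggregation arithmetic: for intCol the three groups hold 1..100, 101..200, and 201, so the value count is 100 + 100 + 1 = 201, the lower bound is min(1, 101, 201) = 1, and the upper bound is max(100, 200, 201) = 201.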