Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/10/23 21:30:57 UTC

[iceberg] branch master updated: Parquet: Add test for Arrow buffer reallocation (#1480)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d308d3  Parquet: Add test for Arrow buffer reallocation (#1480)
3d308d3 is described below

commit 3d308d3c292ba06ebce554efe05561b8f2e79b51
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Fri Oct 23 14:30:49 2020 -0700

    Parquet: Add test for Arrow buffer reallocation (#1480)
---
 .../org/apache/iceberg/spark/data/TestHelpers.java |  2 +-
 ...estParquetDictionaryEncodedVectorizedReads.java | 11 +++--
 ...naryFallbackToPlainEncodingVectorizedReads.java |  8 +++-
 .../vectorized/TestParquetVectorizedReads.java     | 54 ++++++++++++++++++----
 4 files changed, 61 insertions(+), 14 deletions(-)

diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index c4d4bb4..d297d35 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -98,7 +98,7 @@ public class TestHelpers {
         if (checkArrowValidityVector) {
           ColumnVector columnVector = batch.column(i);
           ValueVector arrowVector = ((IcebergArrowColumnVector) columnVector).vectorAccessor().getVector();
-          Assert.assertEquals("Nullability doesn't match", expectedValue == null, arrowVector.isNull(rowId));
+          Assert.assertFalse("Nullability doesn't match", expectedValue == null ^ arrowVector.isNull(rowId));
         }
       }
     }
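
Side note on the TestHelpers change above (not part of the commit): the XOR form asserts that the expected and actual nullability agree, since a ^ b is true exactly when one flag is set and the other is not. A minimal restatement of the check, with illustrative local variables:

    // Sketch only: XOR is true iff exactly one operand is true, so asserting
    // it false requires expected and actual nullability to match both ways.
    boolean expectedNull = (expectedValue == null);
    boolean actualNull = arrowVector.isNull(rowId);
    Assert.assertFalse("Nullability doesn't match", expectedNull ^ actualNull);
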
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
index bd5c53d..d8f1ff3 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
@@ -25,9 +25,11 @@ import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Function;
 import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.spark.data.RandomData;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -38,8 +40,10 @@ import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DE
 public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVectorizedReads {
 
   @Override
-  Iterable<GenericData.Record> generateData(Schema schema, int numRecords, long seed, float nullPercentage) {
-    return RandomData.generateDictionaryEncodableData(schema, numRecords, seed, nullPercentage);
+  Iterable<GenericData.Record> generateData(Schema schema, int numRecords, long seed, float nullPercentage,
+                                            Function<GenericData.Record, GenericData.Record> transform) {
+    Iterable data = RandomData.generateDictionaryEncodableData(schema, numRecords, seed, nullPercentage);
+    return transform == IDENTITY ? data : Iterables.transform(data, transform);
   }
 
   @Test
@@ -82,6 +86,7 @@ public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVect
             FluentIterable.concat(dictionaryEncodableData, nonDictionaryData, dictionaryEncodableData),
             mixedFile,
             false,
-            true);
+            true,
+            BATCH_SIZE);
   }
 }
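
The generateData override above threads an optional per-record transform through the generator. When the transform is not the IDENTITY sentinel, it is applied lazily via Guava's Iterables.transform, so records are rewritten as the iterable is consumed rather than materialized up front. A minimal self-contained sketch of that pattern (plain Guava here; the test imports Iceberg's relocated copy):

    import com.google.common.base.Function;
    import com.google.common.collect.Iterables;
    import java.util.List;

    public class TransformSketch {
      // Sentinel identity transform, mirroring the IDENTITY field in the tests.
      static final Function<String, String> IDENTITY = s -> s;

      static Iterable<String> generate(Iterable<String> data,
                                       Function<String, String> transform) {
        // Skip the lazy wrapper entirely for the common identity case.
        return transform == IDENTITY ? data : Iterables.transform(data, transform);
      }

      public static void main(String[] args) {
        generate(List.of("a", "b"), s -> s + "!")
            .forEach(System.out::println);  // prints a! then b!
      }
    }
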
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryFallbackToPlainEncodingVectorizedReads.java b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryFallbackToPlainEncodingVectorizedReads.java
index 3c1969e..5ceac3f 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryFallbackToPlainEncodingVectorizedReads.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryFallbackToPlainEncodingVectorizedReads.java
@@ -27,6 +27,8 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Function;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.spark.data.RandomData;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -40,9 +42,11 @@ public class TestParquetDictionaryFallbackToPlainEncodingVectorizedReads extends
   }
 
   @Override
-  Iterable<GenericData.Record> generateData(Schema schema, int numRecords, long seed, float nullPercentage) {
+  Iterable<GenericData.Record> generateData(Schema schema, int numRecords, long seed, float nullPercentage,
+                                            Function<GenericData.Record, GenericData.Record> transform) {
     // TODO: take into account nullPercentage when generating fallback encoding data
-    return RandomData.generateFallbackData(schema, numRecords, seed, numRecords / 20);
+    Iterable data = RandomData.generateFallbackData(schema, numRecords, seed, numRecords / 20);
+    return transform == IDENTITY ? data : Iterables.transform(data, transform);
   }
 
   @Override
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
index 80c6cd1..fdef454 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
@@ -29,6 +29,9 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Function;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.data.AvroDataTest;
 import org.apache.iceberg.spark.data.RandomData;
@@ -49,6 +52,9 @@ import static org.apache.iceberg.types.Types.NestedField.required;
 
 public class TestParquetVectorizedReads extends AvroDataTest {
   private static final int NUM_ROWS = 200_000;
+  static final int BATCH_SIZE = 10_000;
+
+  static final Function<GenericData.Record, GenericData.Record> IDENTITY = record -> record;
 
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
@@ -56,15 +62,24 @@ public class TestParquetVectorizedReads extends AvroDataTest {
   }
 
   private void writeAndValidate(
-      Schema schema, int numRecords, long seed, float nullPercentage,
-      boolean setAndCheckArrowValidityVector, boolean reuseContainers)
+          Schema schema, int numRecords, long seed, float nullPercentage,
+          boolean setAndCheckArrowValidityVector, boolean reuseContainers)
+          throws IOException {
+    writeAndValidate(schema, numRecords, seed, nullPercentage,
+            setAndCheckArrowValidityVector, reuseContainers, BATCH_SIZE, IDENTITY);
+  }
+
+  private void writeAndValidate(
+          Schema schema, int numRecords, long seed, float nullPercentage,
+          boolean setAndCheckArrowValidityVector, boolean reuseContainers, int batchSize,
+          Function<GenericData.Record, GenericData.Record> transform)
       throws IOException {
     // Write test data
     Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == TypeUtil.find(
         schema,
         type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()));
 
-    Iterable<GenericData.Record> expected = generateData(schema, numRecords, seed, nullPercentage);
+    Iterable<GenericData.Record> expected = generateData(schema, numRecords, seed, nullPercentage, transform);
 
     // write a test parquet file using iceberg writer
     File testFile = temp.newFile();
@@ -73,15 +88,18 @@ public class TestParquetVectorizedReads extends AvroDataTest {
     try (FileAppender<GenericData.Record> writer = getParquetWriter(schema, testFile)) {
       writer.addAll(expected);
     }
-    assertRecordsMatch(schema, numRecords, expected, testFile, setAndCheckArrowValidityVector, reuseContainers);
+    assertRecordsMatch(schema, numRecords, expected, testFile, setAndCheckArrowValidityVector,
+            reuseContainers, batchSize);
   }
 
   protected int getNumRows() {
     return NUM_ROWS;
   }
 
-  Iterable<GenericData.Record> generateData(Schema schema, int numRecords, long seed, float nullPercentage) {
-    return RandomData.generate(schema, numRecords, seed, nullPercentage);
+  Iterable<GenericData.Record> generateData(Schema schema, int numRecords, long seed, float nullPercentage,
+                                            Function<GenericData.Record, GenericData.Record> transform) {
+    Iterable<GenericData.Record> data = RandomData.generate(schema, numRecords, seed, nullPercentage);
+    return transform == IDENTITY ? data : Iterables.transform(data, transform);
   }
 
   FileAppender<GenericData.Record> getParquetWriter(Schema schema, File testFile) throws IOException {
@@ -93,11 +111,11 @@ public class TestParquetVectorizedReads extends AvroDataTest {
 
   void assertRecordsMatch(
       Schema schema, int expectedSize, Iterable<GenericData.Record> expected, File testFile,
-      boolean setAndCheckArrowValidityBuffer, boolean reuseContainers)
+      boolean setAndCheckArrowValidityBuffer, boolean reuseContainers, int batchSize)
       throws IOException {
     Parquet.ReadBuilder readBuilder = Parquet.read(Files.localInput(testFile))
         .project(schema)
-        .recordsPerBatch(10000)
+        .recordsPerBatch(batchSize)
         .createBatchedReaderFunc(type -> VectorizedSparkParquetReaders.buildReader(
             schema,
             type,
@@ -193,4 +211,24 @@ public class TestParquetVectorizedReads extends AvroDataTest {
     writeAndValidate(TypeUtil.assignIncreasingFreshIds(new Schema(SUPPORTED_PRIMITIVES.fields())),
         getNumRows(), 0L, RandomData.DEFAULT_NULL_PERCENTAGE, true, false);
   }
+
+  @Test
+  public void testVectorizedReadsWithReallocatedArrowBuffers() throws IOException {
+    // With a batch size of 2, 256 bytes are allocated in the VarCharVector. By adding strings of
+    // length 512, the vector will need to be reallocated for storing the batch.
+    writeAndValidate(new Schema(
+            Lists.newArrayList(
+                SUPPORTED_PRIMITIVES.field("id"),
+                SUPPORTED_PRIMITIVES.field("data"))),
+        10, 0L, RandomData.DEFAULT_NULL_PERCENTAGE,
+        true, true, 2,
+        record -> {
+          if (record.get("data") != null) {
+            record.put("data", Strings.padEnd((String) record.get("data"), 512, 'a'));
+          } else {
+            record.put("data", Strings.padEnd("", 512, 'a'));
+          }
+          return record;
+        });
+  }
 }
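
For context on what the new test exercises: with recordsPerBatch(2), the vectorized reader allocates a small VarCharVector data buffer (256 bytes, per the comment in the test), and reading 512-byte strings forces Arrow to reallocate that buffer mid-batch; the assertions then verify that values survive the reallocation. A minimal standalone sketch of the same Arrow behavior outside Iceberg (class name and printed output are illustrative, not from the commit):

    import java.util.Arrays;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.VarCharVector;

    public class ReallocSketch {
      public static void main(String[] args) {
        try (RootAllocator allocator = new RootAllocator();
             VarCharVector vector = new VarCharVector("data", allocator)) {
          // Mirror the test setup: 2 values, only 256 bytes of data capacity.
          vector.allocateNew(256, 2);
          int before = vector.getByteCapacity();

          byte[] big = new byte[512];
          Arrays.fill(big, (byte) 'a');
          // setSafe grows the underlying buffer when a value does not fit;
          // this is the reallocation path the test drives through the reader.
          vector.setSafe(0, big);
          vector.setSafe(1, big);
          vector.setValueCount(2);

          System.out.println(before + " -> " + vector.getByteCapacity());
        }
      }
    }
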