You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/10/23 21:30:57 UTC
[iceberg] branch master updated: Parquet: Add test for Arrow buffer reallocation (#1480)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3d308d3 Parquet: Add test for Arrow buffer reallocation (#1480)
3d308d3 is described below
commit 3d308d3c292ba06ebce554efe05561b8f2e79b51
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Fri Oct 23 14:30:49 2020 -0700
Parquet: Add test for Arrow buffer reallocation (#1480)
---
.../org/apache/iceberg/spark/data/TestHelpers.java | 2 +-
...estParquetDictionaryEncodedVectorizedReads.java | 11 +++--
...naryFallbackToPlainEncodingVectorizedReads.java | 8 +++-
.../vectorized/TestParquetVectorizedReads.java | 54 ++++++++++++++++++----
4 files changed, 61 insertions(+), 14 deletions(-)
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index c4d4bb4..d297d35 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -98,7 +98,7 @@ public class TestHelpers {
if (checkArrowValidityVector) {
ColumnVector columnVector = batch.column(i);
ValueVector arrowVector = ((IcebergArrowColumnVector) columnVector).vectorAccessor().getVector();
- Assert.assertEquals("Nullability doesn't match", expectedValue == null, arrowVector.isNull(rowId));
+ Assert.assertFalse("Nullability doesn't match", expectedValue == null ^ arrowVector.isNull(rowId));
}
}
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
index bd5c53d..d8f1ff3 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
@@ -25,9 +25,11 @@ import org.apache.avro.generic.GenericData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Function;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.data.RandomData;
import org.junit.Assert;
import org.junit.Ignore;
@@ -38,8 +40,10 @@ import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DE
public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVectorizedReads {
@Override
- Iterable<GenericData.Record> generateData(Schema schema, int numRecords, long seed, float nullPercentage) {
- return RandomData.generateDictionaryEncodableData(schema, numRecords, seed, nullPercentage);
+ Iterable<GenericData.Record> generateData(Schema schema, int numRecords, long seed, float nullPercentage,
+ Function<GenericData.Record, GenericData.Record> transform) {
+ Iterable data = RandomData.generateDictionaryEncodableData(schema, numRecords, seed, nullPercentage);
+ return transform == IDENTITY ? data : Iterables.transform(data, transform);
}
@Test
@@ -82,6 +86,7 @@ public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVect
FluentIterable.concat(dictionaryEncodableData, nonDictionaryData, dictionaryEncodableData),
mixedFile,
false,
- true);
+ true,
+ BATCH_SIZE);
}
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryFallbackToPlainEncodingVectorizedReads.java b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryFallbackToPlainEncodingVectorizedReads.java
index 3c1969e..5ceac3f 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryFallbackToPlainEncodingVectorizedReads.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryFallbackToPlainEncodingVectorizedReads.java
@@ -27,6 +27,8 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Function;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.data.RandomData;
import org.junit.Ignore;
import org.junit.Test;
@@ -40,9 +42,11 @@ public class TestParquetDictionaryFallbackToPlainEncodingVectorizedReads extends
}
@Override
- Iterable<GenericData.Record> generateData(Schema schema, int numRecords, long seed, float nullPercentage) {
+ Iterable<GenericData.Record> generateData(Schema schema, int numRecords, long seed, float nullPercentage,
+ Function<GenericData.Record, GenericData.Record> transform) {
// TODO: take into account nullPercentage when generating fallback encoding data
- return RandomData.generateFallbackData(schema, numRecords, seed, numRecords / 20);
+ Iterable data = RandomData.generateFallbackData(schema, numRecords, seed, numRecords / 20);
+ return transform == IDENTITY ? data : Iterables.transform(data, transform);
}
@Override
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
index 80c6cd1..fdef454 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
@@ -29,6 +29,9 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Function;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.data.AvroDataTest;
import org.apache.iceberg.spark.data.RandomData;
@@ -49,6 +52,9 @@ import static org.apache.iceberg.types.Types.NestedField.required;
public class TestParquetVectorizedReads extends AvroDataTest {
private static final int NUM_ROWS = 200_000;
+ static final int BATCH_SIZE = 10_000;
+
+ static final Function<GenericData.Record, GenericData.Record> IDENTITY = record -> record;
@Override
protected void writeAndValidate(Schema schema) throws IOException {
@@ -56,15 +62,24 @@ public class TestParquetVectorizedReads extends AvroDataTest {
}
private void writeAndValidate(
- Schema schema, int numRecords, long seed, float nullPercentage,
- boolean setAndCheckArrowValidityVector, boolean reuseContainers)
+ Schema schema, int numRecords, long seed, float nullPercentage,
+ boolean setAndCheckArrowValidityVector, boolean reuseContainers)
+ throws IOException {
+ writeAndValidate(schema, numRecords, seed, nullPercentage,
+ setAndCheckArrowValidityVector, reuseContainers, BATCH_SIZE, IDENTITY);
+ }
+
+ private void writeAndValidate(
+ Schema schema, int numRecords, long seed, float nullPercentage,
+ boolean setAndCheckArrowValidityVector, boolean reuseContainers, int batchSize,
+ Function<GenericData.Record, GenericData.Record> transform)
throws IOException {
// Write test data
Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == TypeUtil.find(
schema,
type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()));
- Iterable<GenericData.Record> expected = generateData(schema, numRecords, seed, nullPercentage);
+ Iterable<GenericData.Record> expected = generateData(schema, numRecords, seed, nullPercentage, transform);
// write a test parquet file using iceberg writer
File testFile = temp.newFile();
@@ -73,15 +88,18 @@ public class TestParquetVectorizedReads extends AvroDataTest {
try (FileAppender<GenericData.Record> writer = getParquetWriter(schema, testFile)) {
writer.addAll(expected);
}
- assertRecordsMatch(schema, numRecords, expected, testFile, setAndCheckArrowValidityVector, reuseContainers);
+ assertRecordsMatch(schema, numRecords, expected, testFile, setAndCheckArrowValidityVector,
+ reuseContainers, batchSize);
}
protected int getNumRows() {
return NUM_ROWS;
}
- Iterable<GenericData.Record> generateData(Schema schema, int numRecords, long seed, float nullPercentage) {
- return RandomData.generate(schema, numRecords, seed, nullPercentage);
+ Iterable<GenericData.Record> generateData(Schema schema, int numRecords, long seed, float nullPercentage,
+ Function<GenericData.Record, GenericData.Record> transform) {
+ Iterable<GenericData.Record> data = RandomData.generate(schema, numRecords, seed, nullPercentage);
+ return transform == IDENTITY ? data : Iterables.transform(data, transform);
}
FileAppender<GenericData.Record> getParquetWriter(Schema schema, File testFile) throws IOException {
@@ -93,11 +111,11 @@ public class TestParquetVectorizedReads extends AvroDataTest {
void assertRecordsMatch(
Schema schema, int expectedSize, Iterable<GenericData.Record> expected, File testFile,
- boolean setAndCheckArrowValidityBuffer, boolean reuseContainers)
+ boolean setAndCheckArrowValidityBuffer, boolean reuseContainers, int batchSize)
throws IOException {
Parquet.ReadBuilder readBuilder = Parquet.read(Files.localInput(testFile))
.project(schema)
- .recordsPerBatch(10000)
+ .recordsPerBatch(batchSize)
.createBatchedReaderFunc(type -> VectorizedSparkParquetReaders.buildReader(
schema,
type,
@@ -193,4 +211,24 @@ public class TestParquetVectorizedReads extends AvroDataTest {
writeAndValidate(TypeUtil.assignIncreasingFreshIds(new Schema(SUPPORTED_PRIMITIVES.fields())),
getNumRows(), 0L, RandomData.DEFAULT_NULL_PERCENTAGE, true, false);
}
+
+ @Test
+ public void testVectorizedReadsWithReallocatedArrowBuffers() throws IOException {
+ // With a batch size of 2, 256 bytes are allocated in the VarCharVector. By adding strings of
+ // length 512, the vector will need to be reallocated for storing the batch.
+ writeAndValidate(new Schema(
+ Lists.newArrayList(
+ SUPPORTED_PRIMITIVES.field("id"),
+ SUPPORTED_PRIMITIVES.field("data"))),
+ 10, 0L, RandomData.DEFAULT_NULL_PERCENTAGE,
+ true, true, 2,
+ record -> {
+ if (record.get("data") != null) {
+ record.put("data", Strings.padEnd((String) record.get("data"), 512, 'a'));
+ } else {
+ record.put("data", Strings.padEnd("", 512, 'a'));
+ }
+ return record;
+ });
+ }
}