You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/24 18:19:02 UTC
(pinot) branch master updated: Enabling avroParquet to read Int96 as bytes (#12484)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 55cecc1824 Enabling avroParquet to read Int96 as bytes (#12484)
55cecc1824 is described below
commit 55cecc1824032d683b8fc12e868a37c083f73358
Author: swaminathanmanish <12...@users.noreply.github.com>
AuthorDate: Sat Feb 24 10:18:56 2024 -0800
Enabling avroParquet to read Int96 as bytes (#12484)
---
.../plugin/inputformat/parquet/ParquetUtils.java | 3 ++
.../parquet/ParquetRecordReaderTest.java | 43 +++++++++++++--------
.../src/test/resources/int96AvroParquet.parquet | Bin 0 -> 19853 bytes
3 files changed, 30 insertions(+), 16 deletions(-)
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
index f576a0a325..378c10ca84 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -100,6 +101,8 @@ public class ParquetUtils {
// in case that user's hadoop conf overwrite this item
Configuration conf = new Configuration();
conf.set("fs.defaultFS", DEFAULT_FS);
+ // To read Int96 as bytes.
+ conf.set(AvroReadSupport.READ_INT96_AS_FIXED, "true");
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
return conf;
}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index 8133a18850..a6fbb54247 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -125,19 +125,28 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
@Test
public void testComparison()
throws IOException {
- testComparison(_dataFile, SAMPLE_RECORDS_SIZE);
- testComparison(new File(getClass().getClassLoader().getResource("users.parquet").getFile()), 1);
- testComparison(new File(getClass().getClassLoader().getResource("test-comparison.gz.parquet").getFile()), 363667);
- testComparison(new File(getClass().getClassLoader().getResource("test-comparison.snappy.parquet").getFile()), 2870);
- testComparison(new File(getClass().getClassLoader().getResource("baseballStats.snappy.parquet").getFile()), 97889);
- testComparison(new File(getClass().getClassLoader().getResource("baseballStats.zstd.parquet").getFile()), 97889);
- testComparison(new File(getClass().getClassLoader().getResource("githubEvents.snappy.parquet").getFile()), 10000);
- testComparison(new File(getClass().getClassLoader().getResource("starbucksStores.snappy.parquet").getFile()), 6443);
- testComparison(new File(getClass().getClassLoader().getResource("airlineStats.snappy.parquet").getFile()), 19492);
- testComparison(new File(getClass().getClassLoader().getResource("githubActivities.gz.parquet").getFile()), 2000);
+ testComparison(_dataFile, SAMPLE_RECORDS_SIZE, false);
+ testComparison(new File(getClass().getClassLoader().getResource("users.parquet").getFile()), 1, false);
+ testComparison(new File(getClass().getClassLoader().getResource("test-comparison.gz.parquet").getFile()), 363667,
+ false);
+ testComparison(new File(getClass().getClassLoader().getResource("test-comparison.snappy.parquet").getFile()), 2870,
+ false);
+ testComparison(new File(getClass().getClassLoader().getResource("baseballStats.snappy.parquet").getFile()), 97889,
+ false);
+ testComparison(new File(getClass().getClassLoader().getResource("baseballStats.zstd.parquet").getFile()), 97889,
+ false);
+ testComparison(new File(getClass().getClassLoader().getResource("githubEvents.snappy.parquet").getFile()), 10000,
+ false);
+ testComparison(new File(getClass().getClassLoader().getResource("starbucksStores.snappy.parquet").getFile()), 6443,
+ false);
+ testComparison(new File(getClass().getClassLoader().getResource("airlineStats.snappy.parquet").getFile()), 19492,
+ false);
+ testComparison(new File(getClass().getClassLoader().getResource("githubActivities.gz.parquet").getFile()), 2000,
+ false);
+ testComparison(new File(getClass().getClassLoader().getResource("int96AvroParquet.parquet").getFile()), 1, true);
}
- private void testComparison(File dataFile, int totalRecords)
+ private void testComparison(File dataFile, int totalRecords, boolean skipIndividualRecordComparison)
throws IOException {
final ParquetRecordReader avroRecordReader = new ParquetRecordReader();
ParquetRecordReaderConfig avroRecordReaderConfig = new ParquetRecordReaderConfig();
@@ -150,14 +159,14 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
Assert.assertTrue(avroRecordReader.useAvroParquetRecordReader());
Assert.assertFalse(nativeRecordReader.useAvroParquetRecordReader());
- testComparison(avroRecordReader, nativeRecordReader, totalRecords);
+ testComparison(avroRecordReader, nativeRecordReader, totalRecords, skipIndividualRecordComparison);
avroRecordReader.rewind();
nativeRecordReader.rewind();
- testComparison(avroRecordReader, nativeRecordReader, totalRecords);
+ testComparison(avroRecordReader, nativeRecordReader, totalRecords, skipIndividualRecordComparison);
}
private void testComparison(ParquetRecordReader avroRecordReader, ParquetRecordReader nativeRecordReader,
- int totalRecords)
+ int totalRecords, boolean skipIndividualRecordComparison)
throws IOException {
GenericRow avroReuse = new GenericRow();
GenericRow nativeReuse = new GenericRow();
@@ -166,8 +175,10 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
Assert.assertTrue(nativeRecordReader.hasNext());
final GenericRow avroReaderRow = avroRecordReader.next(avroReuse);
final GenericRow nativeReaderRow = nativeRecordReader.next(nativeReuse);
- Assert.assertEquals(nativeReaderRow.toString(), avroReaderRow.toString());
- Assert.assertTrue(avroReaderRow.equals(nativeReaderRow));
+ if (!skipIndividualRecordComparison) {
+ Assert.assertEquals(nativeReaderRow.toString(), avroReaderRow.toString());
+ Assert.assertTrue(avroReaderRow.equals(nativeReaderRow));
+ }
recordsRead++;
}
Assert.assertFalse(nativeRecordReader.hasNext());
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/int96AvroParquet.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/int96AvroParquet.parquet
new file mode 100644
index 0000000000..857888fbb2
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/int96AvroParquet.parquet differ
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org