You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2024/02/27 02:36:32 UTC
(pinot) branch master updated: int96 parity with native parquet reader (#12496)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3267a749f8 int96 parity with native parquet reader (#12496)
3267a749f8 is described below
commit 3267a749f828e8941c753b13a514b1ae41c76ca2
Author: swaminathanmanish <12...@users.noreply.github.com>
AuthorDate: Mon Feb 26 18:36:26 2024 -0800
int96 parity with native parquet reader (#12496)
---
.../inputformat/avro/AvroRecordExtractor.java | 10 ++--
.../parquet/ParquetAvroRecordExtractor.java | 62 ++++++++++++++++++++++
.../parquet/ParquetAvroRecordReader.java | 6 +--
.../parquet/ParquetNativeRecordExtractor.java | 10 ++--
.../parquet/ParquetRecordReaderTest.java | 44 ++++++---------
5 files changed, 95 insertions(+), 37 deletions(-)
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
index c84b15e47c..d1a4dea0dc 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
@@ -68,7 +68,7 @@ public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> {
value = AvroSchemaUtil.applyLogicalType(field, value);
}
if (value != null) {
- value = convert(value);
+ value = transformValue(value, field);
}
to.putValue(fieldName, value);
}
@@ -80,7 +80,7 @@ public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> {
value = AvroSchemaUtil.applyLogicalType(field, value);
}
if (value != null) {
- value = convert(value);
+ value = transformValue(value, field);
}
to.putValue(fieldName, value);
}
@@ -88,6 +88,10 @@ public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> {
return to;
}
+  /**
+   * Extension hook invoked for every non-null field value before it is stored in the output row or map.
+   *
+   * <p>This base implementation ignores {@code field} and simply returns the result of {@code convert(value)};
+   * subclasses may override it to apply additional, schema-dependent handling.
+   *
+   * @param value the raw value read from the Avro record
+   * @param field the Avro field the value belongs to (unused here)
+   * @return the converted value
+   */
+  protected Object transformValue(Object value, Schema.Field field) {
+    Object converted = convert(value);
+    return converted;
+  }
+
/**
* Returns whether the object is an Avro GenericRecord.
*/
@@ -116,7 +120,7 @@ public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> {
String fieldName = field.name();
Object fieldValue = record.get(fieldName);
if (fieldValue != null) {
- fieldValue = convert(fieldValue);
+ fieldValue = transformValue(fieldValue, field);
}
convertedMap.put(fieldName, fieldValue);
}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java
new file mode 100644
index 0000000000..8f8ba25a1c
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+
+
+/**
+ * Avro record extractor used when a Parquet file is read through the Avro-based Parquet reader.
+ *
+ * <p>It performs the same conversion as {@link AvroRecordExtractor} and then rewrites values of the deprecated
+ * Parquet INT96 type into a {@code long} using {@link ParquetNativeRecordExtractor#convertInt96ToLong(byte[])}, so
+ * that both Parquet readers produce the same value for such columns.
+ */
+public class ParquetAvroRecordExtractor extends AvroRecordExtractor {
+  private static final String INT96_SCHEMA_NAME = PrimitiveType.PrimitiveTypeName.INT96.name();
+
+  /**
+   * Initializes the extractor by delegating to the parent implementation; nothing Parquet-specific is configured.
+   */
+  @Override
+  public void init(@Nullable Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig) {
+    super.init(fields, recordExtractorConfig);
+  }
+
+  /**
+   * Runs the regular conversion on {@code value} and then applies the deprecated-type handling for {@code field}.
+   */
+  @Override
+  protected Object transformValue(Object value, Schema.Field field) {
+    return handleDeprecatedTypes(convert(value), field);
+  }
+
+  /**
+   * Converts an already-converted INT96 value into a {@code long}; every other value is returned unchanged.
+   *
+   * <p>The field counts as INT96 when its schema, or the single non-null branch of its UNION schema, is a FIXED
+   * schema whose name is {@code INT96}. Required (non-UNION) fields are handled the same way as nullable ones.
+   * NOTE(review): this relies on the Avro Parquet reader exposing INT96 columns as a FIXED schema named INT96 whose
+   * converted value is a {@code byte[]} - confirm against the parquet-avro schema converter.
+   *
+   * @param value the value after the regular conversion, may be {@code null}
+   * @param field the Avro field describing the value
+   * @return the {@code long} produced by the INT96 conversion, or {@code value} itself for any other type
+   * @throws IllegalStateException if the field is a UNION with more than one non-null branch
+   */
+  Object handleDeprecatedTypes(Object value, Schema.Field field) {
+    Schema valueSchema = resolveNonNullSchema(field.schema());
+    // A null value, or a UNION made up of NULL branches only, has nothing to convert.
+    if (value == null || valueSchema == null) {
+      return value;
+    }
+    // INT96 is deprecated. We convert to long as we do in the native parquet extractor.
+    if (valueSchema.getType() == Schema.Type.FIXED && INT96_SCHEMA_NAME.equals(valueSchema.getName())) {
+      return ParquetNativeRecordExtractor.convertInt96ToLong((byte[]) value);
+    }
+    return value;
+  }
+
+  /**
+   * Returns {@code schema} itself when it is not a UNION, otherwise the only non-null branch of the UNION, or
+   * {@code null} when the UNION has no non-null branch.
+   */
+  @Nullable
+  private static Schema resolveNonNullSchema(Schema schema) {
+    if (schema.getType() != Schema.Type.UNION) {
+      return schema;
+    }
+    Schema nonNullSchema = null;
+    for (Schema childSchema : schema.getTypes()) {
+      if (childSchema.getType() == Schema.Type.NULL) {
+        continue;
+      }
+      if (nonNullSchema != null) {
+        throw new IllegalStateException("More than one non-null schema in UNION schema");
+      }
+      nonNullSchema = childSchema;
+    }
+    return nonNullSchema;
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
index 8caf384630..e1db085b8e 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
@@ -25,13 +25,11 @@ import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
-import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
-
/**
* Avro Record reader for Parquet file. This reader doesn't read parquet file with incompatible Avro schemas,
* e.g. INT96, DECIMAL. Please use {@link org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader}
@@ -44,7 +42,7 @@ public class ParquetAvroRecordReader implements RecordReader {
private static final String EXTENSION = "parquet";
private Path _dataFilePath;
- private AvroRecordExtractor _recordExtractor;
+ private ParquetAvroRecordExtractor _recordExtractor;
private ParquetReader<GenericRecord> _parquetReader;
private GenericRecord _nextRecord;
@@ -54,7 +52,7 @@ public class ParquetAvroRecordReader implements RecordReader {
File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION);
_dataFilePath = new Path(parquetFile.getAbsolutePath());
_parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
- _recordExtractor = new AvroRecordExtractor();
+ _recordExtractor = new ParquetAvroRecordExtractor();
_recordExtractor.init(fieldsToRead, null);
_nextRecord = _parquetReader.read();
}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
index d7a1dd0e22..8a67d63925 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
@@ -170,9 +170,7 @@ public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
return from.getValueToString(fieldIndex, index);
case INT96:
Binary int96 = from.getInt96(fieldIndex, index);
- ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
- return (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
- + buf.getLong(0) / NANOS_PER_MILLISECOND;
+ return convertInt96ToLong(int96.getBytes());
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
@@ -204,6 +202,12 @@ public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
return null;
}
+ public static long convertInt96ToLong(byte[] int96Bytes) {
+ ByteBuffer buf = ByteBuffer.wrap(int96Bytes).order(ByteOrder.LITTLE_ENDIAN);
+ return (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+ + buf.getLong(0) / NANOS_PER_MILLISECOND;
+ }
+
public Object[] extractList(Group group) {
int repFieldCount = group.getType().getFieldCount();
if (repFieldCount < 1) {
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index a6fbb54247..f286042cd4 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -125,28 +125,20 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
@Test
public void testComparison()
throws IOException {
- testComparison(_dataFile, SAMPLE_RECORDS_SIZE, false);
- testComparison(new File(getClass().getClassLoader().getResource("users.parquet").getFile()), 1, false);
- testComparison(new File(getClass().getClassLoader().getResource("test-comparison.gz.parquet").getFile()), 363667,
- false);
- testComparison(new File(getClass().getClassLoader().getResource("test-comparison.snappy.parquet").getFile()), 2870,
- false);
- testComparison(new File(getClass().getClassLoader().getResource("baseballStats.snappy.parquet").getFile()), 97889,
- false);
- testComparison(new File(getClass().getClassLoader().getResource("baseballStats.zstd.parquet").getFile()), 97889,
- false);
- testComparison(new File(getClass().getClassLoader().getResource("githubEvents.snappy.parquet").getFile()), 10000,
- false);
- testComparison(new File(getClass().getClassLoader().getResource("starbucksStores.snappy.parquet").getFile()), 6443,
- false);
- testComparison(new File(getClass().getClassLoader().getResource("airlineStats.snappy.parquet").getFile()), 19492,
- false);
- testComparison(new File(getClass().getClassLoader().getResource("githubActivities.gz.parquet").getFile()), 2000,
- false);
- testComparison(new File(getClass().getClassLoader().getResource("int96AvroParquet.parquet").getFile()), 1, true);
+ testComparison(_dataFile, SAMPLE_RECORDS_SIZE);
+ testComparison(new File(getClass().getClassLoader().getResource("users.parquet").getFile()), 1);
+ testComparison(new File(getClass().getClassLoader().getResource("test-comparison.gz.parquet").getFile()), 363667);
+ testComparison(new File(getClass().getClassLoader().getResource("test-comparison.snappy.parquet").getFile()), 2870);
+ testComparison(new File(getClass().getClassLoader().getResource("baseballStats.snappy.parquet").getFile()), 97889);
+ testComparison(new File(getClass().getClassLoader().getResource("baseballStats.zstd.parquet").getFile()), 97889);
+ testComparison(new File(getClass().getClassLoader().getResource("githubEvents.snappy.parquet").getFile()), 10000);
+ testComparison(new File(getClass().getClassLoader().getResource("starbucksStores.snappy.parquet").getFile()), 6443);
+ testComparison(new File(getClass().getClassLoader().getResource("airlineStats.snappy.parquet").getFile()), 19492);
+ testComparison(new File(getClass().getClassLoader().getResource("githubActivities.gz.parquet").getFile()), 2000);
+ testComparison(new File(getClass().getClassLoader().getResource("int96AvroParquet.parquet").getFile()), 1);
}
- private void testComparison(File dataFile, int totalRecords, boolean skipIndividualRecordComparison)
+ private void testComparison(File dataFile, int totalRecords)
throws IOException {
final ParquetRecordReader avroRecordReader = new ParquetRecordReader();
ParquetRecordReaderConfig avroRecordReaderConfig = new ParquetRecordReaderConfig();
@@ -159,14 +151,14 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
Assert.assertTrue(avroRecordReader.useAvroParquetRecordReader());
Assert.assertFalse(nativeRecordReader.useAvroParquetRecordReader());
- testComparison(avroRecordReader, nativeRecordReader, totalRecords, skipIndividualRecordComparison);
+ testComparison(avroRecordReader, nativeRecordReader, totalRecords);
avroRecordReader.rewind();
nativeRecordReader.rewind();
- testComparison(avroRecordReader, nativeRecordReader, totalRecords, skipIndividualRecordComparison);
+ testComparison(avroRecordReader, nativeRecordReader, totalRecords);
}
private void testComparison(ParquetRecordReader avroRecordReader, ParquetRecordReader nativeRecordReader,
- int totalRecords, boolean skipIndividualRecordComparison)
+ int totalRecords)
throws IOException {
GenericRow avroReuse = new GenericRow();
GenericRow nativeReuse = new GenericRow();
@@ -175,10 +167,8 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
Assert.assertTrue(nativeRecordReader.hasNext());
final GenericRow avroReaderRow = avroRecordReader.next(avroReuse);
final GenericRow nativeReaderRow = nativeRecordReader.next(nativeReuse);
- if (!skipIndividualRecordComparison) {
- Assert.assertEquals(nativeReaderRow.toString(), avroReaderRow.toString());
- Assert.assertTrue(avroReaderRow.equals(nativeReaderRow));
- }
+ Assert.assertEquals(nativeReaderRow.toString(), avroReaderRow.toString());
+ Assert.assertTrue(avroReaderRow.equals(nativeReaderRow));
recordsRead++;
}
Assert.assertFalse(nativeRecordReader.hasNext());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org