You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2024/02/27 02:36:32 UTC

(pinot) branch master updated: int96 parity with native parquet reader (#12496)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3267a749f8 int96 parity with native parquet reader (#12496)
3267a749f8 is described below

commit 3267a749f828e8941c753b13a514b1ae41c76ca2
Author: swaminathanmanish <12...@users.noreply.github.com>
AuthorDate: Mon Feb 26 18:36:26 2024 -0800

    int96 parity with native parquet reader (#12496)
---
 .../inputformat/avro/AvroRecordExtractor.java      | 10 ++--
 .../parquet/ParquetAvroRecordExtractor.java        | 62 ++++++++++++++++++++++
 .../parquet/ParquetAvroRecordReader.java           |  6 +--
 .../parquet/ParquetNativeRecordExtractor.java      | 10 ++--
 .../parquet/ParquetRecordReaderTest.java           | 44 ++++++---------
 5 files changed, 95 insertions(+), 37 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
index c84b15e47c..d1a4dea0dc 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
@@ -68,7 +68,7 @@ public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> {
           value = AvroSchemaUtil.applyLogicalType(field, value);
         }
         if (value != null) {
-          value = convert(value);
+          value = transformValue(value, field);
         }
         to.putValue(fieldName, value);
       }
@@ -80,7 +80,7 @@ public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> {
           value = AvroSchemaUtil.applyLogicalType(field, value);
         }
         if (value != null) {
-          value = convert(value);
+          value = transformValue(value, field);
         }
         to.putValue(fieldName, value);
       }
@@ -88,6 +88,10 @@ public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> {
     return to;
   }
 
+  protected Object transformValue(Object value, Schema.Field field) {
+    return convert(value);
+  }
+
   /**
    * Returns whether the object is an Avro GenericRecord.
    */
@@ -116,7 +120,7 @@ public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> {
       String fieldName = field.name();
       Object fieldValue = record.get(fieldName);
       if (fieldValue != null) {
-        fieldValue = convert(fieldValue);
+        fieldValue = transformValue(fieldValue, field);
       }
       convertedMap.put(fieldName, fieldValue);
     }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java
new file mode 100644
index 0000000000..8f8ba25a1c
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+
+
+public class ParquetAvroRecordExtractor extends AvroRecordExtractor {
+
+  @Override
+  public void init(@Nullable Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig) {
+   super.init(fields, recordExtractorConfig);
+  }
+
+  @Override
+  protected Object transformValue(Object value, Schema.Field field) {
+    return handleDeprecatedTypes(convert(value), field);
+  }
+
+  Object handleDeprecatedTypes(Object value, Schema.Field field) {
+    Schema.Type avroColumnType = field.schema().getType();
+    if (avroColumnType == org.apache.avro.Schema.Type.UNION) {
+      org.apache.avro.Schema nonNullSchema = null;
+      for (org.apache.avro.Schema childFieldSchema : field.schema().getTypes()) {
+        if (childFieldSchema.getType() != org.apache.avro.Schema.Type.NULL) {
+          if (nonNullSchema == null) {
+            nonNullSchema = childFieldSchema;
+          } else {
+            throw new IllegalStateException("More than one non-null schema in UNION schema");
+          }
+        }
+      }
+
+      //INT96 is deprecated. We convert to long as we do in the native parquet extractor.
+      if (nonNullSchema.getName().equals(PrimitiveType.PrimitiveTypeName.INT96.name())) {
+       return ParquetNativeRecordExtractor.convertInt96ToLong((byte[]) value);
+      }
+    }
+    return value;
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
index 8caf384630..e1db085b8e 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
@@ -25,13 +25,11 @@ import javax.annotation.Nullable;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetReader;
-import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 
-
 /**
  * Avro Record reader for Parquet file. This reader doesn't read parquet file with incompatible Avro schemas,
  * e.g. INT96, DECIMAL. Please use {@link org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader}
@@ -44,7 +42,7 @@ public class ParquetAvroRecordReader implements RecordReader {
   private static final String EXTENSION = "parquet";
 
   private Path _dataFilePath;
-  private AvroRecordExtractor _recordExtractor;
+  private ParquetAvroRecordExtractor _recordExtractor;
   private ParquetReader<GenericRecord> _parquetReader;
   private GenericRecord _nextRecord;
 
@@ -54,7 +52,7 @@ public class ParquetAvroRecordReader implements RecordReader {
     File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION);
     _dataFilePath = new Path(parquetFile.getAbsolutePath());
     _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
-    _recordExtractor = new AvroRecordExtractor();
+    _recordExtractor = new ParquetAvroRecordExtractor();
     _recordExtractor.init(fieldsToRead, null);
     _nextRecord = _parquetReader.read();
   }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
index d7a1dd0e22..8a67d63925 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
@@ -170,9 +170,7 @@ public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
           return from.getValueToString(fieldIndex, index);
         case INT96:
           Binary int96 = from.getInt96(fieldIndex, index);
-          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
-          return (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
-              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return convertInt96ToLong(int96.getBytes());
         case BINARY:
         case FIXED_LEN_BYTE_ARRAY:
           if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
@@ -204,6 +202,12 @@ public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
     return null;
   }
 
+  public static long convertInt96ToLong(byte[] int96Bytes) {
+    ByteBuffer buf = ByteBuffer.wrap(int96Bytes).order(ByteOrder.LITTLE_ENDIAN);
+    return (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+        + buf.getLong(0) / NANOS_PER_MILLISECOND;
+  }
+
   public Object[] extractList(Group group) {
     int repFieldCount = group.getType().getFieldCount();
     if (repFieldCount < 1) {
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index a6fbb54247..f286042cd4 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -125,28 +125,20 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
   @Test
   public void testComparison()
       throws IOException {
-    testComparison(_dataFile, SAMPLE_RECORDS_SIZE, false);
-    testComparison(new File(getClass().getClassLoader().getResource("users.parquet").getFile()), 1, false);
-    testComparison(new File(getClass().getClassLoader().getResource("test-comparison.gz.parquet").getFile()), 363667,
-        false);
-    testComparison(new File(getClass().getClassLoader().getResource("test-comparison.snappy.parquet").getFile()), 2870,
-        false);
-    testComparison(new File(getClass().getClassLoader().getResource("baseballStats.snappy.parquet").getFile()), 97889,
-        false);
-    testComparison(new File(getClass().getClassLoader().getResource("baseballStats.zstd.parquet").getFile()), 97889,
-        false);
-    testComparison(new File(getClass().getClassLoader().getResource("githubEvents.snappy.parquet").getFile()), 10000,
-        false);
-    testComparison(new File(getClass().getClassLoader().getResource("starbucksStores.snappy.parquet").getFile()), 6443,
-        false);
-    testComparison(new File(getClass().getClassLoader().getResource("airlineStats.snappy.parquet").getFile()), 19492,
-        false);
-    testComparison(new File(getClass().getClassLoader().getResource("githubActivities.gz.parquet").getFile()), 2000,
-        false);
-    testComparison(new File(getClass().getClassLoader().getResource("int96AvroParquet.parquet").getFile()), 1, true);
+    testComparison(_dataFile, SAMPLE_RECORDS_SIZE);
+    testComparison(new File(getClass().getClassLoader().getResource("users.parquet").getFile()), 1);
+    testComparison(new File(getClass().getClassLoader().getResource("test-comparison.gz.parquet").getFile()), 363667);
+    testComparison(new File(getClass().getClassLoader().getResource("test-comparison.snappy.parquet").getFile()), 2870);
+    testComparison(new File(getClass().getClassLoader().getResource("baseballStats.snappy.parquet").getFile()), 97889);
+    testComparison(new File(getClass().getClassLoader().getResource("baseballStats.zstd.parquet").getFile()), 97889);
+    testComparison(new File(getClass().getClassLoader().getResource("githubEvents.snappy.parquet").getFile()), 10000);
+    testComparison(new File(getClass().getClassLoader().getResource("starbucksStores.snappy.parquet").getFile()), 6443);
+    testComparison(new File(getClass().getClassLoader().getResource("airlineStats.snappy.parquet").getFile()), 19492);
+    testComparison(new File(getClass().getClassLoader().getResource("githubActivities.gz.parquet").getFile()), 2000);
+    testComparison(new File(getClass().getClassLoader().getResource("int96AvroParquet.parquet").getFile()), 1);
   }
 
-  private void testComparison(File dataFile, int totalRecords, boolean skipIndividualRecordComparison)
+  private void testComparison(File dataFile, int totalRecords)
       throws IOException {
     final ParquetRecordReader avroRecordReader = new ParquetRecordReader();
     ParquetRecordReaderConfig avroRecordReaderConfig = new ParquetRecordReaderConfig();
@@ -159,14 +151,14 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
     Assert.assertTrue(avroRecordReader.useAvroParquetRecordReader());
     Assert.assertFalse(nativeRecordReader.useAvroParquetRecordReader());
 
-    testComparison(avroRecordReader, nativeRecordReader, totalRecords, skipIndividualRecordComparison);
+    testComparison(avroRecordReader, nativeRecordReader, totalRecords);
     avroRecordReader.rewind();
     nativeRecordReader.rewind();
-    testComparison(avroRecordReader, nativeRecordReader, totalRecords, skipIndividualRecordComparison);
+    testComparison(avroRecordReader, nativeRecordReader, totalRecords);
   }
 
   private void testComparison(ParquetRecordReader avroRecordReader, ParquetRecordReader nativeRecordReader,
-      int totalRecords, boolean skipIndividualRecordComparison)
+      int totalRecords)
       throws IOException {
     GenericRow avroReuse = new GenericRow();
     GenericRow nativeReuse = new GenericRow();
@@ -175,10 +167,8 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
       Assert.assertTrue(nativeRecordReader.hasNext());
       final GenericRow avroReaderRow = avroRecordReader.next(avroReuse);
       final GenericRow nativeReaderRow = nativeRecordReader.next(nativeReuse);
-      if (!skipIndividualRecordComparison) {
-        Assert.assertEquals(nativeReaderRow.toString(), avroReaderRow.toString());
-        Assert.assertTrue(avroReaderRow.equals(nativeReaderRow));
-      }
+      Assert.assertEquals(nativeReaderRow.toString(), avroReaderRow.toString());
+      Assert.assertTrue(avroReaderRow.equals(nativeReaderRow));
       recordsRead++;
     }
     Assert.assertFalse(nativeRecordReader.hasNext());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org