You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/02/02 20:11:38 UTC

[incubator-pinot] branch native-parquet-support created (now 5a7f950)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch native-parquet-support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 5a7f950  Adding native parquet record reader support

This branch includes the following new commits:

     new 5a7f950  Adding native parquet record reader support

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Adding native parquet record reader support

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch native-parquet-support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 5a7f950d50126a4299bfecb131cf8505044e9b3c
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Tue Feb 2 12:11:10 2021 -0800

    Adding native parquet record reader support
---
 ...ordReader.java => ParquetAvroRecordReader.java} |   6 +-
 .../parquet/ParquetNativeRecordExtractor.java      | 263 +++++++++++++++++++++
 .../parquet/ParquetNativeRecordReader.java         | 129 ++++++++++
 .../inputformat/parquet/ParquetRecordReader.java   |  38 ++-
 .../parquet/ParquetRecordReaderConfig.java         |  43 ++++
 .../plugin/inputformat/parquet/ParquetUtils.java   |  18 +-
 .../parquet/ParquetRecordReaderTest.java           |  65 ++++-
 ...test-file-with-int96-and-decimal.snappy.parquet | Bin 0 -> 19659 bytes
 .../apache/pinot/spi/data/readers/GenericRow.java  |  53 ++++-
 .../spi/data/readers/AbstractRecordReaderTest.java |   5 +-
 10 files changed, 585 insertions(+), 35 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
similarity index 92%
copy from pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
copy to pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
index 10ce618..9377cc6 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
@@ -34,7 +34,7 @@ import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 /**
  * Record reader for Parquet file.
  */
-public class ParquetRecordReader implements RecordReader {
+public class ParquetAvroRecordReader implements RecordReader {
   private Path _dataFilePath;
   private AvroRecordExtractor _recordExtractor;
   private ParquetReader<GenericRecord> _parquetReader;
@@ -44,7 +44,7 @@ public class ParquetRecordReader implements RecordReader {
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
     _dataFilePath = new Path(dataFile.getAbsolutePath());
-    _parquetReader = ParquetUtils.getParquetReader(_dataFilePath);
+    _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
     _recordExtractor = new AvroRecordExtractor();
     _recordExtractor.init(fieldsToRead, null);
     _nextRecord = _parquetReader.read();
@@ -73,7 +73,7 @@ public class ParquetRecordReader implements RecordReader {
   public void rewind()
       throws IOException {
     _parquetReader.close();
-    _parquetReader = ParquetUtils.getParquetReader(_dataFilePath);
+    _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
     _nextRecord = _parquetReader.read();
   }
 
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
new file mode 100644
index 0000000..96c171e
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    if (_extractAll) {
+      List<Type> fields = from.getType().getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, from.getType().getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int field) {
+    int valueCount = from.getFieldRepetitionCount(field);
+    Type fieldType = from.getType().getType(field);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, field, fieldType, 0);
+    }
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, field, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int field, Type fieldType, int index) {
+    String valueToString = from.getValueToString(field, index);
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(field, index);
+        case INT64:
+          return from.getLong(field, index);
+        case FLOAT:
+          return from.getFloat(field, index);
+        case DOUBLE:
+          return from.getDouble(field, index);
+        case BOOLEAN:
+          return from.getValueToString(field, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          final OriginalType originalType = fieldType.getOriginalType();
+          if (fieldType.getOriginalType() == OriginalType.UTF8) {
+            return from.getValueToString(field, index);
+          }
+          if (fieldType.getOriginalType() == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(field, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(field, index);
+        case INT96:
+          Binary int96 = from.getInt96(field, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if (fieldType.isRepetition(Type.Repetition.OPTIONAL)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+      Group group = from.getGroup(field, index);
+      return extractMap(group);
+    } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+      Group group = from.getGroup(field, index);
+      if (fieldType.getOriginalType() == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    }
+    return null;
+  }
+
+  public Object[] extractList(Group group) {
+    int repFieldCount = group.getType().getFieldCount();
+    if (repFieldCount < 1) {
+      return null;
+    }
+    Object[] list = new Object[repFieldCount];
+    for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) {
+      list[repFieldIdx] = extractValue(group, repFieldIdx);
+    }
+    if (repFieldCount == 1 && list[0] == null) {
+      return null;
+    }
+    if (repFieldCount == 1 && list[0].getClass().isArray()) {
+      return (Object[]) list[0];
+    }
+    return list;
+  }
+
+  public Map<String, Object> extractMap(Group group) {
+    final int repFieldCount = group.getType().getFieldCount();
+    if (repFieldCount < 1) {
+      return null;
+    }
+    Map<String, Object> resultMap = new HashMap<>();
+    for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) {
+      Object value = extractValue(group, repFieldIdx);
+      resultMap.put(group.getType().getType(repFieldIdx).getName(), value);
+    }
+    return resultMap;
+  }
+
+  public static BigDecimal binaryToDecimal(Binary value, int precision, int scale) {
+    /*
+     * Precision <= 18 checks for the max number of digits for an unscaled long,
+     * else treat with big integer conversion
+     */
+    if (precision <= 18) {
+      ByteBuffer buffer = value.toByteBuffer();
+      byte[] bytes = buffer.array();
+      int start = buffer.arrayOffset() + buffer.position();
+      int end = buffer.arrayOffset() + buffer.limit();
+      long unscaled = 0L;
+      int i = start;
+      while (i < end) {
+        unscaled = (unscaled << 8 | bytes[i] & 0xff);
+        i++;
+      }
+      int bits = 8 * (end - start);
+      long unscaledNew = (unscaled << (64 - bits)) >> (64 - bits);
+      if (unscaledNew <= -pow(10, 18) || unscaledNew >= pow(10, 18)) {
+        return new BigDecimal(unscaledNew);
+      } else {
+        return BigDecimal.valueOf(unscaledNew / pow(10, scale));
+      }
+    } else {
+      return new BigDecimal(new BigInteger(value.getBytes()), scale);
+    }
+  }
+
+  @Override
+  public Object convertMap(Object value) {
+    Map<Object, Object> map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Map.Entry<Object, Object> entry : map.entrySet()) {
+      Object mapKey = entry.getKey();
+      Object mapValue = entry.getValue();
+      if (mapKey != null) {
+        Object convertedMapValue = null;
+        if (mapValue != null) {
+          convertedMapValue = convert(mapValue);
+        }
+        convertedMap.put(convertSingleValue(entry.getKey()), convertedMapValue);
+      }
+    }
+    if (convertedMap.isEmpty()) {
+      return null;
+    }
+    return convertedMap;
+  }
+
+  @Override
+  public boolean isMultiValue(Object value) {
+    if (super.isMultiValue(value)) {
+      return true;
+    }
+    return value.getClass().isArray();
+  }
+
+  @Nullable
+  @Override
+  protected Object convertMultiValue(Object value) {
+    if (value instanceof Collection) {
+      return super.convertMultiValue(value);
+    }
+    // value is Object[]
+    Object[] values = (Object[]) value;
+    return super.convertMultiValue(Arrays.asList(values));
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
new file mode 100644
index 0000000..542f117
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.schema.MessageType;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+/**
+ * Record reader for Parquet file.
+ */
+public class ParquetNativeRecordReader implements RecordReader {
+  private Path _dataFilePath;
+  private ParquetNativeRecordExtractor _recordExtractor;
+  private MessageType _schema;
+  private ParquetMetadata _parquetMetadata;
+  private ParquetFileReader _parquetFileReader;
+  private Group _nextRecord;
+  private PageReadStore _pageReadStore;
+  private MessageColumnIO _columnIO;
+  private org.apache.parquet.io.RecordReader _parquetRecordReader;
+  private int _currentPageIdx;
+
+  @Override
+  public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
+      throws IOException {
+    _dataFilePath = new Path(dataFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    _parquetMetadata = ParquetFileReader.readFooter(conf, _dataFilePath, ParquetMetadataConverter.NO_FILTER);
+    _recordExtractor = new ParquetNativeRecordExtractor();
+    _recordExtractor.init(fieldsToRead, null);
+    _schema = _parquetMetadata.getFileMetaData().getSchema();
+    _parquetFileReader =
+        new ParquetFileReader(conf, _parquetMetadata.getFileMetaData(), _dataFilePath, _parquetMetadata.getBlocks(),
+            _schema.getColumns());
+    _pageReadStore = _parquetFileReader.readNextRowGroup();
+    _columnIO = new ColumnIOFactory().getColumnIO(_schema);
+    _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
+    _currentPageIdx = 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (_pageReadStore == null) {
+      return false;
+    }
+    if (_pageReadStore.getRowCount() - _currentPageIdx >= 1) {
+      // System.out.println("_pageReadStore.getRowCount() = " + _pageReadStore.getRowCount() + ", _currentPageIdx = " + _currentPageIdx);
+      return true;
+    }
+    try {
+      _pageReadStore = _parquetFileReader.readNextRowGroup();
+      _currentPageIdx = 0;
+      if (_pageReadStore == null) {
+        return false;
+      }
+      _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return hasNext();
+  }
+
+  @Override
+  public GenericRow next()
+      throws IOException {
+    return next(new GenericRow());
+  }
+
+  @Override
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    _nextRecord = (Group) _parquetRecordReader.read();
+    _recordExtractor.extract(_nextRecord, reuse);
+    _currentPageIdx++;
+    return reuse;
+  }
+
+  @Override
+  public void rewind()
+      throws IOException {
+    _parquetFileReader.close();
+    Configuration conf = new Configuration();
+    _parquetFileReader =
+        new ParquetFileReader(conf, _parquetMetadata.getFileMetaData(), _dataFilePath, _parquetMetadata.getBlocks(),
+            _schema.getColumns());
+    _pageReadStore = _parquetFileReader.readNextRowGroup();
+    _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
+    _currentPageIdx = 0;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _parquetFileReader.close();
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
index 10ce618..a5f444d 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
@@ -22,10 +22,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Set;
 import javax.annotation.Nullable;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.ParquetReader;
-import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
@@ -35,24 +31,24 @@ import org.apache.pinot.spi.data.readers.RecordReaderConfig;
  * Record reader for Parquet file.
  */
 public class ParquetRecordReader implements RecordReader {
-  private Path _dataFilePath;
-  private AvroRecordExtractor _recordExtractor;
-  private ParquetReader<GenericRecord> _parquetReader;
-  private GenericRecord _nextRecord;
+  private RecordReader _internalParquetRecordReader;
+  private boolean _useAvroParquetRecordReader = true;
 
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    _dataFilePath = new Path(dataFile.getAbsolutePath());
-    _parquetReader = ParquetUtils.getParquetReader(_dataFilePath);
-    _recordExtractor = new AvroRecordExtractor();
-    _recordExtractor.init(fieldsToRead, null);
-    _nextRecord = _parquetReader.read();
+    if (recordReaderConfig == null || ((ParquetRecordReaderConfig) recordReaderConfig).useParquetAvroRecordReader()) {
+      _internalParquetRecordReader = new ParquetAvroRecordReader();
+    } else {
+      _useAvroParquetRecordReader = false;
+      _internalParquetRecordReader = new ParquetNativeRecordReader();
+    }
+    _internalParquetRecordReader.init(dataFile, fieldsToRead, recordReaderConfig);
   }
 
   @Override
   public boolean hasNext() {
-    return _nextRecord != null;
+    return _internalParquetRecordReader.hasNext();
   }
 
   @Override
@@ -64,22 +60,22 @@ public class ParquetRecordReader implements RecordReader {
   @Override
   public GenericRow next(GenericRow reuse)
       throws IOException {
-    _recordExtractor.extract(_nextRecord, reuse);
-    _nextRecord = _parquetReader.read();
-    return reuse;
+    return _internalParquetRecordReader.next(reuse);
   }
 
   @Override
   public void rewind()
       throws IOException {
-    _parquetReader.close();
-    _parquetReader = ParquetUtils.getParquetReader(_dataFilePath);
-    _nextRecord = _parquetReader.read();
+    _internalParquetRecordReader.rewind();
   }
 
   @Override
   public void close()
       throws IOException {
-    _parquetReader.close();
+    _internalParquetRecordReader.close();
+  }
+
+  public boolean useAvroParquetRecordReader() {
+    return _useAvroParquetRecordReader;
   }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java
new file mode 100644
index 0000000..a1c1799
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+public class ParquetRecordReaderConfig implements RecordReaderConfig {
+  private static final String USE_PARQUET_AVRO_RECORDER_READER = "useParquetAvroRecordReader";
+  private boolean _useParquetAvroRecordReader = true;
+
+  public ParquetRecordReaderConfig() {
+  }
+
+  public ParquetRecordReaderConfig(Configuration conf) {
+    _useParquetAvroRecordReader = conf.getBoolean(USE_PARQUET_AVRO_RECORDER_READER, true);
+  }
+
+  public boolean useParquetAvroRecordReader() {
+    return _useParquetAvroRecordReader;
+  }
+
+  public void setUseParquetAvroRecordReader(boolean useParquetAvroRecordReader) {
+    _useParquetAvroRecordReader = useParquetAvroRecordReader;
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
index f2a49b1..c94e5ad 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
@@ -33,6 +33,7 @@ import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
 
 
 public class ParquetUtils {
@@ -44,27 +45,27 @@ public class ParquetUtils {
   /**
    * Returns a ParquetReader with the given path.
    */
-  public static ParquetReader<GenericRecord> getParquetReader(Path path)
+  public static ParquetReader<GenericRecord> getParquetAvroReader(Path path)
       throws IOException {
     //noinspection unchecked
     return AvroParquetReader.<GenericRecord>builder(path).disableCompatibility().withDataModel(GenericData.get())
-        .withConf(getConfiguration()).build();
+        .withConf(getParquetAvroReaderConfiguration()).build();
   }
 
   /**
    * Returns a ParquetWriter with the given path and schema.
    */
-  public static ParquetWriter<GenericRecord> getParquetWriter(Path path, Schema schema)
+  public static ParquetWriter<GenericRecord> getParquetAvroWriter(Path path, Schema schema)
       throws IOException {
-    return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema).withConf(getConfiguration()).build();
+    return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema).withConf(getParquetAvroReaderConfiguration()).build();
   }
 
   /**
    * Returns the schema for the given Parquet file path.
    */
-  public static Schema getParquetSchema(Path path)
+  public static Schema getParquetAvroSchema(Path path)
       throws IOException {
-    ParquetMetadata footer = ParquetFileReader.readFooter(getConfiguration(), path, ParquetMetadataConverter.NO_FILTER);
+    ParquetMetadata footer = ParquetFileReader.readFooter(getParquetAvroReaderConfiguration(), path, ParquetMetadataConverter.NO_FILTER);
     Map<String, String> metaData = footer.getFileMetaData().getKeyValueMetaData();
     String schemaString = metaData.get("parquet.avro.schema");
     if (schemaString == null) {
@@ -74,11 +75,12 @@ public class ParquetUtils {
     if (schemaString != null) {
       return new Schema.Parser().parse(schemaString);
     } else {
-      return new AvroSchemaConverter().convert(footer.getFileMetaData().getSchema());
+      MessageType parquetSchema = footer.getFileMetaData().getSchema();
+      return new AvroSchemaConverter().convert(parquetSchema);
     }
   }
 
-  private static Configuration getConfiguration() {
+  private static Configuration getParquetAvroReaderConfiguration() {
     // The file path used in ParquetRecordReader is a local file path without prefix 'file:///',
     // so we have to make sure that the configuration item 'fs.defaultFS' is set to 'file:///'
     // in case that user's hadoop conf overwrite this item
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index ec9beed..ccc7909 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.plugin.inputformat.parquet;
 
+import com.google.common.collect.ImmutableSet;
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -30,11 +32,16 @@ import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
 
 
 public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
   private final File _dataFile = new File(_tempDir, "data.parquet");
+  private final File _testParquetFileWithInt96AndDecimal =
+      new File(getClass().getClassLoader().getResource("test-file-with-int96-and-decimal.snappy.parquet").getFile());
 
   @Override
   protected RecordReader createRecordReader()
@@ -57,10 +64,66 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
       records.add(record);
     }
     try (ParquetWriter<GenericRecord> writer = ParquetUtils
-        .getParquetWriter(new Path(_dataFile.getAbsolutePath()), schema)) {
+        .getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()), schema)) {
       for (GenericRecord record : records) {
         writer.write(record);
       }
     }
   }
+
+  @Test
+  public void testParquetAvroRecordReader()
+      throws IOException {
+    ParquetAvroRecordReader avroRecordReader = new ParquetAvroRecordReader();
+    avroRecordReader.init(_dataFile, null, new ParquetRecordReaderConfig());
+    testReadParquetFile(avroRecordReader, SAMPLE_RECORDS_SIZE);
+  }
+
+  private void testReadParquetFile(RecordReader reader, int totalRecords)
+      throws IOException {
+    int numRecordsRead = 0;
+    while (reader.hasNext()) {
+      reader.next();
+      numRecordsRead++;
+    }
+    Assert.assertEquals(numRecordsRead, totalRecords);
+  }
+
+  @Test
+  public void testParquetNativeRecordReader()
+      throws IOException {
+    ParquetNativeRecordReader nativeRecordReader = new ParquetNativeRecordReader();
+    nativeRecordReader.init(_testParquetFileWithInt96AndDecimal, ImmutableSet.of(), new ParquetRecordReaderConfig());
+    testReadParquetFile(nativeRecordReader, 1965);
+    nativeRecordReader.init(_dataFile, ImmutableSet.of(), new ParquetRecordReaderConfig());
+    testReadParquetFile(nativeRecordReader, SAMPLE_RECORDS_SIZE);
+  }
+
+  @Test
+  public void testComparison()
+      throws IOException {
+    testComparison(_dataFile);
+  }
+
+  private void testComparison(File dataFile)
+      throws IOException {
+    final ParquetRecordReader avroRecordReader = new ParquetRecordReader();
+    avroRecordReader.init(dataFile, null, null);
+    final ParquetRecordReader nativeRecordReader = new ParquetRecordReader();
+    ParquetRecordReaderConfig parquetRecordReaderConfig = new ParquetRecordReaderConfig();
+    parquetRecordReaderConfig.setUseParquetAvroRecordReader(false);
+    nativeRecordReader.init(dataFile, null, parquetRecordReaderConfig);
+    Assert.assertTrue(avroRecordReader.useAvroParquetRecordReader());
+    Assert.assertFalse(nativeRecordReader.useAvroParquetRecordReader());
+
+    GenericRow avroReuse = new GenericRow();
+    GenericRow nativeReuse = new GenericRow();
+    while (avroRecordReader.hasNext()) {
+      Assert.assertTrue(nativeRecordReader.hasNext());
+      final GenericRow avroReaderRow = avroRecordReader.next(avroReuse);
+      final GenericRow nativeReaderRow = nativeRecordReader.next(nativeReuse);
+      Assert.assertEquals(nativeReaderRow.toString(), avroReaderRow.toString());
+      Assert.assertTrue(avroReaderRow.equals(nativeReaderRow));
+    }
+  }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-file-with-int96-and-decimal.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-file-with-int96-and-decimal.snappy.parquet
new file mode 100644
index 0000000..7818851
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-file-with-int96-and-decimal.snappy.parquet differ
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index 72e5c52..b4fca64 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -167,11 +167,62 @@ public class GenericRow implements Serializable {
     }
     if (obj instanceof GenericRow) {
       GenericRow that = (GenericRow) obj;
-      return _fieldToValueMap.equals(that._fieldToValueMap) && _nullValueFields.equals(that._nullValueFields);
+      if (!_nullValueFields.containsAll(that._nullValueFields) || !that._nullValueFields
+          .containsAll(_nullValueFields)) {
+        return false;
+      }
+      return compareMap(_fieldToValueMap, that._fieldToValueMap);
     }
     return false;
   }
 
+  private boolean compareMap(Map<String, Object> thisMap, Map<String, Object> thatMap) {
+    if (thisMap.size() == thatMap.size()) {
+      for (String key : thisMap.keySet()) {
+        Object fieldValue = thisMap.get(key);
+        Object thatFieldValue = thatMap.get(key);
+        if (fieldValue == null) {
+          if (thatFieldValue != null) {
+            return false;
+          }
+        } else if (!fieldValue.equals(thatFieldValue)) {
+          if (fieldValue instanceof Map && thatFieldValue instanceof Map) {
+            return compareMap((Map<String, Object>) fieldValue, (Map<String, Object>) thatFieldValue);
+          }
+          if (fieldValue.getClass().isArray() && thatFieldValue.getClass().isArray()) {
+            return compareArray((Object[]) fieldValue, (Object[]) thatFieldValue);
+          }
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  private boolean compareArray(Object[] fieldValue, Object[] thatFieldValue) {
+    for (int i = 0; i < fieldValue.length; i++) {
+      if (fieldValue[i] instanceof Map) {
+        if (!(thatFieldValue[i] instanceof Map)) {
+          return false;
+        }
+        if (!compareMap((Map<String, Object>) fieldValue[i], (Map<String, Object>) thatFieldValue[i])) {
+          return false;
+        }
+        continue;
+      }
+      if (fieldValue[i].getClass().isArray()) {
+        if (!compareArray((Object[]) fieldValue[i], (Object[]) thatFieldValue[i])) {
+          return false;
+        }
+        continue;
+      }
+      if (!fieldValue[i].equals(thatFieldValue[i])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   @Override
   public String toString() {
     try {
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
index 91035cd..91efd6c 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
@@ -39,7 +39,7 @@ import org.testng.collections.Lists;
 
 public abstract class AbstractRecordReaderTest {
   private final static Random RANDOM = new Random(System.currentTimeMillis());
-  private final static int SAMPLE_RECORDS_SIZE = 10000;
+  protected final static int SAMPLE_RECORDS_SIZE = 10000;
 
   protected final File _tempDir = new File(FileUtils.getTempDirectory(), "RecordReaderTest");
   protected List<Map<String, Object>> _records;
@@ -155,6 +155,9 @@ public abstract class AbstractRecordReaderTest {
   @BeforeClass
   public void setUp()
       throws Exception {
+    if (_tempDir.exists()) {
+      FileUtils.cleanDirectory(_tempDir);
+    }
     FileUtils.forceMkdir(_tempDir);
     // Generate Pinot schema
     _pinotSchema = getPinotSchema();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org