You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/02/18 08:02:55 UTC

[incubator-pinot] branch master updated: Adding native parquet record reader support (#6525)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d6fd42d  Adding native parquet record reader support (#6525)
d6fd42d is described below

commit d6fd42d7883ceed4f6e207dfae7b7b809d2c2c5c
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Thu Feb 18 00:02:34 2021 -0800

    Adding native parquet record reader support (#6525)
    
    * Adding native parquet record reader support
    
    * Adding more test data
    
    * Address comments
---
 ...ordReader.java => ParquetAvroRecordReader.java} |  13 +-
 .../parquet/ParquetNativeRecordExtractor.java      | 263 +++++++++++++++++++++
 .../parquet/ParquetNativeRecordReader.java         | 129 ++++++++++
 .../inputformat/parquet/ParquetRecordReader.java   |  41 ++--
 .../parquet/ParquetRecordReaderConfig.java         |  52 ++++
 .../plugin/inputformat/parquet/ParquetUtils.java   |  24 +-
 .../parquet/ParquetRecordReaderTest.java           |  77 +++++-
 .../src/test/resources/airlineStats.snappy.parquet | Bin 0 -> 1095802 bytes
 .../test/resources/baseballStats.snappy.parquet    | Bin 0 -> 1993064 bytes
 .../src/test/resources/githubActivities.gz.parquet | Bin 0 -> 1610474 bytes
 .../src/test/resources/githubEvents.snappy.parquet | Bin 0 -> 4537684 bytes
 .../test/resources/starbucksStores.snappy.parquet  | Bin 0 -> 451742 bytes
 .../src/test/resources/test-comparison.gz.parquet  | Bin 0 -> 10617970 bytes
 .../test/resources/test-comparison.snappy.parquet  | Bin 0 -> 18350 bytes
 ...test-file-with-int96-and-decimal.snappy.parquet | Bin 0 -> 19659 bytes
 .../pinot-parquet/src/test/resources/users.parquet | Bin 0 -> 4065 bytes
 .../apache/pinot/spi/data/readers/GenericRow.java  |  57 ++++-
 .../spi/data/readers/AbstractRecordReaderTest.java |   5 +-
 18 files changed, 622 insertions(+), 39 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
similarity index 78%
copy from pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
copy to pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
index 10ce618..9c494f0 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
@@ -32,9 +32,14 @@ import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 
 
 /**
- * Record reader for Parquet file.
+ * Avro record reader for Parquet files. This reader does not read Parquet files whose schemas use types that are
+ * incompatible with Avro, e.g. INT96, DECIMAL. For such files, please use
+ * {@link org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader} instead.<p><p>
+ * For more info on Avro to Parquet schema conversion:
+ * <a href="https://javadoc.io/doc/org.apache.parquet/parquet-avro/latest/index.html">
+ *   https://javadoc.io/doc/org.apache.parquet/parquet-avro/latest/index.html</a>
  */
-public class ParquetRecordReader implements RecordReader {
+public class ParquetAvroRecordReader implements RecordReader {
   private Path _dataFilePath;
   private AvroRecordExtractor _recordExtractor;
   private ParquetReader<GenericRecord> _parquetReader;
@@ -44,7 +49,7 @@ public class ParquetRecordReader implements RecordReader {
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
     _dataFilePath = new Path(dataFile.getAbsolutePath());
-    _parquetReader = ParquetUtils.getParquetReader(_dataFilePath);
+    _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
     _recordExtractor = new AvroRecordExtractor();
     _recordExtractor.init(fieldsToRead, null);
     _nextRecord = _parquetReader.read();
@@ -73,7 +78,7 @@ public class ParquetRecordReader implements RecordReader {
   public void rewind()
       throws IOException {
     _parquetReader.close();
-    _parquetReader = ParquetUtils.getParquetReader(_dataFilePath);
+    _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
     _nextRecord = _parquetReader.read();
   }
 
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
new file mode 100644
index 0000000..46b989e
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+/**
+ * ParquetNativeRecordExtractor extracts values from a Parquet {@link Group}.
+ */
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+  public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  private Set<String> _fields;
+  private boolean _extractAll = false;
+
+  public static BigDecimal binaryToDecimal(Binary value, int precision, int scale) {
+    /*
+     * A precision of at most 18 digits means the unscaled value fits in a long and is decoded directly;
+     * larger precisions are decoded through BigInteger.
+     */
+    if (precision <= 18) {
+      ByteBuffer buffer = value.toByteBuffer();
+      byte[] bytes = buffer.array();
+      int start = buffer.arrayOffset() + buffer.position();
+      int end = buffer.arrayOffset() + buffer.limit();
+      long unscaled = 0L;
+      int i = start;
+      while (i < end) {
+        unscaled = (unscaled << 8 | bytes[i] & 0xff);
+        i++;
+      }
+      int bits = 8 * (end - start);
+      long unscaledNew = (unscaled << (64 - bits)) >> (64 - bits);
+      if (unscaledNew <= -pow(10, 18) || unscaledNew >= pow(10, 18)) {
+        return new BigDecimal(unscaledNew);
+      } else {
+        return BigDecimal.valueOf(unscaledNew / pow(10, scale));
+      }
+    } else {
+      return new BigDecimal(new BigInteger(value.getBytes()), scale);
+    }
+  }
+
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    if (fields == null || fields.isEmpty()) {
+      _extractAll = true;
+      _fields = Collections.emptySet();
+    } else {
+      _fields = ImmutableSet.copyOf(fields);
+    }
+  }
+
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    GroupType fromType = from.getType();
+    if (_extractAll) {
+      List<Type> fields = fromType.getFields();
+      for (Type field : fields) {
+        String fieldName = field.getName();
+        Object value = extractValue(from, fromType.getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    } else {
+      for (String fieldName : _fields) {
+        Object value = extractValue(from, fromType.getFieldIndex(fieldName));
+        if (value != null) {
+          value = convert(value);
+        }
+        to.putValue(fieldName, value);
+      }
+    }
+    return to;
+  }
+
+  private Object extractValue(Group from, int fieldIndex) {
+    int valueCount = from.getFieldRepetitionCount(fieldIndex);
+    Type fieldType = from.getType().getType(fieldIndex);
+    if (valueCount == 0) {
+      return null;
+    }
+    if (valueCount == 1) {
+      return extractValue(from, fieldIndex, fieldType, 0);
+    }
+    // For multi-value (repeated field)
+    Object[] results = new Object[valueCount];
+    for (int index = 0; index < valueCount; index++) {
+      results[index] = extractValue(from, fieldIndex, fieldType, index);
+    }
+    return results;
+  }
+
+  private Object extractValue(Group from, int fieldIndex, Type fieldType, int index) {
+    OriginalType originalType = fieldType.getOriginalType();
+    if (fieldType.isPrimitive()) {
+      switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+        case INT32:
+          return from.getInteger(fieldIndex, index);
+        case INT64:
+          return from.getLong(fieldIndex, index);
+        case FLOAT:
+          return from.getFloat(fieldIndex, index);
+        case DOUBLE:
+          return from.getDouble(fieldIndex, index);
+        case BOOLEAN:
+          return from.getValueToString(fieldIndex, index);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          if (originalType == OriginalType.UTF8) {
+            return from.getValueToString(fieldIndex, index);
+          }
+          if (originalType == OriginalType.DECIMAL) {
+            DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata();
+            return binaryToDecimal(from.getBinary(fieldIndex, index), decimalMetadata.getPrecision(),
+                decimalMetadata.getScale());
+          }
+          return from.getBinary(fieldIndex, index).getBytes();
+        case INT96:
+          Binary int96 = from.getInt96(fieldIndex, index);
+          ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+          long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+              + buf.getLong(0) / NANOS_PER_MILLISECOND;
+          return dateTime;
+      }
+    } else if ((fieldType.isRepetition(Type.Repetition.OPTIONAL)) || (fieldType.isRepetition(Type.Repetition.REQUIRED))
+        || (fieldType.isRepetition(Type.Repetition.REPEATED))) {
+      Group group = from.getGroup(fieldIndex, index);
+      if (originalType == OriginalType.LIST) {
+        return extractList(group);
+      }
+      return extractMap(group);
+    }
+    return null;
+  }
+
+  public Object[] extractList(Group group) {
+    int repFieldCount = group.getType().getFieldCount();
+    if (repFieldCount < 1) {
+      return null;
+    }
+    Object[] list = new Object[repFieldCount];
+    for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) {
+      list[repFieldIdx] = extractValue(group, repFieldIdx);
+    }
+    if (repFieldCount == 1 && list[0] == null) {
+      return null;
+    }
+    if (repFieldCount == 1 && list[0].getClass().isArray()) {
+      return (Object[]) list[0];
+    }
+    return list;
+  }
+
+  public Map<String, Object> extractMap(Group group) {
+    final int repFieldCount = group.getType().getFieldCount();
+    if (repFieldCount < 1) {
+      return null;
+    }
+    Map<String, Object> resultMap = new HashMap<>();
+    for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) {
+      Object value = extractValue(group, repFieldIdx);
+      resultMap.put(group.getType().getType(repFieldIdx).getName(), value);
+    }
+    return resultMap;
+  }
+
+  @Override
+  public Object convertMap(Object value) {
+    Map<Object, Object> map = (Map) value;
+    if (map.isEmpty()) {
+      return null;
+    }
+    Map<Object, Object> convertedMap = new HashMap<>();
+    for (Map.Entry<Object, Object> entry : map.entrySet()) {
+      Object mapKey = entry.getKey();
+      Object mapValue = entry.getValue();
+      if (mapKey != null) {
+        Object convertedMapValue = null;
+        if (mapValue != null) {
+          convertedMapValue = convert(mapValue);
+        }
+        convertedMap.put(convertSingleValue(entry.getKey()), convertedMapValue);
+      }
+    }
+    if (convertedMap.isEmpty()) {
+      return null;
+    }
+    return convertedMap;
+  }
+
+  @Override
+  public boolean isMultiValue(Object value) {
+    if (super.isMultiValue(value)) {
+      return true;
+    }
+    if (value instanceof byte[]) {
+      return false;
+    }
+    return value.getClass().isArray();
+  }
+
+  @Nullable
+  @Override
+  protected Object convertMultiValue(Object value) {
+    if (value instanceof Collection) {
+      return super.convertMultiValue(value);
+    }
+    // value is Object[]
+    Object[] values = (Object[]) value;
+    return super.convertMultiValue(Arrays.asList(values));
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
new file mode 100644
index 0000000..3c55137
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.schema.MessageType;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+/**
+ * Record reader for Parquet files that uses the native Parquet API instead of Avro.
+ */
+public class ParquetNativeRecordReader implements RecordReader {
+  private Path _dataFilePath;
+  private ParquetNativeRecordExtractor _recordExtractor;
+  private MessageType _schema;
+  private ParquetMetadata _parquetMetadata;
+  private ParquetFileReader _parquetFileReader;
+  private Group _nextRecord;
+  private PageReadStore _pageReadStore;
+  private MessageColumnIO _columnIO;
+  private org.apache.parquet.io.RecordReader _parquetRecordReader;
+  private int _currentPageIdx;
+
+  @Override
+  public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
+      throws IOException {
+    _dataFilePath = new Path(dataFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    _parquetMetadata = ParquetFileReader.readFooter(conf, _dataFilePath, ParquetMetadataConverter.NO_FILTER);
+    _recordExtractor = new ParquetNativeRecordExtractor();
+    _recordExtractor.init(fieldsToRead, null);
+    _schema = _parquetMetadata.getFileMetaData().getSchema();
+    _parquetFileReader =
+        new ParquetFileReader(conf, _parquetMetadata.getFileMetaData(), _dataFilePath, _parquetMetadata.getBlocks(),
+            _schema.getColumns());
+    _pageReadStore = _parquetFileReader.readNextRowGroup();
+    _columnIO = new ColumnIOFactory().getColumnIO(_schema);
+    _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
+    _currentPageIdx = 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (_pageReadStore == null) {
+      return false;
+    }
+    if (_pageReadStore.getRowCount() - _currentPageIdx >= 1) {
+      // System.out.println("_pageReadStore.getRowCount() = " + _pageReadStore.getRowCount() + ", _currentPageIdx = " + _currentPageIdx);
+      return true;
+    }
+    try {
+      _pageReadStore = _parquetFileReader.readNextRowGroup();
+      _currentPageIdx = 0;
+      if (_pageReadStore == null) {
+        return false;
+      }
+      _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return hasNext();
+  }
+
+  @Override
+  public GenericRow next()
+      throws IOException {
+    return next(new GenericRow());
+  }
+
+  @Override
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    _nextRecord = (Group) _parquetRecordReader.read();
+    _recordExtractor.extract(_nextRecord, reuse);
+    _currentPageIdx++;
+    return reuse;
+  }
+
+  @Override
+  public void rewind()
+      throws IOException {
+    _parquetFileReader.close();
+    Configuration conf = new Configuration();
+    _parquetFileReader =
+        new ParquetFileReader(conf, _parquetMetadata.getFileMetaData(), _dataFilePath, _parquetMetadata.getBlocks(),
+            _schema.getColumns());
+    _pageReadStore = _parquetFileReader.readNextRowGroup();
+    _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
+    _currentPageIdx = 0;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _parquetFileReader.close();
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
index 10ce618..790e97f 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
@@ -22,37 +22,34 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Set;
 import javax.annotation.Nullable;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.ParquetReader;
-import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 
 
 /**
- * Record reader for Parquet file.
+ * Pinot Record reader for Parquet file.<p>
+ * It has two implementations: {@link ParquetAvroRecordReader} (Default) and {@link ParquetNativeRecordReader}.
  */
 public class ParquetRecordReader implements RecordReader {
-  private Path _dataFilePath;
-  private AvroRecordExtractor _recordExtractor;
-  private ParquetReader<GenericRecord> _parquetReader;
-  private GenericRecord _nextRecord;
+  private RecordReader _internalParquetRecordReader;
+  private boolean _useAvroParquetRecordReader = true;
 
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    _dataFilePath = new Path(dataFile.getAbsolutePath());
-    _parquetReader = ParquetUtils.getParquetReader(_dataFilePath);
-    _recordExtractor = new AvroRecordExtractor();
-    _recordExtractor.init(fieldsToRead, null);
-    _nextRecord = _parquetReader.read();
+    if (recordReaderConfig == null || ((ParquetRecordReaderConfig) recordReaderConfig).useParquetAvroRecordReader()) {
+      _internalParquetRecordReader = new ParquetAvroRecordReader();
+    } else {
+      _useAvroParquetRecordReader = false;
+      _internalParquetRecordReader = new ParquetNativeRecordReader();
+    }
+    _internalParquetRecordReader.init(dataFile, fieldsToRead, recordReaderConfig);
   }
 
   @Override
   public boolean hasNext() {
-    return _nextRecord != null;
+    return _internalParquetRecordReader.hasNext();
   }
 
   @Override
@@ -64,22 +61,22 @@ public class ParquetRecordReader implements RecordReader {
   @Override
   public GenericRow next(GenericRow reuse)
       throws IOException {
-    _recordExtractor.extract(_nextRecord, reuse);
-    _nextRecord = _parquetReader.read();
-    return reuse;
+    return _internalParquetRecordReader.next(reuse);
   }
 
   @Override
   public void rewind()
       throws IOException {
-    _parquetReader.close();
-    _parquetReader = ParquetUtils.getParquetReader(_dataFilePath);
-    _nextRecord = _parquetReader.read();
+    _internalParquetRecordReader.rewind();
   }
 
   @Override
   public void close()
       throws IOException {
-    _parquetReader.close();
+    _internalParquetRecordReader.close();
+  }
+
+  public boolean useAvroParquetRecordReader() {
+    return _useAvroParquetRecordReader;
   }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java
new file mode 100644
index 0000000..d6bdce2
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+/**
+ * Config for ParquetRecordReader
+ */
+public class ParquetRecordReaderConfig implements RecordReaderConfig {
+  private static final String USE_PARQUET_AVRO_RECORDER_READER = "useParquetAvroRecordReader";
+  private boolean _useParquetAvroRecordReader = true;
+  private Configuration _conf;
+
+  public ParquetRecordReaderConfig() {
+  }
+
+  public ParquetRecordReaderConfig(Configuration conf) {
+    _conf = conf;
+    _useParquetAvroRecordReader = conf.getBoolean(USE_PARQUET_AVRO_RECORDER_READER, true);
+  }
+
+  public boolean useParquetAvroRecordReader() {
+    return _useParquetAvroRecordReader;
+  }
+
+  public void setUseParquetAvroRecordReader(boolean useParquetAvroRecordReader) {
+    _useParquetAvroRecordReader = useParquetAvroRecordReader;
+  }
+
+  public Configuration getConfig() {
+    return _conf;
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
index f2a49b1..5f3dd81 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
@@ -33,38 +33,41 @@ import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
 
 
 public class ParquetUtils {
+  private static final String DEFAULT_FS = "file:///";
+
   private ParquetUtils() {
   }
 
-  private static final String DEFAULT_FS = "file:///";
-
   /**
    * Returns a ParquetReader with the given path.
    */
-  public static ParquetReader<GenericRecord> getParquetReader(Path path)
+  public static ParquetReader<GenericRecord> getParquetAvroReader(Path path)
       throws IOException {
     //noinspection unchecked
     return AvroParquetReader.<GenericRecord>builder(path).disableCompatibility().withDataModel(GenericData.get())
-        .withConf(getConfiguration()).build();
+        .withConf(getParquetAvroReaderConfiguration()).build();
   }
 
   /**
    * Returns a ParquetWriter with the given path and schema.
    */
-  public static ParquetWriter<GenericRecord> getParquetWriter(Path path, Schema schema)
+  public static ParquetWriter<GenericRecord> getParquetAvroWriter(Path path, Schema schema)
       throws IOException {
-    return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema).withConf(getConfiguration()).build();
+    return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema)
+        .withConf(getParquetAvroReaderConfiguration()).build();
   }
 
   /**
    * Returns the schema for the given Parquet file path.
    */
-  public static Schema getParquetSchema(Path path)
+  public static Schema getParquetAvroSchema(Path path)
       throws IOException {
-    ParquetMetadata footer = ParquetFileReader.readFooter(getConfiguration(), path, ParquetMetadataConverter.NO_FILTER);
+    ParquetMetadata footer =
+        ParquetFileReader.readFooter(getParquetAvroReaderConfiguration(), path, ParquetMetadataConverter.NO_FILTER);
     Map<String, String> metaData = footer.getFileMetaData().getKeyValueMetaData();
     String schemaString = metaData.get("parquet.avro.schema");
     if (schemaString == null) {
@@ -74,11 +77,12 @@ public class ParquetUtils {
     if (schemaString != null) {
       return new Schema.Parser().parse(schemaString);
     } else {
-      return new AvroSchemaConverter().convert(footer.getFileMetaData().getSchema());
+      MessageType parquetSchema = footer.getFileMetaData().getSchema();
+      return new AvroSchemaConverter().convert(parquetSchema);
     }
   }
 
-  private static Configuration getConfiguration() {
+  private static Configuration getParquetAvroReaderConfiguration() {
     // The file path used in ParquetRecordReader is a local file path without prefix 'file:///',
     // so we have to make sure that the configuration item 'fs.defaultFS' is set to 'file:///'
     // in case that user's hadoop conf overwrite this item
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index ec9beed..f39df45 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.plugin.inputformat.parquet;
 
+import com.google.common.collect.ImmutableSet;
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -30,11 +32,16 @@ import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
 
 
 public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
   private final File _dataFile = new File(_tempDir, "data.parquet");
+  private final File _testParquetFileWithInt96AndDecimal =
+      new File(getClass().getClassLoader().getResource("test-file-with-int96-and-decimal.snappy.parquet").getFile());
 
   @Override
   protected RecordReader createRecordReader()
@@ -57,10 +64,78 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
       records.add(record);
     }
     try (ParquetWriter<GenericRecord> writer = ParquetUtils
-        .getParquetWriter(new Path(_dataFile.getAbsolutePath()), schema)) {
+        .getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()), schema)) {
       for (GenericRecord record : records) {
         writer.write(record);
       }
     }
   }
+
+  /**
+   * Reads the Avro-written Parquet file generated by this test with {@link ParquetAvroRecordReader} and
+   * checks that exactly {@code SAMPLE_RECORDS_SIZE} records come back.
+   */
+  @Test
+  public void testParquetAvroRecordReader()
+      throws IOException {
+    // try-with-resources so the reader is closed even when the record-count assertion fails
+    try (ParquetAvroRecordReader avroRecordReader = new ParquetAvroRecordReader()) {
+      avroRecordReader.init(_dataFile, null, new ParquetRecordReaderConfig());
+      testReadParquetFile(avroRecordReader, SAMPLE_RECORDS_SIZE);
+    }
+  }
+
+  /**
+   * Drains {@code reader} and asserts that it yielded exactly {@code totalRecords} records.
+   */
+  private void testReadParquetFile(RecordReader reader, int totalRecords)
+      throws IOException {
+    int count = 0;
+    for (; reader.hasNext(); count++) {
+      reader.next();
+    }
+    Assert.assertEquals(count, totalRecords);
+  }
+
+  /**
+   * Reads two files with {@link ParquetNativeRecordReader} and checks the record count of each: the bundled
+   * INT96/decimal sample file and the Avro-written file generated by this test.
+   */
+  @Test
+  public void testParquetNativeRecordReader()
+      throws IOException {
+    // One reader per file, each closed after use, instead of re-initializing a reader that is still open
+    try (ParquetNativeRecordReader int96AndDecimalReader = new ParquetNativeRecordReader()) {
+      int96AndDecimalReader
+          .init(_testParquetFileWithInt96AndDecimal, ImmutableSet.of(), new ParquetRecordReaderConfig());
+      testReadParquetFile(int96AndDecimalReader, 1965);
+    }
+    try (ParquetNativeRecordReader dataFileReader = new ParquetNativeRecordReader()) {
+      dataFileReader.init(_dataFile, ImmutableSet.of(), new ParquetRecordReaderConfig());
+      testReadParquetFile(dataFileReader, SAMPLE_RECORDS_SIZE);
+    }
+  }
+
+  /**
+   * Runs the Avro-vs-native reader comparison on the generated data file and then on each bundled sample
+   * file, passing the number of records each file is expected to hold.
+   */
+  @Test
+  public void testComparison()
+      throws IOException {
+    testComparison(_dataFile, SAMPLE_RECORDS_SIZE);
+
+    // Parallel tables: resourceNames[i] is expected to contain expectedRecordCounts[i] records
+    final String[] resourceNames = {
+        "users.parquet",
+        "test-comparison.gz.parquet",
+        "test-comparison.snappy.parquet",
+        "baseballStats.snappy.parquet",
+        "githubEvents.snappy.parquet",
+        "starbucksStores.snappy.parquet",
+        "airlineStats.snappy.parquet",
+        "githubActivities.gz.parquet"
+    };
+    final int[] expectedRecordCounts = {1, 363667, 2870, 97889, 10000, 6443, 19492, 2000};
+    for (int i = 0; i < resourceNames.length; i++) {
+      File resourceFile = new File(getClass().getClassLoader().getResource(resourceNames[i]).getFile());
+      testComparison(resourceFile, expectedRecordCounts[i]);
+    }
+  }
+
+  /**
+   * Reads {@code dataFile} with both the Avro-based and the native Parquet reader and asserts that the two
+   * produce equal rows in the same order and that the number of rows read equals {@code totalRecords}.
+   *
+   * @param dataFile Parquet file to read with both readers
+   * @param totalRecords number of records the file is expected to contain
+   */
+  private void testComparison(File dataFile, int totalRecords)
+      throws IOException {
+    ParquetRecordReaderConfig parquetRecordReaderConfig = new ParquetRecordReaderConfig();
+    parquetRecordReaderConfig.setUseParquetAvroRecordReader(false);
+    // try-with-resources closes both readers even when an assertion fails part-way through the file
+    try (ParquetRecordReader avroRecordReader = new ParquetRecordReader();
+        ParquetRecordReader nativeRecordReader = new ParquetRecordReader()) {
+      avroRecordReader.init(dataFile, null, null);
+      nativeRecordReader.init(dataFile, null, parquetRecordReaderConfig);
+      Assert.assertTrue(avroRecordReader.useAvroParquetRecordReader());
+      Assert.assertFalse(nativeRecordReader.useAvroParquetRecordReader());
+
+      GenericRow avroReuse = new GenericRow();
+      GenericRow nativeReuse = new GenericRow();
+      int recordsRead = 0;
+      while (avroRecordReader.hasNext()) {
+        Assert.assertTrue(nativeRecordReader.hasNext());
+        final GenericRow avroReaderRow = avroRecordReader.next(avroReuse);
+        final GenericRow nativeReaderRow = nativeRecordReader.next(nativeReuse);
+        Assert.assertEquals(nativeReaderRow.toString(), avroReaderRow.toString());
+        Assert.assertTrue(avroReaderRow.equals(nativeReaderRow));
+        recordsRead++;
+      }
+      // The loop is driven by the Avro reader, so surplus rows in the native reader would otherwise go unnoticed
+      Assert.assertFalse(nativeRecordReader.hasNext(), "Native reader returned more records than the Avro reader.");
+      Assert.assertEquals(recordsRead, totalRecords,
+          "Message read from ParquetRecordReader doesn't match the expected number.");
+    }
+  }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/airlineStats.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/airlineStats.snappy.parquet
new file mode 100644
index 0000000..9d1954b
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/airlineStats.snappy.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/baseballStats.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/baseballStats.snappy.parquet
new file mode 100644
index 0000000..931f882
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/baseballStats.snappy.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubActivities.gz.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubActivities.gz.parquet
new file mode 100644
index 0000000..1723e2f
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubActivities.gz.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubEvents.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubEvents.snappy.parquet
new file mode 100644
index 0000000..6bd49d0
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubEvents.snappy.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/starbucksStores.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/starbucksStores.snappy.parquet
new file mode 100644
index 0000000..c3b3efc
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/starbucksStores.snappy.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.gz.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.gz.parquet
new file mode 100644
index 0000000..160c659
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.gz.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.snappy.parquet
new file mode 100644
index 0000000..8aed9b4
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.snappy.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-file-with-int96-and-decimal.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-file-with-int96-and-decimal.snappy.parquet
new file mode 100644
index 0000000..7818851
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-file-with-int96-and-decimal.snappy.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/users.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/users.parquet
new file mode 100644
index 0000000..58ed9a8
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/users.parquet differ
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index 72e5c52..b3a79ce 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -167,11 +168,65 @@ public class GenericRow implements Serializable {
     }
     if (obj instanceof GenericRow) {
       GenericRow that = (GenericRow) obj;
-      return _fieldToValueMap.equals(that._fieldToValueMap) && _nullValueFields.equals(that._nullValueFields);
+      if (!_nullValueFields.containsAll(that._nullValueFields) || !that._nullValueFields
+          .containsAll(_nullValueFields)) {
+        return false;
+      }
+      return compareMap(_fieldToValueMap, that._fieldToValueMap);
     }
     return false;
   }
 
+  /**
+   * Deep-compares two field maps. The maps are equal when they have the same size and every key of
+   * {@code thisMap} is present in {@code thatMap} with a value that is equal per
+   * {@link #compareValue(Object, Object)}.
+   */
+  private boolean compareMap(Map<String, Object> thisMap, Map<String, Object> thatMap) {
+    if (thisMap.size() != thatMap.size()) {
+      return false;
+    }
+    for (Map.Entry<String, Object> entry : thisMap.entrySet()) {
+      // containsKey tells a missing key apart from a key that is mapped to null
+      if (!thatMap.containsKey(entry.getKey()) || !compareValue(entry.getValue(), thatMap.get(entry.getKey()))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Deep-compares two arrays element by element; arrays of different length are never equal.
+   */
+  private boolean compareArray(Object[] fieldValue, Object[] thatFieldValue) {
+    if (fieldValue.length != thatFieldValue.length) {
+      return false;
+    }
+    for (int i = 0; i < fieldValue.length; i++) {
+      if (!compareValue(fieldValue[i], thatFieldValue[i])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Null-safe deep equality for one value: maps and object arrays are compared recursively, primitive
+   * arrays (e.g. {@code byte[]}) by content, and everything else via {@code equals()}.
+   * A mismatch never short-circuits to "equal"; callers keep checking their remaining entries.
+   */
+  @SuppressWarnings("unchecked")
+  private boolean compareValue(Object value, Object thatValue) {
+    if (value instanceof Map && thatValue instanceof Map) {
+      return compareMap((Map<String, Object>) value, (Map<String, Object>) thatValue);
+    }
+    if (value instanceof Object[] && thatValue instanceof Object[]) {
+      return compareArray((Object[]) value, (Object[]) thatValue);
+    }
+    // Wrapping both sides lets Arrays.deepEquals handle nulls, primitive arrays and plain equals() alike
+    return Arrays.deepEquals(new Object[]{value}, new Object[]{thatValue});
+  }
+
   @Override
   public String toString() {
     try {
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
index 91035cd..91efd6c 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
@@ -39,7 +39,7 @@ import org.testng.collections.Lists;
 
 public abstract class AbstractRecordReaderTest {
   private final static Random RANDOM = new Random(System.currentTimeMillis());
-  private final static int SAMPLE_RECORDS_SIZE = 10000;
+  protected final static int SAMPLE_RECORDS_SIZE = 10000;
 
   protected final File _tempDir = new File(FileUtils.getTempDirectory(), "RecordReaderTest");
   protected List<Map<String, Object>> _records;
@@ -155,6 +155,9 @@ public abstract class AbstractRecordReaderTest {
   @BeforeClass
   public void setUp()
       throws Exception {
+    if (_tempDir.exists()) {
+      FileUtils.cleanDirectory(_tempDir);
+    }
     FileUtils.forceMkdir(_tempDir);
     // Generate Pinot schema
     _pinotSchema = getPinotSchema();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org