You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/02/18 08:02:55 UTC
[incubator-pinot] branch master updated: Adding native parquet
record reader support (#6525)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d6fd42d Adding native parquet record reader support (#6525)
d6fd42d is described below
commit d6fd42d7883ceed4f6e207dfae7b7b809d2c2c5c
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Thu Feb 18 00:02:34 2021 -0800
Adding native parquet record reader support (#6525)
* Adding native parquet record reader support
* Adding more test data
* Address comments
---
...ordReader.java => ParquetAvroRecordReader.java} | 13 +-
.../parquet/ParquetNativeRecordExtractor.java | 263 +++++++++++++++++++++
.../parquet/ParquetNativeRecordReader.java | 129 ++++++++++
.../inputformat/parquet/ParquetRecordReader.java | 41 ++--
.../parquet/ParquetRecordReaderConfig.java | 52 ++++
.../plugin/inputformat/parquet/ParquetUtils.java | 24 +-
.../parquet/ParquetRecordReaderTest.java | 77 +++++-
.../src/test/resources/airlineStats.snappy.parquet | Bin 0 -> 1095802 bytes
.../test/resources/baseballStats.snappy.parquet | Bin 0 -> 1993064 bytes
.../src/test/resources/githubActivities.gz.parquet | Bin 0 -> 1610474 bytes
.../src/test/resources/githubEvents.snappy.parquet | Bin 0 -> 4537684 bytes
.../test/resources/starbucksStores.snappy.parquet | Bin 0 -> 451742 bytes
.../src/test/resources/test-comparison.gz.parquet | Bin 0 -> 10617970 bytes
.../test/resources/test-comparison.snappy.parquet | Bin 0 -> 18350 bytes
...test-file-with-int96-and-decimal.snappy.parquet | Bin 0 -> 19659 bytes
.../pinot-parquet/src/test/resources/users.parquet | Bin 0 -> 4065 bytes
.../apache/pinot/spi/data/readers/GenericRow.java | 57 ++++-
.../spi/data/readers/AbstractRecordReaderTest.java | 5 +-
18 files changed, 622 insertions(+), 39 deletions(-)
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
similarity index 78%
copy from pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
copy to pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
index 10ce618..9c494f0 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
@@ -32,9 +32,14 @@ import org.apache.pinot.spi.data.readers.RecordReaderConfig;
/**
- * Record reader for Parquet file.
+ * Avro record reader for Parquet files. This reader cannot read Parquet files whose types are incompatible with
+ * Avro, e.g. INT96, DECIMAL. Please use {@link org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader}
+ * for those files instead.<p>
+ * For more info on Avro to Parquet schema conversion:
+ * <a href="https://javadoc.io/doc/org.apache.parquet/parquet-avro/latest/index.html">
+ * https://javadoc.io/doc/org.apache.parquet/parquet-avro/latest/index.html</a>
*/
-public class ParquetRecordReader implements RecordReader {
+public class ParquetAvroRecordReader implements RecordReader {
private Path _dataFilePath;
private AvroRecordExtractor _recordExtractor;
private ParquetReader<GenericRecord> _parquetReader;
@@ -44,7 +49,7 @@ public class ParquetRecordReader implements RecordReader {
public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
_dataFilePath = new Path(dataFile.getAbsolutePath());
- _parquetReader = ParquetUtils.getParquetReader(_dataFilePath);
+ _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
_recordExtractor = new AvroRecordExtractor();
_recordExtractor.init(fieldsToRead, null);
_nextRecord = _parquetReader.read();
@@ -73,7 +78,7 @@ public class ParquetRecordReader implements RecordReader {
public void rewind()
throws IOException {
_parquetReader.close();
- _parquetReader = ParquetUtils.getParquetReader(_dataFilePath);
+ _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
_nextRecord = _parquetReader.read();
}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
new file mode 100644
index 0000000..46b989e
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.joda.time.DateTimeConstants;
+
+import static java.lang.Math.pow;
+
+
+/**
+ * ParquetNativeRecordExtractor extracts values from a Parquet {@link Group}.
+ */
+public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
+
+ /**
+ * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+ * The value of this constant is {@value}.
+ */
+ public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+
+ public static final long NANOS_PER_MILLISECOND = 1000000;
+
+ private Set<String> _fields;
+ private boolean _extractAll = false;
+
+  /**
+   * Decodes a Parquet DECIMAL stored as a big-endian two's-complement unscaled integer.
+   * <p>The scale is applied with exact {@link BigDecimal} arithmetic: dividing the unscaled long by a double
+   * power of ten can round values of 16 or more digits, and out-of-range values must not lose their scale.
+   * @param value binary holding the unscaled value
+   * @param precision declared precision; up to 18 digits the unscaled value is assembled in a long
+   * @param scale number of digits to the right of the decimal point
+   * @return the exact decimal value carrying the declared scale; zero when {@code value} is empty
+   */
+  public static BigDecimal binaryToDecimal(Binary value, int precision, int scale) {
+    // getBytes() yields exactly the value's bytes, so no backing-array offset arithmetic is needed.
+    byte[] bytes = value.getBytes();
+    if (bytes.length == 0) {
+      // BigInteger rejects zero-length input; an empty binary decodes to zero.
+      return BigDecimal.valueOf(0L, scale);
+    }
+    if (precision <= 18 && bytes.length <= 8) {
+      long unscaled = 0L;
+      for (byte b : bytes) {
+        unscaled = (unscaled << 8) | (b & 0xff);
+      }
+      int unusedBits = 64 - 8 * bytes.length;
+      // Shift left then arithmetic-shift right to sign-extend from the encoded width to 64 bits.
+      return BigDecimal.valueOf((unscaled << unusedBits) >> unusedBits, scale);
+    }
+    return new BigDecimal(new BigInteger(bytes), scale);
+  }
+
+  /** Selects the fields to extract; a null or empty set switches on extraction of every field of a record. */
+  @Override
+  public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    boolean noFieldsRequested = fields == null || fields.isEmpty();
+    if (noFieldsRequested) {
+      _extractAll = true;
+    }
+    _fields = noFieldsRequested ? Collections.<String>emptySet() : ImmutableSet.copyOf(fields);
+  }
+
+  /**
+   * Copies the selected fields of a Parquet {@link Group} into {@code to}.
+   * <p>All fields of the record are copied when no field set was configured. A requested field that is absent
+   * from the record's schema is written as null instead of failing on the index lookup.
+   *
+   * @return the populated {@code to} row
+   */
+  @Override
+  public GenericRow extract(Group from, GenericRow to) {
+    GroupType fromType = from.getType();
+    if (_extractAll) {
+      for (Type field : fromType.getFields()) {
+        Object value = extractValue(from, fromType.getFieldIndex(field.getName()));
+        to.putValue(field.getName(), value != null ? convert(value) : null);
+      }
+      return to;
+    }
+    for (String fieldName : _fields) {
+      // getFieldIndex throws for names missing from the schema, so probe with containsField first.
+      Object value = fromType.containsField(fieldName) ? extractValue(from, fromType.getFieldIndex(fieldName)) : null;
+      to.putValue(fieldName, value != null ? convert(value) : null);
+    }
+    return to;
+  }
+
+  /** Reads the field at {@code fieldIndex}: null when unset, the bare value when single, else an Object[]. */
+  private Object extractValue(Group from, int fieldIndex) {
+    final int occurrences = from.getFieldRepetitionCount(fieldIndex);
+    final Type type = from.getType().getType(fieldIndex);
+    if (occurrences == 0) {
+      return null;
+    } else if (occurrences == 1) {
+      return extractValue(from, fieldIndex, type, 0);
+    }
+    // Repeated field: collect each occurrence in order.
+    Object[] values = new Object[occurrences];
+    for (int i = 0; i < values.length; i++) {
+      values[i] = extractValue(from, fieldIndex, type, i);
+    }
+    return values;
+  }
+
+  /**
+   * Reads occurrence {@code index} of a field: boxed numbers, strings for BOOLEAN/UTF8, BigDecimal for DECIMAL,
+   * raw bytes for other binaries, epoch millis for INT96, and an Object[] or Map (or null) for nested groups.
+   */
+  private Object extractValue(Group from, int fieldIndex, Type fieldType, int index) {
+    OriginalType originalType = fieldType.getOriginalType();
+    if (!fieldType.isPrimitive()) {
+      if (!fieldType.isRepetition(Type.Repetition.OPTIONAL) && !fieldType.isRepetition(Type.Repetition.REQUIRED)
+          && !fieldType.isRepetition(Type.Repetition.REPEATED)) {
+        return null;
+      }
+      Group nested = from.getGroup(fieldIndex, index);
+      return originalType == OriginalType.LIST ? extractList(nested) : extractMap(nested);
+    }
+    switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) {
+      case INT32:
+        return from.getInteger(fieldIndex, index);
+      case INT64:
+        return from.getLong(fieldIndex, index);
+      case FLOAT:
+        return from.getFloat(fieldIndex, index);
+      case DOUBLE:
+        return from.getDouble(fieldIndex, index);
+      case BOOLEAN:
+        return from.getValueToString(fieldIndex, index);
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
+        if (originalType == OriginalType.UTF8) {
+          return from.getValueToString(fieldIndex, index);
+        } else if (originalType == OriginalType.DECIMAL) {
+          DecimalMetadata meta = fieldType.asPrimitiveType().getDecimalMetadata();
+          return binaryToDecimal(from.getBinary(fieldIndex, index), meta.getPrecision(), meta.getScale());
+        }
+        return from.getBinary(fieldIndex, index).getBytes();
+      case INT96:
+        // Little-endian layout: 8 bytes of nanoseconds within the day, then a 4-byte Julian day number.
+        ByteBuffer raw = ByteBuffer.wrap(from.getInt96(fieldIndex, index).getBytes()).order(ByteOrder.LITTLE_ENDIAN);
+        long daysSinceUnixEpoch = raw.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH;
+        return daysSinceUnixEpoch * DateTimeConstants.MILLIS_PER_DAY + raw.getLong(0) / NANOS_PER_MILLISECOND;
+    }
+    return null;
+  }
+
+  /** Turns a LIST group into an array, one slot per child field; null if empty or its sole child is unset. */
+  public Object[] extractList(Group group) {
+    int repFieldCount = group.getType().getFieldCount();
+    if (repFieldCount < 1) {
+      return null;
+    }
+    Object[] list = new Object[repFieldCount];
+    for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) {
+      list[repFieldIdx] = extractValue(group, repFieldIdx);
+    }
+    // A lone repeated child already is the element array. Test for Object[] rather than isArray(): a single
+    // binary element arrives as byte[], which is an array but cannot be cast to Object[].
+    if (repFieldCount == 1 && (list[0] == null || list[0] instanceof Object[])) {
+      return (Object[]) list[0];
+    }
+    return list;
+  }
+
+  /** Maps each child field name of {@code group} to its extracted value; null when the group has no fields. */
+  public Map<String, Object> extractMap(Group group) {
+    GroupType groupType = group.getType();
+    if (groupType.getFieldCount() < 1) {
+      return null;
+    }
+    Map<String, Object> valuesByName = new HashMap<>();
+    for (int i = 0, n = groupType.getFieldCount(); i < n; i++) {
+      valuesByName.put(groupType.getType(i).getName(), extractValue(group, i));
+    }
+    return valuesByName;
+  }
+
+  /**
+   * Converts a nested map: keys go through {@code convertSingleValue}, non-null values through
+   * {@code convert}. Entries with a null key are dropped.
+   *
+   * @param value the map to convert
+   * @return the converted map, or null when the input or the converted result is empty
+   */
+  @Override
+  public Object convertMap(Object value) {
+    Map<Object, Object> source = (Map) value;
+    Map<Object, Object> converted = new HashMap<>();
+    for (Map.Entry<Object, Object> entry : source.entrySet()) {
+      if (entry.getKey() == null) {
+        continue;
+      }
+      Object rawValue = entry.getValue();
+      Object convertedValue = rawValue == null ? null : convert(rawValue);
+      converted.put(convertSingleValue(entry.getKey()), convertedValue);
+    }
+    // An empty result covers both an empty input and an input in which every key was null, so a
+    // separate up-front emptiness check is not needed.
+    return converted.isEmpty() ? null : converted;
+  }
+
+  /**
+   * Reports whether {@code value} holds multiple values.
+   * <p>Anything the base class accepts qualifies, as does any array except {@code byte[]}, which is treated
+   * as a single value.
+   */
+  @Override
+  public boolean isMultiValue(Object value) {
+    // Short-circuit order matters: byte[] must be ruled out before the generic array test.
+    return super.isMultiValue(value) || (!(value instanceof byte[]) && value.getClass().isArray());
+  }
+
+  /**
+   * Converts a multi-value given either as a Collection or as an Object[]; arrays are wrapped in a list view.
+   */
+  @Nullable
+  @Override
+  protected Object convertMultiValue(Object value) {
+    // Non-Collection input is cast to Object[]; Arrays.asList wraps the array without copying.
+    Object elements = value instanceof Collection ? value : Arrays.asList((Object[]) value);
+    return super.convertMultiValue(elements);
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
new file mode 100644
index 0000000..3c55137
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.schema.MessageType;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+/**
+ * Record reader for Native Parquet file.
+ */
+public class ParquetNativeRecordReader implements RecordReader {
+  private Path _dataFilePath;
+  private ParquetNativeRecordExtractor _recordExtractor;
+  private MessageType _schema;
+  private ParquetMetadata _parquetMetadata;
+  private ParquetFileReader _parquetFileReader;
+  private PageReadStore _pageReadStore;
+  private MessageColumnIO _columnIO;
+  private org.apache.parquet.io.RecordReader _parquetRecordReader;
+  private int _currentPageIdx;
+
+  /** Reads the footer of {@code dataFile} and positions the reader on its first row group, if it has one. */
+  @Override
+  public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
+      throws IOException {
+    _dataFilePath = new Path(dataFile.getAbsolutePath());
+    _parquetMetadata =
+        ParquetFileReader.readFooter(new Configuration(), _dataFilePath, ParquetMetadataConverter.NO_FILTER);
+    _recordExtractor = new ParquetNativeRecordExtractor();
+    _recordExtractor.init(fieldsToRead, null);
+    _schema = _parquetMetadata.getFileMetaData().getSchema();
+    _columnIO = new ColumnIOFactory().getColumnIO(_schema);
+    openFileReader();
+  }
+
+  /** Opens a fresh file reader over all row groups of the file and loads the first one. */
+  private void openFileReader()
+      throws IOException {
+    _parquetFileReader = new ParquetFileReader(new Configuration(), _parquetMetadata.getFileMetaData(), _dataFilePath,
+        _parquetMetadata.getBlocks(), _schema.getColumns());
+    loadNextRowGroup();
+  }
+
+  /** Advances to the next row group and resets the per-group record counter. */
+  private void loadNextRowGroup()
+      throws IOException {
+    _pageReadStore = _parquetFileReader.readNextRowGroup();
+    _currentPageIdx = 0;
+    // A null page store means there is no further row group; only build a record reader for a real one.
+    if (_pageReadStore != null) {
+      _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    // Skip over exhausted (or empty) row groups iteratively rather than recursing once per row group.
+    while (_pageReadStore != null && _pageReadStore.getRowCount() - _currentPageIdx < 1) {
+      try {
+        loadNextRowGroup();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return _pageReadStore != null;
+  }
+
+  @Override
+  public GenericRow next()
+      throws IOException {
+    return next(new GenericRow());
+  }
+
+  @Override
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    // Callers must check hasNext() first: it is what moves the reader onto a row group with records left.
+    Group record = (Group) _parquetRecordReader.read();
+    _recordExtractor.extract(record, reuse);
+    _currentPageIdx++;
+    return reuse;
+  }
+
+  @Override
+  public void rewind()
+      throws IOException {
+    _parquetFileReader.close();
+    openFileReader();
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _parquetFileReader.close();
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
index 10ce618..790e97f 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
@@ -22,37 +22,34 @@ import java.io.File;
import java.io.IOException;
import java.util.Set;
import javax.annotation.Nullable;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.ParquetReader;
-import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
/**
- * Record reader for Parquet file.
+ * Pinot Record reader for Parquet file.<p>
+ * It has two implementations: {@link ParquetAvroRecordReader} (Default) and {@link ParquetNativeRecordReader}.
*/
public class ParquetRecordReader implements RecordReader {
- private Path _dataFilePath;
- private AvroRecordExtractor _recordExtractor;
- private ParquetReader<GenericRecord> _parquetReader;
- private GenericRecord _nextRecord;
+ private RecordReader _internalParquetRecordReader;
+ private boolean _useAvroParquetRecordReader = true;
@Override
public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
- _dataFilePath = new Path(dataFile.getAbsolutePath());
- _parquetReader = ParquetUtils.getParquetReader(_dataFilePath);
- _recordExtractor = new AvroRecordExtractor();
- _recordExtractor.init(fieldsToRead, null);
- _nextRecord = _parquetReader.read();
+ if (recordReaderConfig == null || ((ParquetRecordReaderConfig) recordReaderConfig).useParquetAvroRecordReader()) {
+ _internalParquetRecordReader = new ParquetAvroRecordReader();
+ } else {
+ _useAvroParquetRecordReader = false;
+ _internalParquetRecordReader = new ParquetNativeRecordReader();
+ }
+ _internalParquetRecordReader.init(dataFile, fieldsToRead, recordReaderConfig);
}
@Override
public boolean hasNext() {
- return _nextRecord != null;
+ return _internalParquetRecordReader.hasNext();
}
@Override
@@ -64,22 +61,22 @@ public class ParquetRecordReader implements RecordReader {
@Override
public GenericRow next(GenericRow reuse)
throws IOException {
- _recordExtractor.extract(_nextRecord, reuse);
- _nextRecord = _parquetReader.read();
- return reuse;
+ return _internalParquetRecordReader.next(reuse);
}
@Override
public void rewind()
throws IOException {
- _parquetReader.close();
- _parquetReader = ParquetUtils.getParquetReader(_dataFilePath);
- _nextRecord = _parquetReader.read();
+ _internalParquetRecordReader.rewind();
}
@Override
public void close()
throws IOException {
- _parquetReader.close();
+ _internalParquetRecordReader.close();
+ }
+
+ public boolean useAvroParquetRecordReader() {
+ return _useAvroParquetRecordReader;
}
}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java
new file mode 100644
index 0000000..d6bdce2
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+/**
+ * Config for ParquetRecordReader
+ */
+public class ParquetRecordReaderConfig implements RecordReaderConfig {
+ private static final String USE_PARQUET_AVRO_RECORDER_READER = "useParquetAvroRecordReader";
+ private boolean _useParquetAvroRecordReader = true;
+ private Configuration _conf;
+
+ public ParquetRecordReaderConfig() {
+ }
+
+ public ParquetRecordReaderConfig(Configuration conf) {
+ _conf = conf;
+ _useParquetAvroRecordReader = conf.getBoolean(USE_PARQUET_AVRO_RECORDER_READER, true);
+ }
+
+ public boolean useParquetAvroRecordReader() {
+ return _useParquetAvroRecordReader;
+ }
+
+ public void setUseParquetAvroRecordReader(boolean useParquetAvroRecordReader) {
+ _useParquetAvroRecordReader = useParquetAvroRecordReader;
+ }
+
+ public Configuration getConfig() {
+ return _conf;
+ }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
index f2a49b1..5f3dd81 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
@@ -33,38 +33,41 @@ import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
public class ParquetUtils {
+ private static final String DEFAULT_FS = "file:///";
+
private ParquetUtils() {
}
- private static final String DEFAULT_FS = "file:///";
-
/**
* Returns a ParquetReader with the given path.
*/
- public static ParquetReader<GenericRecord> getParquetReader(Path path)
+ public static ParquetReader<GenericRecord> getParquetAvroReader(Path path)
throws IOException {
//noinspection unchecked
return AvroParquetReader.<GenericRecord>builder(path).disableCompatibility().withDataModel(GenericData.get())
- .withConf(getConfiguration()).build();
+ .withConf(getParquetAvroReaderConfiguration()).build();
}
/**
* Returns a ParquetWriter with the given path and schema.
*/
- public static ParquetWriter<GenericRecord> getParquetWriter(Path path, Schema schema)
+ public static ParquetWriter<GenericRecord> getParquetAvroWriter(Path path, Schema schema)
throws IOException {
- return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema).withConf(getConfiguration()).build();
+ return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema)
+ .withConf(getParquetAvroReaderConfiguration()).build();
}
/**
* Returns the schema for the given Parquet file path.
*/
- public static Schema getParquetSchema(Path path)
+ public static Schema getParquetAvroSchema(Path path)
throws IOException {
- ParquetMetadata footer = ParquetFileReader.readFooter(getConfiguration(), path, ParquetMetadataConverter.NO_FILTER);
+ ParquetMetadata footer =
+ ParquetFileReader.readFooter(getParquetAvroReaderConfiguration(), path, ParquetMetadataConverter.NO_FILTER);
Map<String, String> metaData = footer.getFileMetaData().getKeyValueMetaData();
String schemaString = metaData.get("parquet.avro.schema");
if (schemaString == null) {
@@ -74,11 +77,12 @@ public class ParquetUtils {
if (schemaString != null) {
return new Schema.Parser().parse(schemaString);
} else {
- return new AvroSchemaConverter().convert(footer.getFileMetaData().getSchema());
+ MessageType parquetSchema = footer.getFileMetaData().getSchema();
+ return new AvroSchemaConverter().convert(parquetSchema);
}
}
- private static Configuration getConfiguration() {
+ private static Configuration getParquetAvroReaderConfiguration() {
// The file path used in ParquetRecordReader is a local file path without prefix 'file:///',
// so we have to make sure that the configuration item 'fs.defaultFS' is set to 'file:///'
// in case that user's hadoop conf overwrite this item
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index ec9beed..f39df45 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.plugin.inputformat.parquet;
+import com.google.common.collect.ImmutableSet;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -30,11 +32,16 @@ import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
+import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
private final File _dataFile = new File(_tempDir, "data.parquet");
+ private final File _testParquetFileWithInt96AndDecimal =
+ new File(getClass().getClassLoader().getResource("test-file-with-int96-and-decimal.snappy.parquet").getFile());
@Override
protected RecordReader createRecordReader()
@@ -57,10 +64,78 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
records.add(record);
}
try (ParquetWriter<GenericRecord> writer = ParquetUtils
- .getParquetWriter(new Path(_dataFile.getAbsolutePath()), schema)) {
+ .getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()), schema)) {
for (GenericRecord record : records) {
writer.write(record);
}
}
}
+
+  /**
+   * Reads the Parquet file written through the Avro writer with a {@link ParquetAvroRecordReader}
+   * and checks that {@code SAMPLE_RECORDS_SIZE} rows come back.
+   *
+   * @throws IOException if the file cannot be opened or read
+   */
+  @Test
+  public void testParquetAvroRecordReader()
+      throws IOException {
+    // try-with-resources: the reader holds an open file handle that must be released even when
+    // the row-count assertion fails.
+    try (ParquetAvroRecordReader avroRecordReader = new ParquetAvroRecordReader()) {
+      avroRecordReader.init(_dataFile, null, new ParquetRecordReaderConfig());
+      testReadParquetFile(avroRecordReader, SAMPLE_RECORDS_SIZE);
+    }
+  }
+
+  /**
+   * Drains {@code reader} and asserts that it yields exactly {@code totalRecords} rows.
+   *
+   * @param reader an already initialized record reader
+   * @param totalRecords number of rows the reader is expected to return
+   * @throws IOException if reading a row fails
+   */
+  private void testReadParquetFile(RecordReader reader, int totalRecords)
+      throws IOException {
+    int rowCount = 0;
+    for (; reader.hasNext(); rowCount++) {
+      // The row content is irrelevant here; only the number of rows is verified.
+      reader.next();
+    }
+    Assert.assertEquals(rowCount, totalRecords);
+  }
+
+  /**
+   * Reads two files with {@link ParquetNativeRecordReader} and checks the row count of each: the
+   * bundled INT96/decimal resource file (1965 rows) and the generated sample file
+   * ({@code SAMPLE_RECORDS_SIZE} rows).
+   *
+   * @throws IOException if either file cannot be opened or read
+   */
+  @Test
+  public void testParquetNativeRecordReader()
+      throws IOException {
+    // One reader per file, each closed by try-with-resources. Re-initializing a single instance
+    // on a second file without closing it first would leak the first file's handle.
+    try (ParquetNativeRecordReader int96AndDecimalReader = new ParquetNativeRecordReader()) {
+      int96AndDecimalReader
+          .init(_testParquetFileWithInt96AndDecimal, ImmutableSet.of(), new ParquetRecordReaderConfig());
+      testReadParquetFile(int96AndDecimalReader, 1965);
+    }
+    try (ParquetNativeRecordReader sampleDataReader = new ParquetNativeRecordReader()) {
+      sampleDataReader.init(_dataFile, ImmutableSet.of(), new ParquetRecordReaderConfig());
+      testReadParquetFile(sampleDataReader, SAMPLE_RECORDS_SIZE);
+    }
+  }
+
+ @Test
+ public void testComparison()
+ throws IOException {
+ testComparison(_dataFile, SAMPLE_RECORDS_SIZE);
+ testComparison(new File(getClass().getClassLoader().getResource("users.parquet").getFile()), 1);
+ testComparison(new File(getClass().getClassLoader().getResource("test-comparison.gz.parquet").getFile()), 363667);
+ testComparison(new File(getClass().getClassLoader().getResource("test-comparison.snappy.parquet").getFile()), 2870);
+ testComparison(new File(getClass().getClassLoader().getResource("baseballStats.snappy.parquet").getFile()), 97889);
+ testComparison(new File(getClass().getClassLoader().getResource("githubEvents.snappy.parquet").getFile()), 10000);
+ testComparison(new File(getClass().getClassLoader().getResource("starbucksStores.snappy.parquet").getFile()), 6443);
+ testComparison(new File(getClass().getClassLoader().getResource("airlineStats.snappy.parquet").getFile()), 19492);
+ testComparison(new File(getClass().getClassLoader().getResource("githubActivities.gz.parquet").getFile()), 2000);
+ }
+
+  /**
+   * Reads {@code dataFile} with two {@link ParquetRecordReader}s, one left on its default
+   * configuration and one configured with {@code setUseParquetAvroRecordReader(false)}, and
+   * asserts that both return the same rows in the same order and the expected number of them.
+   *
+   * @param dataFile Parquet file to read
+   * @param totalRecords number of rows the file is expected to contain
+   * @throws IOException if the file cannot be opened or read
+   */
+  private void testComparison(File dataFile, int totalRecords)
+      throws IOException {
+    ParquetRecordReaderConfig nativeReaderConfig = new ParquetRecordReaderConfig();
+    nativeReaderConfig.setUseParquetAvroRecordReader(false);
+
+    // Both readers keep the file open; try-with-resources releases them even when an assertion
+    // fails part-way through the file.
+    try (ParquetRecordReader avroRecordReader = new ParquetRecordReader();
+        ParquetRecordReader nativeRecordReader = new ParquetRecordReader()) {
+      avroRecordReader.init(dataFile, null, null);
+      nativeRecordReader.init(dataFile, null, nativeReaderConfig);
+      Assert.assertTrue(avroRecordReader.useAvroParquetRecordReader());
+      Assert.assertFalse(nativeRecordReader.useAvroParquetRecordReader());
+
+      GenericRow avroReuse = new GenericRow();
+      GenericRow nativeReuse = new GenericRow();
+      int recordsRead = 0;
+      while (avroRecordReader.hasNext()) {
+        Assert.assertTrue(nativeRecordReader.hasNext());
+        GenericRow avroReaderRow = avroRecordReader.next(avroReuse);
+        GenericRow nativeReaderRow = nativeRecordReader.next(nativeReuse);
+        Assert.assertEquals(nativeReaderRow.toString(), avroReaderRow.toString());
+        Assert.assertTrue(avroReaderRow.equals(nativeReaderRow));
+        recordsRead++;
+      }
+      // The loop is driven by the Avro reader only, so rows the native reader returns beyond that
+      // point would otherwise go unnoticed.
+      Assert.assertFalse(nativeRecordReader.hasNext(),
+          "Native ParquetRecordReader returned more rows than the Avro ParquetRecordReader.");
+      Assert.assertEquals(recordsRead, totalRecords,
+          "Message read from ParquetRecordReader doesn't match the expected number.");
+    }
+  }
}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/airlineStats.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/airlineStats.snappy.parquet
new file mode 100644
index 0000000..9d1954b
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/airlineStats.snappy.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/baseballStats.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/baseballStats.snappy.parquet
new file mode 100644
index 0000000..931f882
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/baseballStats.snappy.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubActivities.gz.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubActivities.gz.parquet
new file mode 100644
index 0000000..1723e2f
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubActivities.gz.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubEvents.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubEvents.snappy.parquet
new file mode 100644
index 0000000..6bd49d0
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubEvents.snappy.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/starbucksStores.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/starbucksStores.snappy.parquet
new file mode 100644
index 0000000..c3b3efc
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/starbucksStores.snappy.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.gz.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.gz.parquet
new file mode 100644
index 0000000..160c659
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.gz.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.snappy.parquet
new file mode 100644
index 0000000..8aed9b4
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.snappy.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-file-with-int96-and-decimal.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-file-with-int96-and-decimal.snappy.parquet
new file mode 100644
index 0000000..7818851
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-file-with-int96-and-decimal.snappy.parquet differ
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/users.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/users.parquet
new file mode 100644
index 0000000..58ed9a8
Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/users.parquet differ
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index 72e5c52..b3a79ce 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -167,11 +168,65 @@ public class GenericRow implements Serializable {
}
if (obj instanceof GenericRow) {
GenericRow that = (GenericRow) obj;
- return _fieldToValueMap.equals(that._fieldToValueMap) && _nullValueFields.equals(that._nullValueFields);
+ if (!_nullValueFields.containsAll(that._nullValueFields) || !that._nullValueFields
+ .containsAll(_nullValueFields)) {
+ return false;
+ }
+ return compareMap(_fieldToValueMap, that._fieldToValueMap);
}
return false;
}
+  /**
+   * Deep-compares two field-value maps.
+   *
+   * <p>Two maps are equal when they have the same size, the same keys, and every pair of values
+   * is equal according to {@link #compareValue(Object, Object)}.
+   *
+   * @param thisMap left-hand map, must not be null
+   * @param thatMap right-hand map, must not be null
+   * @return {@code true} if both maps hold deeply equal content
+   */
+  private boolean compareMap(Map<String, Object> thisMap, Map<String, Object> thatMap) {
+    // Maps of different sizes can never be equal.
+    if (thisMap.size() != thatMap.size()) {
+      return false;
+    }
+    for (Map.Entry<String, Object> entry : thisMap.entrySet()) {
+      String key = entry.getKey();
+      Object thatFieldValue = thatMap.get(key);
+      // get() returns null both for a missing key and for a null value; only the latter may match.
+      if (thatFieldValue == null && !thatMap.containsKey(key)) {
+        return false;
+      }
+      // Only a mismatch ends the loop early; a match must not skip the remaining keys.
+      if (!compareValue(entry.getValue(), thatFieldValue)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Deep-compares two object arrays element by element.
+   *
+   * @param fieldValue left-hand array, must not be null
+   * @param thatFieldValue right-hand array, must not be null
+   * @return {@code true} if both arrays have the same length and deeply equal elements at every
+   *         index
+   */
+  private boolean compareArray(Object[] fieldValue, Object[] thatFieldValue) {
+    // Without this check a shorter right-hand array throws and a longer one compares as equal.
+    if (fieldValue.length != thatFieldValue.length) {
+      return false;
+    }
+    for (int i = 0; i < fieldValue.length; i++) {
+      if (!compareValue(fieldValue[i], thatFieldValue[i])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Deep-compares two single values that may be null, scalars, nested maps, object arrays or
+   * primitive arrays.
+   *
+   * @param value left-hand value, may be null
+   * @param thatValue right-hand value, may be null
+   * @return {@code true} if both are null, or {@code equals()} holds, or both are maps/arrays
+   *         with deeply equal content
+   */
+  @SuppressWarnings("unchecked")
+  private boolean compareValue(Object value, Object thatValue) {
+    if (value == thatValue) {
+      return true;
+    }
+    if (value == null || thatValue == null) {
+      return false;
+    }
+    if (value.equals(thatValue)) {
+      return true;
+    }
+    // equals() failed: maps holding arrays and arrays themselves need a structural comparison.
+    if (value instanceof Map && thatValue instanceof Map) {
+      return compareMap((Map<String, Object>) value, (Map<String, Object>) thatValue);
+    }
+    if (value instanceof Object[] && thatValue instanceof Object[]) {
+      return compareArray((Object[]) value, (Object[]) thatValue);
+    }
+    if (value.getClass().isArray() && thatValue.getClass().isArray()) {
+      // At least one side is a primitive array (byte[], int[], ...), which cannot be cast to
+      // Object[]. Wrapping both lets Arrays.deepEquals pick the matching primitive overload, and
+      // it returns false when the component types differ.
+      return Arrays.deepEquals(new Object[]{value}, new Object[]{thatValue});
+    }
+    return false;
+  }
+
@Override
public String toString() {
try {
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
index 91035cd..91efd6c 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
@@ -39,7 +39,7 @@ import org.testng.collections.Lists;
public abstract class AbstractRecordReaderTest {
private final static Random RANDOM = new Random(System.currentTimeMillis());
- private final static int SAMPLE_RECORDS_SIZE = 10000;
+ protected final static int SAMPLE_RECORDS_SIZE = 10000;
protected final File _tempDir = new File(FileUtils.getTempDirectory(), "RecordReaderTest");
protected List<Map<String, Object>> _records;
@@ -155,6 +155,9 @@ public abstract class AbstractRecordReaderTest {
@BeforeClass
public void setUp()
throws Exception {
+ if (_tempDir.exists()) {
+ FileUtils.cleanDirectory(_tempDir);
+ }
FileUtils.forceMkdir(_tempDir);
// Generate Pinot schema
_pinotSchema = getPinotSchema();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org