You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/11/11 04:43:57 UTC
[pinot] branch master updated: Handle fields missing in the source in ParquetNativeRecordReader (#7742)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new af01aa5 Handle fields missing in the source in ParquetNativeRecordReader (#7742)
af01aa5 is described below
commit af01aa5778a3097afda48c154dfe6e68e36f63bc
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Wed Nov 10 20:43:34 2021 -0800
Handle fields missing in the source in ParquetNativeRecordReader (#7742)
* Fix ParquetNativeRecordExtractor for fields missing in the source
* nit
* Same bug in proto
---
.../parquet/ParquetNativeRecordExtractor.java | 2 +-
.../parquet/ParquetNativeRecordReaderTest.java | 66 ++++++++++++++++++++++
.../protobuf/ProtoBufRecordExtractor.java | 2 +-
.../spi/data/readers/AbstractRecordReaderTest.java | 4 +-
4 files changed, 71 insertions(+), 3 deletions(-)
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
index ccc24c2..175782b 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
@@ -114,7 +114,7 @@ public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
}
} else {
for (String fieldName : _fields) {
- Object value = extractValue(from, fromType.getFieldIndex(fieldName));
+ Object value = fromType.containsField(fieldName) ? extractValue(from, fromType.getFieldIndex(fieldName)) : null;
if (value != null) {
value = convert(value);
}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
new file mode 100644
index 0000000..ffc80a9
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
+import org.apache.pinot.spi.data.readers.RecordReader;
+
+
+public class ParquetNativeRecordReaderTest extends AbstractRecordReaderTest {  // Plugs ParquetNativeRecordReader into the AbstractRecordReaderTest harness.
+  private final File _dataFile = new File(_tempDir, "data.parquet");  // Written by writeRecordsToFile, read back by createRecordReader.
+
+  @Override
+  protected RecordReader createRecordReader()  // Opens a ParquetNativeRecordReader over _dataFile.
+      throws Exception {
+    ParquetNativeRecordReader recordReader = new ParquetNativeRecordReader();
+    recordReader.init(_dataFile, _sourceFields, null);  // _sourceFields is inherited (presumably from getSourceFields(), which adds a column absent from the file - confirm); third arg passed as null.
+    return recordReader;
+  }
+
+  @Override
+  protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)  // Serializes the test rows to _dataFile through an Avro-schema Parquet writer.
+      throws Exception {
+    Schema schema = AvroUtils.getAvroSchemaFromPinotSchema(getPinotSchema());  // Avro schema derived from the Pinot test schema.
+    List<GenericRecord> records = new ArrayList<>();
+    for (Map<String, Object> r : recordsToWrite) {
+      GenericRecord record = new GenericData.Record(schema);
+      for (FieldSpec fieldSpec : getPinotSchema().getAllFieldSpecs()) {  // Copy every schema field; Map.get yields null for keys absent from r.
+        record.put(fieldSpec.getName(), r.get(fieldSpec.getName()));
+      }
+      records.add(record);
+    }
+    try (ParquetWriter<GenericRecord> writer = ParquetUtils  // try-with-resources closes the writer once all records are written.
+        .getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()), schema)) {
+      for (GenericRecord record : records) {
+        writer.write(record);
+      }
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
index 17cb445..8685d0c 100644
--- a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
@@ -67,7 +67,7 @@ public class ProtoBufRecordExtractor extends BaseRecordExtractor<Message> {
} else {
for (String fieldName : _fields) {
Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(fieldName);
- Object fieldValue = from.getField(fieldDescriptor);
+ Object fieldValue = fieldDescriptor != null ? from.getField(fieldDescriptor) : null;
if (fieldValue != null) {
fieldValue = convert(new ProtoBufFieldInfo(fieldValue, descriptor.findFieldByName(fieldName)));
}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
index 7eadbe3..b4821bb 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
@@ -150,7 +150,9 @@ public abstract class AbstractRecordReaderTest {
}
protected Set<String> getSourceFields(Schema schema) {
- return Sets.newHashSet(schema.getColumnNames());
+ Set<String> sourceFields = Sets.newHashSet(schema.getColumnNames());
+ sourceFields.add("column_not_in_source");
+ return sourceFields;
}
@BeforeClass
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org