You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/11/11 04:43:57 UTC

[pinot] branch master updated: Handle fields missing in the source in ParquetNativeRecordReader (#7742)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new af01aa5  Handle fields missing in the source in ParquetNativeRecordReader (#7742)
af01aa5 is described below

commit af01aa5778a3097afda48c154dfe6e68e36f63bc
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Wed Nov 10 20:43:34 2021 -0800

    Handle fields missing in the source in ParquetNativeRecordReader (#7742)
    
    * Fix ParquetNativeRecordExtractor for fields missing in the source
    
    * nit
    
    * Fix the same bug in ProtoBufRecordExtractor
---
 .../parquet/ParquetNativeRecordExtractor.java      |  2 +-
 .../parquet/ParquetNativeRecordReaderTest.java     | 66 ++++++++++++++++++++++
 .../protobuf/ProtoBufRecordExtractor.java          |  2 +-
 .../spi/data/readers/AbstractRecordReaderTest.java |  4 +-
 4 files changed, 71 insertions(+), 3 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
index ccc24c2..175782b 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
@@ -114,7 +114,7 @@ public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
       }
     } else {
       for (String fieldName : _fields) {
-        Object value = extractValue(from, fromType.getFieldIndex(fieldName));
+        Object value = fromType.containsField(fieldName) ? extractValue(from, fromType.getFieldIndex(fieldName)) : null;
         if (value != null) {
           value = convert(value);
         }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
new file mode 100644
index 0000000..ffc80a9
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.parquet;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
+import org.apache.pinot.spi.data.readers.RecordReader;
+
+
+public class ParquetNativeRecordReaderTest extends AbstractRecordReaderTest {
+  private final File _dataFile = new File(_tempDir, "data.parquet");
+
+  @Override
+  protected RecordReader createRecordReader()
+      throws Exception {
+    ParquetNativeRecordReader recordReader = new ParquetNativeRecordReader();
+    recordReader.init(_dataFile, _sourceFields, null);
+    return recordReader;
+  }
+
+  @Override
+  protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
+      throws Exception {
+    Schema schema = AvroUtils.getAvroSchemaFromPinotSchema(getPinotSchema());
+    List<GenericRecord> records = new ArrayList<>();
+    for (Map<String, Object> r : recordsToWrite) {
+      GenericRecord record = new GenericData.Record(schema);
+      for (FieldSpec fieldSpec : getPinotSchema().getAllFieldSpecs()) {
+        record.put(fieldSpec.getName(), r.get(fieldSpec.getName()));
+      }
+      records.add(record);
+    }
+    try (ParquetWriter<GenericRecord> writer = ParquetUtils
+        .getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()), schema)) {
+      for (GenericRecord record : records) {
+        writer.write(record);
+      }
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
index 17cb445..8685d0c 100644
--- a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
@@ -67,7 +67,7 @@ public class ProtoBufRecordExtractor extends BaseRecordExtractor<Message> {
     } else {
       for (String fieldName : _fields) {
         Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(fieldName);
-        Object fieldValue = from.getField(fieldDescriptor);
+        Object fieldValue = fieldDescriptor != null ? from.getField(fieldDescriptor) : null;
         if (fieldValue != null) {
           fieldValue = convert(new ProtoBufFieldInfo(fieldValue, descriptor.findFieldByName(fieldName)));
         }
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
index 7eadbe3..b4821bb 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
@@ -150,7 +150,9 @@ public abstract class AbstractRecordReaderTest {
   }
 
   protected Set<String> getSourceFields(Schema schema) {
-    return Sets.newHashSet(schema.getColumnNames());
+    Set<String> sourceFields = Sets.newHashSet(schema.getColumnNames());
+    sourceFields.add("column_not_in_source");
+    return sourceFields;
   }
 
   @BeforeClass

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org