Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/15 12:43:47 UTC

[arrow-rs] branch master updated: Support empty projection in ParquetRecordBatchReader (#1560)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new b10920173 Support empty projection in ParquetRecordBatchReader (#1560)
b10920173 is described below

commit b109201736d4d3037d95b3f8f40a9a4d0d0b94af
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri Apr 15 13:43:42 2022 +0100

    Support empty projection in ParquetRecordBatchReader (#1560)
    
    * Support empty projection in ParquetRecordBatchReader
    
    * Fix async reader
    
    * Fix RAT
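
In user-facing terms, a projection selecting no columns now succeeds and yields
zero-column record batches that still carry the file's row counts, rather than
returning an error. A minimal sketch, mirroring the `test_empty_projection` test
added in this commit (`reader` is assumed to be an already-open
`SerializedFileReader`; the batch size of 1024 is illustrative):

    use std::sync::Arc;

    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
    // Request no columns at all; before this change this was rejected.
    let batch_reader = arrow_reader.get_record_reader_by_columns([], 1024).unwrap();
    for maybe_batch in batch_reader {
        let batch = maybe_batch.unwrap();
        // Empty schema, but the row counts of the underlying file are preserved.
        assert_eq!(batch.num_columns(), 0);
    }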
---
 parquet/src/arrow/array_reader.rs             |  8 +++
 parquet/src/arrow/array_reader/builder.rs     | 15 +++---
 parquet/src/arrow/array_reader/empty_array.rs | 75 +++++++++++++++++++++++++++
 parquet/src/arrow/arrow_reader.rs             | 47 ++++++++++-------
 parquet/src/arrow/async_reader.rs             |  6 +++
 5 files changed, 122 insertions(+), 29 deletions(-)

diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index f54e6797e..7d0f4bfed 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -55,6 +55,7 @@ mod builder;
 mod byte_array;
 mod byte_array_dictionary;
 mod dictionary_buffer;
+mod empty_array;
 mod offset_buffer;
 
 #[cfg(test)]
@@ -97,6 +98,9 @@ pub trait RowGroupCollection {
     /// Get schema of parquet file.
     fn schema(&self) -> Result<SchemaDescPtr>;
 
+    /// Get the number of rows in this collection
+    fn num_rows(&self) -> usize;
+
     /// Returns an iterator over the column chunks for a particular column
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
 }
@@ -106,6 +110,10 @@ impl RowGroupCollection for Arc<dyn FileReader> {
         Ok(self.metadata().file_metadata().schema_descr_ptr())
     }
 
+    fn num_rows(&self) -> usize {
+        self.metadata().file_metadata().num_rows() as usize
+    }
+
     fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
         let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
         Ok(Box::new(iterator))
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index ba3ba3558..2836c699c 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
 
 use arrow::datatypes::{DataType as ArrowType, Field, IntervalUnit, Schema, SchemaRef};
 
+use crate::arrow::array_reader::empty_array::make_empty_array_reader;
 use crate::arrow::array_reader::{
     make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
     ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
@@ -37,7 +38,7 @@ use crate::data_type::{
     Int96Type,
 };
 use crate::errors::ParquetError::ArrowError;
-use crate::errors::{ParquetError, Result};
+use crate::errors::Result;
 use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr};
 use crate::schema::visitor::TypeVisitor;
 
@@ -64,10 +65,6 @@ where
         filtered_root_names.insert(root.name().to_string());
     }
 
-    if leaves.is_empty() {
-        return Err(general_err!("Can't build array reader without columns!"));
-    }
-
     // Only pass root fields that take part in the projection
     // to avoid traversal of columns that are not read.
     // TODO: also prune unread parts of the tree in child structures
@@ -412,10 +409,10 @@ impl<'a> ArrayReaderBuilder {
     fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
         let context = ArrayReaderBuilderContext::default();
 
-        self.visit_struct(self.root_schema.clone(), &context)
-            .and_then(|reader_opt| {
-                reader_opt.ok_or_else(|| general_err!("Failed to build array reader!"))
-            })
+        match self.visit_struct(self.root_schema.clone(), &context)? {
+            Some(reader) => Ok(reader),
+            None => Ok(make_empty_array_reader(self.row_groups.num_rows())),
+        }
     }
 
     // Utility functions
diff --git a/parquet/src/arrow/array_reader/empty_array.rs b/parquet/src/arrow/array_reader/empty_array.rs
new file mode 100644
index 000000000..54b77becb
--- /dev/null
+++ b/parquet/src/arrow/array_reader/empty_array.rs
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::Result;
+use arrow::array::{ArrayDataBuilder, ArrayRef, StructArray};
+use arrow::datatypes::DataType as ArrowType;
+use std::any::Any;
+use std::sync::Arc;
+
+/// Returns an [`ArrayReader`] that yields a [`StructArray`] with no columns,
+/// but with row counts that correspond to the number of rows in the file
+///
+/// This is useful when a projection eliminates all columns within a collection
+pub fn make_empty_array_reader(row_count: usize) -> Box<dyn ArrayReader> {
+    Box::new(EmptyArrayReader::new(row_count))
+}
+
+struct EmptyArrayReader {
+    data_type: ArrowType,
+    remaining_rows: usize,
+}
+
+impl EmptyArrayReader {
+    pub fn new(row_count: usize) -> Self {
+        Self {
+            data_type: ArrowType::Struct(vec![]),
+            remaining_rows: row_count,
+        }
+    }
+}
+
+impl ArrayReader for EmptyArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        let len = self.remaining_rows.min(batch_size);
+        self.remaining_rows -= len;
+
+        let data = ArrayDataBuilder::new(self.data_type.clone())
+            .len(len)
+            .build()
+            .unwrap();
+
+        Ok(Arc::new(StructArray::from(data)))
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        None
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        None
+    }
+}
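
A minimal sketch of the new reader's contract (crate-internal; assumes the items
above plus the `arrow::array::Array` trait are in scope): each `next_batch` call
yields a zero-column `StructArray` of at most `batch_size` rows until the row
count is exhausted, after which it yields zero-length arrays.

    // Drain 10 rows in batches of 4: lengths 4, 4, 2, then 0.
    let mut reader = make_empty_array_reader(10);
    for expected in [4usize, 4, 2, 0] {
        let batch = reader.next_batch(4).unwrap();
        assert_eq!(batch.len(), expected);
    }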
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 80323e59f..7675707e3 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -19,17 +19,18 @@
 
 use std::sync::Arc;
 
+use arrow::array::Array;
 use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::{RecordBatch, RecordBatchReader};
 use arrow::{array::StructArray, error::ArrowError};
 
-use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader};
+use crate::arrow::array_reader::{build_array_reader, ArrayReader};
 use crate::arrow::schema::parquet_to_arrow_schema;
 use crate::arrow::schema::{
     parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns,
 };
-use crate::errors::{ParquetError, Result};
+use crate::errors::Result;
 use crate::file::metadata::{KeyValue, ParquetMetaData};
 use crate::file::reader::FileReader;
 
@@ -234,20 +235,10 @@ impl Iterator for ParquetRecordBatchReader {
                             "Struct array reader should return struct array".to_string(),
                         )
                     });
+
                 match struct_array {
                     Err(err) => Some(Err(err)),
-                    Ok(e) => {
-                        match RecordBatch::try_new(self.schema.clone(), e.columns_ref()) {
-                            Err(err) => Some(Err(err)),
-                            Ok(record_batch) => {
-                                if record_batch.num_rows() > 0 {
-                                    Some(Ok(record_batch))
-                                } else {
-                                    None
-                                }
-                            }
-                        }
-                    }
+                    Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
                 }
             }
         }
@@ -265,12 +256,6 @@ impl ParquetRecordBatchReader {
         batch_size: usize,
         array_reader: Box<dyn ArrayReader>,
     ) -> Result<Self> {
-        // Check that array reader is struct array reader
-        array_reader
-            .as_any()
-            .downcast_ref::<StructArrayReader>()
-            .ok_or_else(|| general_err!("The input must be struct array reader!"))?;
-
         let schema = match array_reader.get_data_type() {
             ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
             _ => unreachable!("Struct array reader's data type is not struct!"),
@@ -1386,4 +1371,26 @@ mod tests {
             schema_without_metadata.as_ref()
         );
     }
+
+    #[test]
+    fn test_empty_projection() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_plain.parquet", testdata);
+        let file = File::open(&path).unwrap();
+        let reader = SerializedFileReader::try_from(file).unwrap();
+        let expected_rows = reader.metadata().file_metadata().num_rows() as usize;
+
+        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
+        let batch_reader = arrow_reader.get_record_reader_by_columns([], 2).unwrap();
+
+        let mut total_rows = 0;
+        for maybe_batch in batch_reader {
+            let batch = maybe_batch.unwrap();
+            total_rows += batch.num_rows();
+            assert_eq!(batch.num_columns(), 0);
+            assert!(batch.num_rows() <= 2);
+        }
+
+        assert_eq!(total_rows, expected_rows);
+    }
 }
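
The rewritten `Iterator::next` above also swaps the manual `RecordBatch::try_new`
plumbing for `RecordBatch::from` combined with the `bool::then` idiom: a
zero-length struct array maps to `None`, which ends the iteration. A standalone
sketch of that idiom (`wrap` is a hypothetical stand-in for the match arm):

    fn wrap(len: usize) -> Option<usize> {
        // `then` evaluates the closure only when the condition is true
        (len > 0).then(|| len)
    }

    fn main() {
        assert_eq!(wrap(0), None);    // empty batch -> iteration ends
        assert_eq!(wrap(3), Some(3)); // non-empty batch -> yielded
    }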
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index b8fafec1e..7bf3ebfa9 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -345,6 +345,7 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
                                 input,
                                 InMemoryRowGroup {
                                     schema: metadata.file_metadata().schema_descr_ptr(),
+                                    row_count: row_group_metadata.num_rows() as usize,
                                     column_chunks,
                                 },
                             ))
@@ -419,6 +420,7 @@ async fn read_footer<T: AsyncRead + AsyncSeek + Unpin>(
 struct InMemoryRowGroup {
     schema: SchemaDescPtr,
     column_chunks: Vec<Option<InMemoryColumnChunk>>,
+    row_count: usize,
 }
 
 impl RowGroupCollection for InMemoryRowGroup {
@@ -426,6 +428,10 @@ impl RowGroupCollection for InMemoryRowGroup {
         Ok(self.schema.clone())
     }
 
+    fn num_rows(&self) -> usize {
+        self.row_count
+    }
+
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
         let page_reader = self.column_chunks[i].as_ref().unwrap().pages();