You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/15 12:43:47 UTC
[arrow-rs] branch master updated: Support empty projection in ParquetRecordBatchReader (#1560)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new b10920173 Support empty projection in ParquetRecordBatchReader (#1560)
b10920173 is described below
commit b109201736d4d3037d95b3f8f40a9a4d0d0b94af
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri Apr 15 13:43:42 2022 +0100
Support empty projection in ParquetRecordBatchReader (#1560)
* Support empty projection in ParquetRecordBatchReader
* Fix async reader
* Fix RAT
---
parquet/src/arrow/array_reader.rs | 8 +++
parquet/src/arrow/array_reader/builder.rs | 15 +++---
parquet/src/arrow/array_reader/empty_array.rs | 75 +++++++++++++++++++++++++++
parquet/src/arrow/arrow_reader.rs | 47 ++++++++++-------
parquet/src/arrow/async_reader.rs | 6 +++
5 files changed, 122 insertions(+), 29 deletions(-)
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index f54e6797e..7d0f4bfed 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -55,6 +55,7 @@ mod builder;
mod byte_array;
mod byte_array_dictionary;
mod dictionary_buffer;
+mod empty_array;
mod offset_buffer;
#[cfg(test)]
@@ -97,6 +98,9 @@ pub trait RowGroupCollection {
/// Get schema of parquet file.
fn schema(&self) -> Result<SchemaDescPtr>;
+ /// Get the number of rows in this collection
+ fn num_rows(&self) -> usize;
+
/// Returns an iterator over the column chunks for particular column
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}
@@ -106,6 +110,10 @@ impl RowGroupCollection for Arc<dyn FileReader> {
Ok(self.metadata().file_metadata().schema_descr_ptr())
}
+ fn num_rows(&self) -> usize {
+ self.metadata().file_metadata().num_rows() as usize
+ }
+
fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
Ok(Box::new(iterator))
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index ba3ba3558..2836c699c 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
use arrow::datatypes::{DataType as ArrowType, Field, IntervalUnit, Schema, SchemaRef};
+use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::{
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
@@ -37,7 +38,7 @@ use crate::data_type::{
Int96Type,
};
use crate::errors::ParquetError::ArrowError;
-use crate::errors::{ParquetError, Result};
+use crate::errors::{Result};
use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr};
use crate::schema::visitor::TypeVisitor;
@@ -64,10 +65,6 @@ where
filtered_root_names.insert(root.name().to_string());
}
- if leaves.is_empty() {
- return Err(general_err!("Can't build array reader without columns!"));
- }
-
// Only pass root fields that take part in the projection
// to avoid traversal of columns that are not read.
// TODO: also prune unread parts of the tree in child structures
@@ -412,10 +409,10 @@ impl<'a> ArrayReaderBuilder {
fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
let context = ArrayReaderBuilderContext::default();
- self.visit_struct(self.root_schema.clone(), &context)
- .and_then(|reader_opt| {
- reader_opt.ok_or_else(|| general_err!("Failed to build array reader!"))
- })
+ match self.visit_struct(self.root_schema.clone(), &context)? {
+ Some(reader) => Ok(reader),
+ None => Ok(make_empty_array_reader(self.row_groups.num_rows())),
+ }
}
// Utility functions
diff --git a/parquet/src/arrow/array_reader/empty_array.rs b/parquet/src/arrow/array_reader/empty_array.rs
new file mode 100644
index 000000000..54b77becb
--- /dev/null
+++ b/parquet/src/arrow/array_reader/empty_array.rs
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::Result;
+use arrow::array::{ArrayDataBuilder, ArrayRef, StructArray};
+use arrow::datatypes::DataType as ArrowType;
+use std::any::Any;
+use std::sync::Arc;
+
+/// Returns an [`ArrayReader`] that yields [`StructArray`] with no columns
+/// but with row counts that correspond to the amount of data in the file
+///
+/// This is useful when projection eliminates all columns within a collection
+pub fn make_empty_array_reader(row_count: usize) -> Box<dyn ArrayReader> {
+ Box::new(EmptyArrayReader::new(row_count))
+}
+
+struct EmptyArrayReader {
+ data_type: ArrowType,
+ remaining_rows: usize,
+}
+
+impl EmptyArrayReader {
+ pub fn new(row_count: usize) -> Self {
+ Self {
+ data_type: ArrowType::Struct(vec![]),
+ remaining_rows: row_count,
+ }
+ }
+}
+
+impl ArrayReader for EmptyArrayReader {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn get_data_type(&self) -> &ArrowType {
+ &self.data_type
+ }
+
+ fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+ let len = self.remaining_rows.min(batch_size);
+ self.remaining_rows -= len;
+
+ let data = ArrayDataBuilder::new(self.data_type.clone())
+ .len(len)
+ .build()
+ .unwrap();
+
+ Ok(Arc::new(StructArray::from(data)))
+ }
+
+ fn get_def_levels(&self) -> Option<&[i16]> {
+ None
+ }
+
+ fn get_rep_levels(&self) -> Option<&[i16]> {
+ None
+ }
+}
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 80323e59f..7675707e3 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -19,17 +19,18 @@
use std::sync::Arc;
+use arrow::array::Array;
use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use arrow::{array::StructArray, error::ArrowError};
-use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader};
+use crate::arrow::array_reader::{build_array_reader, ArrayReader};
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::schema::{
parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns,
};
-use crate::errors::{ParquetError, Result};
+use crate::errors::Result;
use crate::file::metadata::{KeyValue, ParquetMetaData};
use crate::file::reader::FileReader;
@@ -234,20 +235,10 @@ impl Iterator for ParquetRecordBatchReader {
"Struct array reader should return struct array".to_string(),
)
});
+
match struct_array {
Err(err) => Some(Err(err)),
- Ok(e) => {
- match RecordBatch::try_new(self.schema.clone(), e.columns_ref()) {
- Err(err) => Some(Err(err)),
- Ok(record_batch) => {
- if record_batch.num_rows() > 0 {
- Some(Ok(record_batch))
- } else {
- None
- }
- }
- }
- }
+ Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
}
}
}
@@ -265,12 +256,6 @@ impl ParquetRecordBatchReader {
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
) -> Result<Self> {
- // Check that array reader is struct array reader
- array_reader
- .as_any()
- .downcast_ref::<StructArrayReader>()
- .ok_or_else(|| general_err!("The input must be struct array reader!"))?;
-
let schema = match array_reader.get_data_type() {
ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
_ => unreachable!("Struct array reader's data type is not struct!"),
@@ -1386,4 +1371,26 @@ mod tests {
schema_without_metadata.as_ref()
);
}
+
+ #[test]
+ fn test_empty_projection() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{}/alltypes_plain.parquet", testdata);
+ let file = File::open(&path).unwrap();
+ let reader = SerializedFileReader::try_from(file).unwrap();
+ let expected_rows = reader.metadata().file_metadata().num_rows() as usize;
+
+ let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
+ let batch_reader = arrow_reader.get_record_reader_by_columns([], 2).unwrap();
+
+ let mut total_rows = 0;
+ for maybe_batch in batch_reader {
+ let batch = maybe_batch.unwrap();
+ total_rows += batch.num_rows();
+ assert_eq!(batch.num_columns(), 0);
+ assert!(batch.num_rows() <= 2);
+ }
+
+ assert_eq!(total_rows, expected_rows);
+ }
}
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index b8fafec1e..7bf3ebfa9 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -345,6 +345,7 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
input,
InMemoryRowGroup {
schema: metadata.file_metadata().schema_descr_ptr(),
+ row_count: row_group_metadata.num_rows() as usize,
column_chunks,
},
))
@@ -419,6 +420,7 @@ async fn read_footer<T: AsyncRead + AsyncSeek + Unpin>(
struct InMemoryRowGroup {
schema: SchemaDescPtr,
column_chunks: Vec<Option<InMemoryColumnChunk>>,
+ row_count: usize,
}
impl RowGroupCollection for InMemoryRowGroup {
@@ -426,6 +428,10 @@ impl RowGroupCollection for InMemoryRowGroup {
Ok(self.schema.clone())
}
+ fn num_rows(&self) -> usize {
+ self.row_count
+ }
+
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
let page_reader = self.column_chunks[i].as_ref().unwrap().pages();