You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/11 20:01:01 UTC
[arrow-rs] branch master updated: Add Parquet RowFilter API (#2335)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 21ba02eff Add Parquet RowFilter API (#2335)
21ba02eff is described below
commit 21ba02eff1ce356fc06c2949afe2ad4c84553ce9
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Aug 11 21:00:56 2022 +0100
Add Parquet RowFilter API (#2335)
* Add RowFilter API
* Review feedback
* Fix doc
* Fix handling of NULL boolean array
* Add tests, fix bugs
* Fix clippy
* Review feedback
* Fix doc
---
parquet/src/arrow/array_reader/builder.rs | 19 +-
parquet/src/arrow/array_reader/list_array.rs | 10 +-
parquet/src/arrow/array_reader/mod.rs | 6 +-
parquet/src/arrow/arrow_reader/filter.rs | 109 ++++++
.../arrow/{arrow_reader.rs => arrow_reader/mod.rs} | 135 ++++---
parquet/src/arrow/arrow_reader/selection.rs | 426 +++++++++++++++++++++
parquet/src/arrow/async_reader.rs | 401 ++++++++++++++-----
7 files changed, 910 insertions(+), 196 deletions(-)
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 32ffaeb9d..e389158a1 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -39,20 +39,18 @@ use crate::data_type::{
Int64Type, Int96Type,
};
use crate::errors::Result;
-use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type};
+use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
/// Create array reader from parquet schema, projection mask, and parquet file reader.
pub fn build_array_reader(
- parquet_schema: SchemaDescPtr,
arrow_schema: SchemaRef,
mask: ProjectionMask,
- row_groups: Box<dyn RowGroupCollection>,
+ row_groups: &dyn RowGroupCollection,
) -> Result<Box<dyn ArrayReader>> {
- let field =
- convert_schema(parquet_schema.as_ref(), mask, Some(arrow_schema.as_ref()))?;
+ let field = convert_schema(&row_groups.schema(), mask, Some(arrow_schema.as_ref()))?;
match &field {
- Some(field) => build_reader(field, row_groups.as_ref()),
+ Some(field) => build_reader(field, row_groups),
None => Ok(make_empty_array_reader(row_groups.num_rows())),
}
}
@@ -333,13 +331,8 @@ mod tests {
)
.unwrap();
- let array_reader = build_array_reader(
- file_reader.metadata().file_metadata().schema_descr_ptr(),
- Arc::new(arrow_schema),
- mask,
- Box::new(file_reader),
- )
- .unwrap();
+ let array_reader =
+ build_array_reader(Arc::new(arrow_schema), mask, &file_reader).unwrap();
// Create arrow types
let arrow_type = DataType::Struct(vec![Field::new(
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 2acd59dcc..d2fa94611 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -582,13 +582,9 @@ mod tests {
let schema = file_metadata.schema_descr_ptr();
let mask = ProjectionMask::leaves(&schema, vec![0]);
- let mut array_reader = build_array_reader(
- schema,
- Arc::new(arrow_schema),
- mask,
- Box::new(file_reader),
- )
- .unwrap();
+ let mut array_reader =
+ build_array_reader(Arc::new(arrow_schema), mask, &file_reader)
+ .unwrap();
let batch = array_reader.next_batch(100).unwrap();
assert_eq!(batch.data_type(), array_reader.get_data_type());
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index d7665ef0f..54c45a336 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -100,7 +100,7 @@ pub trait ArrayReader: Send {
/// A collection of row groups
pub trait RowGroupCollection {
/// Get schema of parquet file.
- fn schema(&self) -> Result<SchemaDescPtr>;
+ fn schema(&self) -> SchemaDescPtr;
/// Get the number of rows in this collection
fn num_rows(&self) -> usize;
@@ -110,8 +110,8 @@ pub trait RowGroupCollection {
}
impl RowGroupCollection for Arc<dyn FileReader> {
- fn schema(&self) -> Result<SchemaDescPtr> {
- Ok(self.metadata().file_metadata().schema_descr_ptr())
+ fn schema(&self) -> SchemaDescPtr {
+ self.metadata().file_metadata().schema_descr_ptr()
}
fn num_rows(&self) -> usize {
diff --git a/parquet/src/arrow/arrow_reader/filter.rs b/parquet/src/arrow/arrow_reader/filter.rs
new file mode 100644
index 000000000..8945ccde4
--- /dev/null
+++ b/parquet/src/arrow/arrow_reader/filter.rs
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::ProjectionMask;
+use arrow::array::BooleanArray;
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+
+/// A predicate operating on [`RecordBatch`]
+pub trait ArrowPredicate: Send + 'static {
+ /// Returns the [`ProjectionMask`] that describes the columns required
+ /// to evaluate this predicate. All projected columns will be provided in the `batch`
+ /// passed to [`evaluate`](Self::evaluate)
+ fn projection(&self) -> &ProjectionMask;
+
+ /// Evaluate this predicate for the given [`RecordBatch`] containing the columns
+ /// identified by [`Self::projection`]
+ ///
+ /// Rows that are `true` in the returned [`BooleanArray`] will be returned by the
+ /// parquet reader, whereas rows that are `false` or `Null` will not be
+ fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray>;
+}
+
+/// An [`ArrowPredicate`] created from an [`FnMut`]
+pub struct ArrowPredicateFn<F> {
+ f: F,
+ projection: ProjectionMask,
+}
+
+impl<F> ArrowPredicateFn<F>
+where
+ F: FnMut(RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static,
+{
+ /// Create a new [`ArrowPredicateFn`]. `f` will be passed batches
+ /// that contain the columns specified in `projection`
+ /// and should return a [`BooleanArray`] that describes which rows
+ /// should be passed along
+ pub fn new(projection: ProjectionMask, f: F) -> Self {
+ Self { f, projection }
+ }
+}
+
+impl<F> ArrowPredicate for ArrowPredicateFn<F>
+where
+ F: FnMut(RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static,
+{
+ fn projection(&self) -> &ProjectionMask {
+ &self.projection
+ }
+
+ fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+ (self.f)(batch)
+ }
+}
+
+/// A [`RowFilter`] allows pushing down a filter predicate to skip IO and decode
+///
+/// This consists of a list of [`ArrowPredicate`] where only the rows that satisfy all
+/// of the predicates will be returned. Any [`RowSelection`] will be applied prior
+/// to the first predicate, and each predicate in turn will then be used to compute
+/// a more refined [`RowSelection`] to use when evaluating the subsequent predicates.
+///
+/// Once all predicates have been evaluated, the final [`RowSelection`] is applied
+/// to the top-level [`ProjectionMask`] to produce the final output [`RecordBatch`].
+///
+/// This design has a couple of implications:
+///
+/// * [`RowFilter`] can be used to skip entire pages, and thus IO, in addition to CPU decode overheads
+/// * Columns may be decoded multiple times if they appear in multiple [`ProjectionMask`]
+/// * IO will be deferred until needed by a [`ProjectionMask`]
+///
+/// As such there is a trade-off between a single large predicate, or multiple predicates,
+/// that will depend on the shape of the data. Whilst multiple smaller predicates may
+/// minimise the amount of data scanned/decoded, it may not be faster overall.
+///
+/// For example, if a predicate that needs a single column of data filters out all but
+/// 1% of the rows, applying it as one of the early `ArrowPredicateFn` will likely significantly
+/// improve performance.
+///
+/// As a counter example, if a predicate needs several columns of data to evaluate but
+/// leaves 99% of the rows, it may be better to not filter the data from parquet and
+/// apply the filter after the RecordBatch has been fully decoded.
+///
+/// [`RowSelection`]: [super::selection::RowSelection]
+pub struct RowFilter {
+ /// A list of [`ArrowPredicate`]
+ pub(crate) predicates: Vec<Box<dyn ArrowPredicate>>,
+}
+
+impl RowFilter {
+ /// Create a new [`RowFilter`] from an array of [`ArrowPredicate`]
+ pub fn new(predicates: Vec<Box<dyn ArrowPredicate>>) -> Self {
+ Self { predicates }
+ }
+}
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader/mod.rs
similarity index 96%
rename from parquet/src/arrow/arrow_reader.rs
rename to parquet/src/arrow/arrow_reader/mod.rs
index fd68f4bc0..e363919f6 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -21,6 +21,7 @@ use std::collections::VecDeque;
use std::sync::Arc;
use arrow::array::Array;
+use arrow::compute::prep_null_mask_filter;
use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
@@ -36,6 +37,17 @@ use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::schema::types::SchemaDescriptor;
+#[allow(unused)]
+mod filter;
+#[allow(unused)]
+mod selection;
+
+// TODO: Make these public once stable (#1792)
+#[allow(unused_imports)]
+pub(crate) use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
+#[allow(unused_imports)]
+pub(crate) use selection::{RowSelection, RowSelector};
+
/// Arrow reader api.
/// With this api, user can get arrow schema from parquet file, and read parquet data
/// into arrow arrays.
@@ -72,41 +84,10 @@ pub trait ArrowReader {
) -> Result<Self::RecordReader>;
}
-/// [`RowSelection`] allows selecting or skipping a provided number of rows
-/// when scanning the parquet file
-#[derive(Debug, Clone, Copy)]
-pub(crate) struct RowSelection {
- /// The number of rows
- pub row_count: usize,
-
- /// If true, skip `row_count` rows
- pub skip: bool,
-}
-
-impl RowSelection {
- /// Select `row_count` rows
- #[allow(unused)]
- pub fn select(row_count: usize) -> Self {
- Self {
- row_count,
- skip: false,
- }
- }
-
- /// Skip `row_count` rows
- #[allow(unused)]
- pub fn skip(row_count: usize) -> Self {
- Self {
- row_count,
- skip: true,
- }
- }
-}
-
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
skip_arrow_metadata: bool,
- selection: Option<Vec<RowSelection>>,
+ selection: Option<RowSelection>,
}
impl ArrowReaderOptions {
@@ -130,12 +111,9 @@ impl ArrowReaderOptions {
/// Scan rows from the parquet file according to the provided `selection`
///
- /// TODO: Make public once row selection fully implemented (#1792)
+ /// TODO: Revisit this API, as [`Self`] is provided before the file metadata is available
#[allow(unused)]
- pub(crate) fn with_row_selection(
- self,
- selection: impl Into<Vec<RowSelection>>,
- ) -> Self {
+ pub(crate) fn with_row_selection(self, selection: impl Into<RowSelection>) -> Self {
Self {
selection: Some(selection.into()),
..self
@@ -143,6 +121,9 @@ impl ArrowReaderOptions {
}
}
+/// An `ArrowReader` that can be used to synchronously read parquet data as [`RecordBatch`]
+///
+/// See [`crate::arrow::async_reader`] for an asynchronous interface
pub struct ParquetFileArrowReader {
file_reader: Arc<dyn FileReader>,
@@ -178,21 +159,13 @@ impl ArrowReader for ParquetFileArrowReader {
mask: ProjectionMask,
batch_size: usize,
) -> Result<ParquetRecordBatchReader> {
- let array_reader = build_array_reader(
- self.file_reader
- .metadata()
- .file_metadata()
- .schema_descr_ptr(),
- Arc::new(self.get_schema()?),
- mask,
- Box::new(self.file_reader.clone()),
- )?;
+ let array_reader =
+ build_array_reader(Arc::new(self.get_schema()?), mask, &self.file_reader)?;
- let selection = self.options.selection.clone().map(Into::into);
Ok(ParquetRecordBatchReader::new(
batch_size,
array_reader,
- selection,
+ self.options.selection.clone(),
))
}
}
@@ -279,11 +252,13 @@ impl ParquetFileArrowReader {
}
}
+/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
+/// read from a parquet data source
pub struct ParquetRecordBatchReader {
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
schema: SchemaRef,
- selection: Option<VecDeque<RowSelection>>,
+ selection: Option<VecDeque<RowSelector>>,
}
impl Iterator for ParquetRecordBatchReader {
@@ -319,7 +294,7 @@ impl Iterator for ParquetRecordBatchReader {
Some(remaining) if remaining != 0 => {
// if page row count is less than batch_size we must set batch size to page row count.
// add check avoid dead loop
- selection.push_front(RowSelection::select(remaining));
+ selection.push_front(RowSelector::select(remaining));
need_read
}
_ => front.row_count,
@@ -364,22 +339,13 @@ impl RecordBatchReader for ParquetRecordBatchReader {
}
impl ParquetRecordBatchReader {
- pub fn try_new(
- batch_size: usize,
- array_reader: Box<dyn ArrayReader>,
- ) -> Result<Self> {
- Ok(Self::new(batch_size, array_reader, None))
- }
-
/// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
/// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
/// all rows will be returned
- ///
- /// TODO: Make public once row selection fully implemented (#1792)
pub(crate) fn new(
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
- selection: Option<VecDeque<RowSelection>>,
+ selection: Option<RowSelection>,
) -> Self {
let schema = match array_reader.get_data_type() {
ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
@@ -390,11 +356,41 @@ impl ParquetRecordBatchReader {
batch_size,
array_reader,
schema: Arc::new(schema),
- selection,
+ selection: selection.map(Into::into),
}
}
}
+/// Evaluates an [`ArrowPredicate`] returning the [`RowSelection`]
+///
+/// If this [`ParquetRecordBatchReader`] has a [`RowSelection`], the
+/// returned [`RowSelection`] will be the conjunction of this and
+/// the rows selected by `predicate`
+#[allow(unused)]
+pub(crate) fn evaluate_predicate(
+ batch_size: usize,
+ array_reader: Box<dyn ArrayReader>,
+ input_selection: Option<RowSelection>,
+ predicate: &mut dyn ArrowPredicate,
+) -> Result<RowSelection> {
+ let reader =
+ ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
+ let mut filters = vec![];
+ for maybe_batch in reader {
+ let filter = predicate.evaluate(maybe_batch?)?;
+ match filter.null_count() {
+ 0 => filters.push(filter),
+ _ => filters.push(prep_null_mask_filter(&filter)),
+ };
+ }
+
+ let raw = RowSelection::from_filters(&filters);
+ Ok(match input_selection {
+ Some(selection) => selection.and_then(&raw),
+ None => raw,
+ })
+}
+
#[cfg(test)]
mod tests {
use bytes::Bytes;
@@ -417,7 +413,7 @@ mod tests {
use crate::arrow::arrow_reader::{
ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
- ParquetRecordBatchReader, RowSelection,
+ ParquetRecordBatchReader, RowSelection, RowSelector,
};
use crate::arrow::buffer::converter::{
Converter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter,
@@ -893,7 +889,7 @@ mod tests {
/// Encoding
encoding: Encoding,
//row selections and total selected row count
- row_selections: Option<(Vec<RowSelection>, usize)>,
+ row_selections: Option<(RowSelection, usize)>,
}
impl Default for TestOptions {
@@ -1187,6 +1183,7 @@ mod tests {
let mut without_skip_data = gen_expected_data::<T>(&def_levels, &values);
let mut skip_data: Vec<Option<T::T>> = vec![];
+ let selections: VecDeque<RowSelector> = selections.into();
for select in selections {
if select.skip {
without_skip_data.drain(0..select.row_count);
@@ -1763,12 +1760,12 @@ mod tests {
/// a `batch_size` and `selection`
fn get_expected_batches(
column: &RecordBatch,
- selection: &[RowSelection],
+ selection: &RowSelection,
batch_size: usize,
) -> Vec<RecordBatch> {
let mut expected_batches = vec![];
- let mut selection: VecDeque<_> = selection.iter().cloned().collect();
+ let mut selection: VecDeque<_> = selection.clone().into();
let mut row_offset = 0;
let mut last_start = None;
while row_offset < column.num_rows() && !selection.is_empty() {
@@ -1820,7 +1817,7 @@ mod tests {
step_len: usize,
total_len: usize,
skip_first: bool,
- ) -> (Vec<RowSelection>, usize) {
+ ) -> (RowSelection, usize) {
let mut remaining = total_len;
let mut skip = skip_first;
let mut vec = vec![];
@@ -1831,7 +1828,7 @@ mod tests {
} else {
remaining
};
- vec.push(RowSelection {
+ vec.push(RowSelector {
row_count: step,
skip,
});
@@ -1841,7 +1838,7 @@ mod tests {
}
skip = !skip;
}
- (vec, selected_count)
+ (vec.into(), selected_count)
}
#[test]
@@ -1890,7 +1887,7 @@ mod tests {
fn create_skip_reader(
test_file: &File,
batch_size: usize,
- selections: Vec<RowSelection>,
+ selections: RowSelection,
) -> ParquetRecordBatchReader {
let arrow_reader_options =
ArrowReaderOptions::new().with_row_selection(selections);
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
new file mode 100644
index 000000000..8e129f566
--- /dev/null
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -0,0 +1,426 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, BooleanArray};
+use arrow::compute::SlicesIterator;
+use std::cmp::Ordering;
+use std::collections::VecDeque;
+use std::ops::Range;
+
+/// [`RowSelector`] represents a range of rows to scan from a parquet file
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct RowSelector {
+ /// The number of rows
+ pub row_count: usize,
+
+ /// If true, skip `row_count` rows
+ pub skip: bool,
+}
+
+impl RowSelector {
+ /// Select `row_count` rows
+ pub fn select(row_count: usize) -> Self {
+ Self {
+ row_count,
+ skip: false,
+ }
+ }
+
+ /// Skip `row_count` rows
+ pub fn skip(row_count: usize) -> Self {
+ Self {
+ row_count,
+ skip: true,
+ }
+ }
+}
+
+/// [`RowSelection`] allows selecting or skipping a provided number of rows
+/// when scanning the parquet file.
+///
+/// This is applied prior to reading column data, and can therefore
+/// be used to skip IO to fetch data into memory
+///
+/// A typical use-case would be using the [`PageIndex`] to filter out rows
+/// that don't satisfy a predicate
+///
+/// [`PageIndex`]: [crate::file::page_index::index::PageIndex]
+#[derive(Debug, Clone, Default, Eq, PartialEq)]
+pub struct RowSelection {
+ selectors: Vec<RowSelector>,
+}
+
+impl RowSelection {
+ /// Creates a [`RowSelection`] from a slice of [`BooleanArray`]
+ ///
+ /// # Panic
+ ///
+ /// Panics if any of the [`BooleanArray`] contain nulls
+ pub fn from_filters(filters: &[BooleanArray]) -> Self {
+ let mut next_offset = 0;
+ let total_rows = filters.iter().map(|x| x.len()).sum();
+
+ let iter = filters.iter().flat_map(|filter| {
+ let offset = next_offset;
+ next_offset += filter.len();
+ assert_eq!(filter.null_count(), 0);
+ SlicesIterator::new(filter)
+ .map(move |(start, end)| start + offset..end + offset)
+ });
+
+ Self::from_consecutive_ranges(iter, total_rows)
+ }
+
+ /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep
+ fn from_consecutive_ranges<I: Iterator<Item = Range<usize>>>(
+ ranges: I,
+ total_rows: usize,
+ ) -> Self {
+ let mut selectors: Vec<RowSelector> = Vec::with_capacity(ranges.size_hint().0);
+ let mut last_end = 0;
+ for range in ranges {
+ let len = range.end - range.start;
+
+ match range.start.cmp(&last_end) {
+ Ordering::Equal => match selectors.last_mut() {
+ Some(last) => last.row_count += len,
+ None => selectors.push(RowSelector::select(len)),
+ },
+ Ordering::Greater => {
+ selectors.push(RowSelector::skip(range.start - last_end));
+ selectors.push(RowSelector::select(len))
+ }
+ Ordering::Less => panic!("out of order"),
+ }
+ last_end = range.end;
+ }
+
+ if last_end != total_rows {
+ selectors.push(RowSelector::skip(total_rows - last_end))
+ }
+
+ Self { selectors }
+ }
+
+ /// Splits off the first `row_count` from this [`RowSelection`]
+ pub fn split_off(&mut self, row_count: usize) -> Self {
+ let mut total_count = 0;
+
+ // Find the index where the selector exceeds the row count
+ let find = self.selectors.iter().enumerate().find(|(_, selector)| {
+ total_count += selector.row_count;
+ total_count > row_count
+ });
+
+ let split_idx = match find {
+ Some((idx, _)) => idx,
+ None => {
+ let selectors = std::mem::take(&mut self.selectors);
+ return Self { selectors };
+ }
+ };
+
+ let mut remaining = self.selectors.split_off(split_idx);
+
+ // Always present as `split_idx < self.selectors.len()`
+ let next = remaining.first_mut().unwrap();
+ let overflow = total_count - row_count;
+
+ if next.row_count != overflow {
+ self.selectors.push(RowSelector {
+ row_count: next.row_count - overflow,
+ skip: next.skip,
+ })
+ }
+ next.row_count = overflow;
+
+ std::mem::swap(&mut remaining, &mut self.selectors);
+ Self {
+ selectors: remaining,
+ }
+ }
+
+ /// Given a [`RowSelection`] computed under `self`, returns the [`RowSelection`]
+ /// representing their conjunction
+ ///
+ /// For example:
+ ///
+ /// self: NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
+ /// other: YYYYYNNNNYYYYYYYYYYYYY YYNNN
+ ///
+ /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYYNNYNNNN
+ ///
+ ///
+ pub fn and_then(&self, other: &Self) -> Self {
+ let mut selectors = vec![];
+ let mut first = self.selectors.iter().cloned().peekable();
+ let mut second = other.selectors.iter().cloned().peekable();
+
+ let mut to_skip = 0;
+ while let Some(b) = second.peek_mut() {
+ let a = first.peek_mut().unwrap();
+
+ if b.row_count == 0 {
+ second.next().unwrap();
+ continue;
+ }
+
+ if a.row_count == 0 {
+ first.next().unwrap();
+ continue;
+ }
+
+ if a.skip {
+ // Records were skipped when producing second
+ to_skip += a.row_count;
+ first.next().unwrap();
+ continue;
+ }
+
+ let skip = b.skip;
+ let to_process = a.row_count.min(b.row_count);
+
+ a.row_count -= to_process;
+ b.row_count -= to_process;
+
+ match skip {
+ true => to_skip += to_process,
+ false => {
+ if to_skip != 0 {
+ selectors.push(RowSelector::skip(to_skip));
+ to_skip = 0;
+ }
+ selectors.push(RowSelector::select(to_process))
+ }
+ }
+ }
+
+ for v in first {
+ if v.row_count != 0 {
+ assert!(v.skip);
+ to_skip += v.row_count
+ }
+ }
+
+ if to_skip != 0 {
+ selectors.push(RowSelector::skip(to_skip));
+ }
+
+ Self { selectors }
+ }
+
+ /// Returns `true` if this [`RowSelection`] selects any rows
+ pub fn selects_any(&self) -> bool {
+ self.selectors.iter().any(|x| !x.skip)
+ }
+}
+
+impl From<Vec<RowSelector>> for RowSelection {
+ fn from(selectors: Vec<RowSelector>) -> Self {
+ Self { selectors }
+ }
+}
+
+impl From<RowSelection> for VecDeque<RowSelector> {
+ fn from(r: RowSelection) -> Self {
+ r.selectors.into()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use rand::{thread_rng, Rng};
+
+ #[test]
+ fn test_from_filters() {
+ let filters = vec![
+ BooleanArray::from(vec![false, false, false, true, true, true, true]),
+ BooleanArray::from(vec![true, true, false, false, true, true, true]),
+ BooleanArray::from(vec![false, false, false, false]),
+ BooleanArray::from(Vec::<bool>::new()),
+ ];
+
+ let selection = RowSelection::from_filters(&filters[..1]);
+ assert!(selection.selects_any());
+ assert_eq!(
+ selection.selectors,
+ vec![RowSelector::skip(3), RowSelector::select(4)]
+ );
+
+ let selection = RowSelection::from_filters(&filters[..2]);
+ assert!(selection.selects_any());
+ assert_eq!(
+ selection.selectors,
+ vec![
+ RowSelector::skip(3),
+ RowSelector::select(6),
+ RowSelector::skip(2),
+ RowSelector::select(3)
+ ]
+ );
+
+ let selection = RowSelection::from_filters(&filters);
+ assert!(selection.selects_any());
+ assert_eq!(
+ selection.selectors,
+ vec![
+ RowSelector::skip(3),
+ RowSelector::select(6),
+ RowSelector::skip(2),
+ RowSelector::select(3),
+ RowSelector::skip(4)
+ ]
+ );
+
+ let selection = RowSelection::from_filters(&filters[2..3]);
+ assert!(!selection.selects_any());
+ assert_eq!(selection.selectors, vec![RowSelector::skip(4)]);
+ }
+
+ #[test]
+ fn test_split_off() {
+ let mut selection = RowSelection::from(vec![
+ RowSelector::skip(34),
+ RowSelector::select(12),
+ RowSelector::skip(3),
+ RowSelector::select(35),
+ ]);
+
+ let split = selection.split_off(34);
+ assert_eq!(split.selectors, vec![RowSelector::skip(34)]);
+ assert_eq!(
+ selection.selectors,
+ vec![
+ RowSelector::select(12),
+ RowSelector::skip(3),
+ RowSelector::select(35)
+ ]
+ );
+
+ let split = selection.split_off(5);
+ assert_eq!(split.selectors, vec![RowSelector::select(5)]);
+ assert_eq!(
+ selection.selectors,
+ vec![
+ RowSelector::select(7),
+ RowSelector::skip(3),
+ RowSelector::select(35)
+ ]
+ );
+
+ let split = selection.split_off(8);
+ assert_eq!(
+ split.selectors,
+ vec![RowSelector::select(7), RowSelector::skip(1)]
+ );
+ assert_eq!(
+ selection.selectors,
+ vec![RowSelector::skip(2), RowSelector::select(35)]
+ );
+
+ let split = selection.split_off(200);
+ assert_eq!(
+ split.selectors,
+ vec![RowSelector::skip(2), RowSelector::select(35)]
+ );
+ assert!(selection.selectors.is_empty());
+ }
+
+ #[test]
+ fn test_and() {
+ let mut a = RowSelection::from(vec![
+ RowSelector::skip(12),
+ RowSelector::select(23),
+ RowSelector::skip(3),
+ RowSelector::select(5),
+ ]);
+
+ let b = RowSelection::from(vec![
+ RowSelector::select(5),
+ RowSelector::skip(4),
+ RowSelector::select(15),
+ RowSelector::skip(4),
+ ]);
+
+ let mut expected = RowSelection::from(vec![
+ RowSelector::skip(12),
+ RowSelector::select(5),
+ RowSelector::skip(4),
+ RowSelector::select(14),
+ RowSelector::skip(3),
+ RowSelector::select(1),
+ RowSelector::skip(4),
+ ]);
+
+ assert_eq!(a.and_then(&b), expected);
+
+ a.split_off(7);
+ expected.split_off(7);
+ assert_eq!(a.and_then(&b), expected);
+
+ let a = RowSelection::from(vec![RowSelector::select(5), RowSelector::skip(3)]);
+
+ let b = RowSelection::from(vec![
+ RowSelector::select(2),
+ RowSelector::skip(1),
+ RowSelector::select(1),
+ RowSelector::skip(1),
+ ]);
+
+ assert_eq!(
+ a.and_then(&b).selectors,
+ vec![
+ RowSelector::select(2),
+ RowSelector::skip(1),
+ RowSelector::select(1),
+ RowSelector::skip(4)
+ ]
+ );
+ }
+
+ #[test]
+ fn test_and_fuzz() {
+ let mut rand = thread_rng();
+ for _ in 0..100 {
+ let a_len = rand.gen_range(10..100);
+ let a_bools: Vec<_> = (0..a_len).map(|x| rand.gen_bool(0.2)).collect();
+ let a = RowSelection::from_filters(&[BooleanArray::from(a_bools.clone())]);
+
+ let b_len: usize = a_bools.iter().map(|x| *x as usize).sum();
+ let b_bools: Vec<_> = (0..b_len).map(|x| rand.gen_bool(0.8)).collect();
+ let b = RowSelection::from_filters(&[BooleanArray::from(b_bools.clone())]);
+
+ let mut expected_bools = vec![false; a_len];
+
+ let mut iter_b = b_bools.iter();
+ for (idx, b) in a_bools.iter().enumerate() {
+ if *b && *iter_b.next().unwrap() {
+ expected_bools[idx] = true;
+ }
+ }
+
+ let expected =
+ RowSelection::from_filters(&[BooleanArray::from(expected_bools)]);
+
+ let total_rows: usize = expected.selectors.iter().map(|s| s.row_count).sum();
+ assert_eq!(a_len, total_rows);
+
+ assert_eq!(a.and_then(&b), expected);
+ }
+ }
+}
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 3770ed265..5c186d7aa 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -86,6 +86,7 @@ use std::task::{Context, Poll};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
+use futures::ready;
use futures::stream::Stream;
use parquet_format::{PageHeader, PageType};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
@@ -94,7 +95,9 @@ use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
-use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+use crate::arrow::arrow_reader::{
+ evaluate_predicate, ParquetRecordBatchReader, RowFilter, RowSelection,
+};
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::ProjectionMask;
use crate::basic::Compression;
@@ -102,7 +105,7 @@ use crate::column::page::{Page, PageIterator, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
-use crate::file::metadata::ParquetMetaData;
+use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use crate::file::serialized_reader::{decode_page, read_page_header};
use crate::file::FOOTER_SIZE;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
@@ -209,9 +212,13 @@ pub struct ParquetRecordBatchStreamBuilder<T> {
row_groups: Option<Vec<usize>>,
projection: ProjectionMask,
+
+ filter: Option<RowFilter>,
+
+ selection: Option<RowSelection>,
}
-impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
+impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
pub async fn new(mut input: T) -> Result<Self> {
let metadata = input.get_metadata().await?;
@@ -228,6 +235,8 @@ impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
batch_size: 1024,
row_groups: None,
projection: ProjectionMask::all(),
+ filter: None,
+ selection: None,
})
}
@@ -267,6 +276,32 @@ impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
}
}
+ /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
+ /// data into memory
+ ///
+ /// Row group filtering is applied prior to this, and rows from skipped
+ /// row groups should not be included in the [`RowSelection`]
+ ///
+ /// TODO: Make public once stable (#1792)
+ #[allow(unused)]
+ pub(crate) fn with_row_selection(self, selection: RowSelection) -> Self {
+ Self {
+ selection: Some(selection),
+ ..self
+ }
+ }
+
+ /// Provide a [`RowFilter`] to skip decoding rows
+ ///
+ /// TODO: Make public once stable (#1792)
+ #[allow(unused)]
+ pub(crate) fn with_row_filter(self, filter: RowFilter) -> Self {
+ Self {
+ filter: Some(filter),
+ ..self
+ }
+ }
+
/// Build a new [`ParquetRecordBatchStream`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
let num_row_groups = self.metadata.row_groups().len();
@@ -285,25 +320,122 @@ impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
None => (0..self.metadata.row_groups().len()).collect(),
};
+ let reader = ReaderFactory {
+ input: self.input,
+ filter: self.filter,
+ metadata: self.metadata.clone(),
+ schema: self.schema.clone(),
+ };
+
Ok(ParquetRecordBatchStream {
+ metadata: self.metadata,
+ batch_size: self.batch_size,
row_groups,
projection: self.projection,
- batch_size: self.batch_size,
- metadata: self.metadata,
+ selection: self.selection,
schema: self.schema,
- input: Some(self.input),
+ reader: Some(reader),
state: StreamState::Init,
})
}
}
+type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
+
+/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
+/// [`ParquetRecordBatchReader`]
+struct ReaderFactory<T> {
+ metadata: Arc<ParquetMetaData>,
+
+ schema: SchemaRef,
+
+ input: T,
+
+ filter: Option<RowFilter>,
+}
+
+impl<T> ReaderFactory<T>
+where
+ T: AsyncFileReader + Send,
+{
+ /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
+ ///
+ /// Note: this captures self so that the resulting future has a static lifetime
+ async fn read_row_group(
+ mut self,
+ row_group_idx: usize,
+ mut selection: Option<RowSelection>,
+ projection: ProjectionMask,
+ batch_size: usize,
+ ) -> ReadResult<T> {
+ // TODO: calling build_array multiple times is wasteful
+ let selects_any = |selection: Option<&RowSelection>| {
+ selection.map(|x| x.selects_any()).unwrap_or(true)
+ };
+
+ let meta = self.metadata.row_group(row_group_idx);
+ let mut row_group = InMemoryRowGroup {
+ schema: meta.schema_descr_ptr(),
+ row_count: meta.num_rows() as usize,
+ column_chunks: vec![None; meta.columns().len()],
+ };
+
+ if let Some(filter) = self.filter.as_mut() {
+ for predicate in filter.predicates.iter_mut() {
+ if !selects_any(selection.as_ref()) {
+ return Ok((self, None));
+ }
+
+ let predicate_projection = predicate.projection().clone();
+ row_group
+ .fetch(
+ &mut self.input,
+ meta,
+ &predicate_projection,
+ selection.as_ref(),
+ )
+ .await?;
+
+ let array_reader = build_array_reader(
+ self.schema.clone(),
+ predicate_projection,
+ &row_group,
+ )?;
+
+ selection = Some(evaluate_predicate(
+ batch_size,
+ array_reader,
+ selection,
+ predicate.as_mut(),
+ )?);
+ }
+ }
+
+ if !selects_any(selection.as_ref()) {
+ return Ok((self, None));
+ }
+
+ row_group
+ .fetch(&mut self.input, meta, &projection, selection.as_ref())
+ .await?;
+
+ let reader = ParquetRecordBatchReader::new(
+ batch_size,
+ build_array_reader(self.schema.clone(), projection, &row_group)?,
+ selection,
+ );
+
+ Ok((self, Some(reader)))
+ }
+}
+
enum StreamState<T> {
/// At the start of a new row group, or the end of the parquet stream
Init,
/// Decoding a batch
Decoding(ParquetRecordBatchReader),
/// Reading data from input
- Reading(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>),
+ Reading(BoxFuture<'static, ReadResult<T>>),
/// Error
Error,
}
@@ -319,20 +451,23 @@ impl<T> std::fmt::Debug for StreamState<T> {
}
}
-/// An asynchronous [`Stream`] of [`RecordBatch`] for a parquet file
+/// An asynchronous [`Stream`] of [`RecordBatch`] for a parquet file that can be
+/// constructed using [`ParquetRecordBatchStreamBuilder`]
pub struct ParquetRecordBatchStream<T> {
metadata: Arc<ParquetMetaData>,
schema: SchemaRef,
- batch_size: usize,
+ row_groups: VecDeque<usize>,
projection: ProjectionMask,
- row_groups: VecDeque<usize>,
+ batch_size: usize,
+
+ selection: Option<RowSelection>,
/// This is an option so it can be moved into a future
- input: Option<T>,
+ reader: Option<ReaderFactory<T>>,
state: StreamState<T>,
}
@@ -384,101 +519,40 @@ where
None => return Poll::Ready(None),
};
- let metadata = self.metadata.clone();
- let mut input = match self.input.take() {
- Some(input) => input,
- None => {
- self.state = StreamState::Error;
- return Poll::Ready(Some(Err(general_err!(
- "input stream lost"
- ))));
- }
- };
+ let reader = self.reader.take().expect("lost reader");
- let projection = self.projection.clone();
- self.state = StreamState::Reading(
- async move {
- let row_group_metadata = metadata.row_group(row_group_idx);
- let mut column_chunks =
- vec![None; row_group_metadata.columns().len()];
-
- // TODO: Combine consecutive ranges
- let fetch_ranges = (0..column_chunks.len())
- .into_iter()
- .filter_map(|idx| {
- if !projection.leaf_included(idx) {
- None
- } else {
- let column = row_group_metadata.column(idx);
- let (start, length) = column.byte_range();
-
- Some(start as usize..(start + length) as usize)
- }
- })
- .collect();
-
- let mut chunk_data =
- input.get_byte_ranges(fetch_ranges).await?.into_iter();
-
- for (idx, chunk) in column_chunks.iter_mut().enumerate() {
- if !projection.leaf_included(idx) {
- continue;
- }
-
- let column = row_group_metadata.column(idx);
-
- if let Some(data) = chunk_data.next() {
- *chunk = Some(InMemoryColumnChunk {
- num_values: column.num_values(),
- compression: column.compression(),
- physical_type: column.column_type(),
- data,
- });
- }
- }
-
- Ok((
- input,
- InMemoryRowGroup {
- schema: metadata.file_metadata().schema_descr_ptr(),
- row_count: row_group_metadata.num_rows() as usize,
- column_chunks,
- },
- ))
- }
- .boxed(),
- )
- }
- StreamState::Reading(f) => {
- let result = futures::ready!(f.poll_unpin(cx));
- self.state = StreamState::Init;
-
- let row_group: Box<dyn RowGroupCollection> = match result {
- Ok((input, row_group)) => {
- self.input = Some(input);
- Box::new(row_group)
- }
- Err(e) => {
- self.state = StreamState::Error;
- return Poll::Ready(Some(Err(e)));
- }
- };
+ let row_count =
+ self.metadata.row_group(row_group_idx).num_rows() as usize;
- let parquet_schema = self.metadata.file_metadata().schema_descr_ptr();
+ let selection =
+ self.selection.as_mut().map(|s| s.split_off(row_count));
- let array_reader = build_array_reader(
- parquet_schema,
- self.schema.clone(),
- self.projection.clone(),
- row_group,
- )?;
-
- let batch_reader =
- ParquetRecordBatchReader::try_new(self.batch_size, array_reader)
- .expect("reader");
+ let fut = reader
+ .read_row_group(
+ row_group_idx,
+ selection,
+ self.projection.clone(),
+ self.batch_size,
+ )
+ .boxed();
- self.state = StreamState::Decoding(batch_reader)
+ self.state = StreamState::Reading(fut)
}
+ StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
+ Ok((reader_factory, maybe_reader)) => {
+ self.reader = Some(reader_factory);
+ match maybe_reader {
+ // Read records from [`ParquetRecordBatchReader`]
+ Some(reader) => self.state = StreamState::Decoding(reader),
+ // All rows skipped, read next row group
+ None => self.state = StreamState::Init,
+ }
+ }
+ Err(e) => {
+ self.state = StreamState::Error;
+ return Poll::Ready(Some(Err(e)));
+ }
+ },
StreamState::Error => return Poll::Pending,
}
}
@@ -492,9 +566,56 @@ struct InMemoryRowGroup {
row_count: usize,
}
+impl InMemoryRowGroup {
+ /// Fetches the necessary column data into memory
+ async fn fetch<T: AsyncFileReader + Send>(
+ &mut self,
+ input: &mut T,
+ metadata: &RowGroupMetaData,
+ projection: &ProjectionMask,
+ _selection: Option<&RowSelection>,
+ ) -> Result<()> {
+ // TODO: Use OffsetIndex and selection to prune pages
+
+ let fetch_ranges = self
+ .column_chunks
+ .iter()
+ .enumerate()
+ .into_iter()
+ .filter_map(|(idx, chunk)| {
+ (chunk.is_none() && projection.leaf_included(idx)).then(|| {
+ let column = metadata.column(idx);
+ let (start, length) = column.byte_range();
+ start as usize..(start + length) as usize
+ })
+ })
+ .collect();
+
+ let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+
+ for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+ if chunk.is_some() || !projection.leaf_included(idx) {
+ continue;
+ }
+
+ let column = metadata.column(idx);
+
+ if let Some(data) = chunk_data.next() {
+ *chunk = Some(InMemoryColumnChunk {
+ num_values: column.num_values(),
+ compression: column.compression(),
+ physical_type: column.column_type(),
+ data,
+ });
+ }
+ }
+ Ok(())
+ }
+}
+
impl RowGroupCollection for InMemoryRowGroup {
- fn schema(&self) -> Result<SchemaDescPtr> {
- Ok(self.schema.clone())
+ fn schema(&self) -> SchemaDescPtr {
+ self.schema.clone()
}
fn num_rows(&self) -> usize {
@@ -685,7 +806,10 @@ impl PageIterator for ColumnChunkIterator {
#[cfg(test)]
mod tests {
use super::*;
- use crate::arrow::{ArrowReader, ParquetFileArrowReader};
+ use crate::arrow::arrow_reader::ArrowPredicateFn;
+ use crate::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader};
+ use crate::file::footer::parse_metadata;
+ use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
use arrow::error::Result as ArrowResult;
use futures::TryStreamExt;
use std::sync::Mutex;
@@ -858,4 +982,73 @@ mod tests {
assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
assert_eq!(second_page.num_values(), 8);
}
+
+ #[tokio::test]
+ async fn test_row_filter() {
+ let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
+ let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
+ let c = Int32Array::from_iter(0..6);
+ let data = RecordBatch::try_from_iter([
+ ("a", Arc::new(a) as ArrayRef),
+ ("b", Arc::new(b) as ArrayRef),
+ ("c", Arc::new(c) as ArrayRef),
+ ])
+ .unwrap();
+
+ let mut buf = Vec::with_capacity(1024);
+ let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
+ writer.write(&data).unwrap();
+ writer.close().unwrap();
+
+ let data: Bytes = buf.into();
+ let metadata = parse_metadata(&data).unwrap();
+ let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+
+ let test = TestReader {
+ data,
+ metadata: Arc::new(metadata),
+ requests: Default::default(),
+ };
+ let requests = test.requests.clone();
+
+ let a_filter = ArrowPredicateFn::new(
+ ProjectionMask::leaves(&parquet_schema, vec![0]),
+ |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "b"),
+ );
+
+ let b_filter = ArrowPredicateFn::new(
+ ProjectionMask::leaves(&parquet_schema, vec![1]),
+ |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "4"),
+ );
+
+ let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
+
+ let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
+ let stream = ParquetRecordBatchStreamBuilder::new(test)
+ .await
+ .unwrap()
+ .with_projection(mask.clone())
+ .with_batch_size(1024)
+ .with_row_filter(filter)
+ .build()
+ .unwrap();
+
+ let batches: Vec<_> = stream.try_collect().await.unwrap();
+ assert_eq!(batches.len(), 1);
+
+ let batch = &batches[0];
+ assert_eq!(batch.num_rows(), 1);
+ assert_eq!(batch.num_columns(), 2);
+
+ let col = batch.column(0);
+ let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
+ assert_eq!(val, "b");
+
+ let col = batch.column(1);
+ let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
+ assert_eq!(val, 3);
+
+ // Should only have made 3 requests
+ assert_eq!(requests.lock().unwrap().len(), 3);
+ }
}