Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/11 20:01:01 UTC

[arrow-rs] branch master updated: Add Parquet RowFilter API (#2335)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 21ba02eff Add Parquet RowFilter API (#2335)
21ba02eff is described below

commit 21ba02eff1ce356fc06c2949afe2ad4c84553ce9
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Aug 11 21:00:56 2022 +0100

    Add Parquet RowFilter API (#2335)
    
    * Add RowFilter API
    
    * Review feedback
    
    * Fix doc
    
    * Fix handling of NULL boolean array
    
    * Add tests, fix bugs
    
    * Fix clippy
    
    * Review feedback
    
    * Fix doc
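    
    For illustration, a minimal sketch of the intended usage once this
    API is made public (#1792); `parquet_schema` is assumed to be the
    file's SchemaDescriptor:
    
        use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
        use parquet::arrow::ProjectionMask;
    
        // Keep only rows where the first leaf column equals "b"
        let predicate = ArrowPredicateFn::new(
            ProjectionMask::leaves(&parquet_schema, vec![0]),
            |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "b"),
        );
        let filter = RowFilter::new(vec![Box::new(predicate)]);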
---
 parquet/src/arrow/array_reader/builder.rs          |  19 +-
 parquet/src/arrow/array_reader/list_array.rs       |  10 +-
 parquet/src/arrow/array_reader/mod.rs              |   6 +-
 parquet/src/arrow/arrow_reader/filter.rs           | 109 ++++++
 .../arrow/{arrow_reader.rs => arrow_reader/mod.rs} | 135 ++++---
 parquet/src/arrow/arrow_reader/selection.rs        | 426 +++++++++++++++++++++
 parquet/src/arrow/async_reader.rs                  | 401 ++++++++++++++-----
 7 files changed, 910 insertions(+), 196 deletions(-)

diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 32ffaeb9d..e389158a1 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -39,20 +39,18 @@ use crate::data_type::{
     Int64Type, Int96Type,
 };
 use crate::errors::Result;
-use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type};
+use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
 
 /// Create an array reader from an arrow schema, projection mask, and row group collection.
 pub fn build_array_reader(
-    parquet_schema: SchemaDescPtr,
     arrow_schema: SchemaRef,
     mask: ProjectionMask,
-    row_groups: Box<dyn RowGroupCollection>,
+    row_groups: &dyn RowGroupCollection,
 ) -> Result<Box<dyn ArrayReader>> {
-    let field =
-        convert_schema(parquet_schema.as_ref(), mask, Some(arrow_schema.as_ref()))?;
+    let field = convert_schema(&row_groups.schema(), mask, Some(arrow_schema.as_ref()))?;
 
     match &field {
-        Some(field) => build_reader(field, row_groups.as_ref()),
+        Some(field) => build_reader(field, row_groups),
         None => Ok(make_empty_array_reader(row_groups.num_rows())),
     }
 }
@@ -333,13 +331,8 @@ mod tests {
         )
         .unwrap();
 
-        let array_reader = build_array_reader(
-            file_reader.metadata().file_metadata().schema_descr_ptr(),
-            Arc::new(arrow_schema),
-            mask,
-            Box::new(file_reader),
-        )
-        .unwrap();
+        let array_reader =
+            build_array_reader(Arc::new(arrow_schema), mask, &file_reader).unwrap();
 
         // Create arrow types
         let arrow_type = DataType::Struct(vec![Field::new(
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 2acd59dcc..d2fa94611 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -582,13 +582,9 @@ mod tests {
         let schema = file_metadata.schema_descr_ptr();
         let mask = ProjectionMask::leaves(&schema, vec![0]);
 
-        let mut array_reader = build_array_reader(
-            schema,
-            Arc::new(arrow_schema),
-            mask,
-            Box::new(file_reader),
-        )
-        .unwrap();
+        let mut array_reader =
+            build_array_reader(Arc::new(arrow_schema), mask, &file_reader)
+                .unwrap();
 
         let batch = array_reader.next_batch(100).unwrap();
         assert_eq!(batch.data_type(), array_reader.get_data_type());
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index d7665ef0f..54c45a336 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -100,7 +100,7 @@ pub trait ArrayReader: Send {
 /// A collection of row groups
 pub trait RowGroupCollection {
     /// Get schema of parquet file.
-    fn schema(&self) -> Result<SchemaDescPtr>;
+    fn schema(&self) -> SchemaDescPtr;
 
     /// Get the number of rows in this collection
     fn num_rows(&self) -> usize;
@@ -110,8 +110,8 @@ pub trait RowGroupCollection {
 }
 
 impl RowGroupCollection for Arc<dyn FileReader> {
-    fn schema(&self) -> Result<SchemaDescPtr> {
-        Ok(self.metadata().file_metadata().schema_descr_ptr())
+    fn schema(&self) -> SchemaDescPtr {
+        self.metadata().file_metadata().schema_descr_ptr()
     }
 
     fn num_rows(&self) -> usize {
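
With this change `RowGroupCollection::schema` is infallible and `build_array_reader`
borrows the collection rather than taking a `Box`. A caller-side sketch (illustrative,
not from this commit; assumes a `file_reader: Arc<dyn FileReader>`, an
`arrow_schema: SchemaRef`, and crate-internal access since these modules are private):

    let schema_descr = file_reader.schema(); // SchemaDescPtr, no Result wrapper
    let mask = ProjectionMask::leaves(&schema_descr, vec![0]);
    let array_reader = build_array_reader(arrow_schema, mask, &file_reader)?;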
diff --git a/parquet/src/arrow/arrow_reader/filter.rs b/parquet/src/arrow/arrow_reader/filter.rs
new file mode 100644
index 000000000..8945ccde4
--- /dev/null
+++ b/parquet/src/arrow/arrow_reader/filter.rs
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::ProjectionMask;
+use arrow::array::BooleanArray;
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+
+/// A predicate operating on [`RecordBatch`]
+pub trait ArrowPredicate: Send + 'static {
+    /// Returns the [`ProjectionMask`] that describes the columns required
+    /// to evaluate this predicate. All projected columns will be provided in the `batch`
+    /// passed to [`evaluate`](Self::evaluate)
+    fn projection(&self) -> &ProjectionMask;
+
+    /// Evaluate this predicate for the given [`RecordBatch`] containing the columns
+    /// identified by [`Self::projection`]
+    ///
+    /// Rows that are `true` in the returned [`BooleanArray`] will be returned by the
+    /// parquet reader, whereas rows that are `false` or `Null` will not be
+    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray>;
+}
+
+/// An [`ArrowPredicate`] created from an [`FnMut`]
+pub struct ArrowPredicateFn<F> {
+    f: F,
+    projection: ProjectionMask,
+}
+
+impl<F> ArrowPredicateFn<F>
+where
+    F: FnMut(RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static,
+{
+    /// Create a new [`ArrowPredicateFn`]. `f` will be passed batches
+    /// that contain the columns specified in `projection`,
+    /// and must return a [`BooleanArray`] describing which rows should
+    /// be passed along
+    pub fn new(projection: ProjectionMask, f: F) -> Self {
+        Self { f, projection }
+    }
+}
+
+impl<F> ArrowPredicate for ArrowPredicateFn<F>
+where
+    F: FnMut(RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static,
+{
+    fn projection(&self) -> &ProjectionMask {
+        &self.projection
+    }
+
+    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+        (self.f)(batch)
+    }
+}
+
+/// A [`RowFilter`] allows pushing down a filter predicate to skip IO and decode
+///
+/// This consists of a list of [`ArrowPredicate`] where only the rows that satisfy all
+/// of the predicates will be returned. Any [`RowSelection`] will be applied prior
+/// to the first predicate, and each predicate in turn will then be used to compute
+/// a more refined [`RowSelection`] to use when evaluating the subsequent predicates.
+///
+/// Once all predicates have been evaluated, the final [`RowSelection`] is applied
+/// to the top-level [`ProjectionMask`] to produce the final output [`RecordBatch`].
+///
+/// This design has a couple of implications:
+///
+/// * [`RowFilter`] can be used to skip entire pages, and thus IO, in addition to CPU decode overheads
+/// * Columns may be decoded multiple times if they appear in multiple [`ProjectionMask`]
+/// * IO will be deferred until needed by a [`ProjectionMask`]
+///
+/// As such there is a trade-off between a single large predicate and multiple smaller
+/// predicates, and the right choice will depend on the shape of the data. Whilst multiple
+/// smaller predicates may minimise the amount of data scanned/decoded, they may not be
+/// faster overall.
+///
+/// For example, if a predicate needs only a single column of data and filters out all but
+/// 1% of the rows, applying it as one of the early `ArrowPredicateFn` will likely
+/// significantly improve performance.
+///
+/// As a counter-example, if a predicate needs several columns of data to evaluate but
+/// retains 99% of the rows, it may be better not to filter the data within parquet and
+/// instead apply the filter after the [`RecordBatch`] has been fully decoded.
+///
+/// [`RowSelection`]: super::selection::RowSelection
+pub struct RowFilter {
+    /// A list of [`ArrowPredicate`]
+    pub(crate) predicates: Vec<Box<dyn ArrowPredicate>>,
+}
+
+impl RowFilter {
+    /// Create a new [`RowFilter`] from an array of [`ArrowPredicate`]
+    pub fn new(predicates: Vec<Box<dyn ArrowPredicate>>) -> Self {
+        Self { predicates }
+    }
+}
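
A sketch of composing multiple predicates (illustrative, not from this commit;
assumes a `schema_descr: SchemaDescPtr` with a string column at leaf 0 and an
Int32 column at leaf 2):

    use arrow::array::{BooleanArray, Int32Array};

    // Cheap, selective, single-column predicates are best placed first
    let p1 = ArrowPredicateFn::new(
        ProjectionMask::leaves(&schema_descr, vec![0]),
        |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "b"),
    );
    // A second predicate only sees rows that survived the first
    let p2 = ArrowPredicateFn::new(
        ProjectionMask::leaves(&schema_descr, vec![2]),
        |batch| {
            let c = batch.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
            Ok(BooleanArray::from_iter(c.iter().map(|v| v.map(|v| v > 3))))
        },
    );
    let filter = RowFilter::new(vec![Box::new(p1), Box::new(p2)]);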
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader/mod.rs
similarity index 96%
rename from parquet/src/arrow/arrow_reader.rs
rename to parquet/src/arrow/arrow_reader/mod.rs
index fd68f4bc0..e363919f6 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -21,6 +21,7 @@ use std::collections::VecDeque;
 use std::sync::Arc;
 
 use arrow::array::Array;
+use arrow::compute::prep_null_mask_filter;
 use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::{RecordBatch, RecordBatchReader};
@@ -36,6 +37,17 @@ use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
 use crate::file::serialized_reader::ReadOptionsBuilder;
 use crate::schema::types::SchemaDescriptor;
 
+#[allow(unused)]
+mod filter;
+#[allow(unused)]
+mod selection;
+
+// TODO: Make these public once stable (#1792)
+#[allow(unused_imports)]
+pub(crate) use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
+#[allow(unused_imports)]
+pub(crate) use selection::{RowSelection, RowSelector};
+
 /// Arrow reader API.
 /// With this API, users can get the arrow schema from a parquet file, and read parquet data
 /// into arrow arrays.
@@ -72,41 +84,10 @@ pub trait ArrowReader {
     ) -> Result<Self::RecordReader>;
 }
 
-/// [`RowSelection`] allows selecting or skipping a provided number of rows
-/// when scanning the parquet file
-#[derive(Debug, Clone, Copy)]
-pub(crate) struct RowSelection {
-    /// The number of rows
-    pub row_count: usize,
-
-    /// If true, skip `row_count` rows
-    pub skip: bool,
-}
-
-impl RowSelection {
-    /// Select `row_count` rows
-    #[allow(unused)]
-    pub fn select(row_count: usize) -> Self {
-        Self {
-            row_count,
-            skip: false,
-        }
-    }
-
-    /// Skip `row_count` rows
-    #[allow(unused)]
-    pub fn skip(row_count: usize) -> Self {
-        Self {
-            row_count,
-            skip: true,
-        }
-    }
-}
-
 #[derive(Debug, Clone, Default)]
 pub struct ArrowReaderOptions {
     skip_arrow_metadata: bool,
-    selection: Option<Vec<RowSelection>>,
+    selection: Option<RowSelection>,
 }
 
 impl ArrowReaderOptions {
@@ -130,12 +111,9 @@ impl ArrowReaderOptions {
 
     /// Scan rows from the parquet file according to the provided `selection`
     ///
-    /// TODO: Make public once row selection fully implemented (#1792)
+    /// TODO: Revisit this API, as [`Self`] is provided before the file metadata is available
     #[allow(unused)]
-    pub(crate) fn with_row_selection(
-        self,
-        selection: impl Into<Vec<RowSelection>>,
-    ) -> Self {
+    pub(crate) fn with_row_selection(self, selection: impl Into<RowSelection>) -> Self {
         Self {
             selection: Some(selection.into()),
             ..self
@@ -143,6 +121,9 @@ impl ArrowReaderOptions {
     }
 }
 
+/// An `ArrowReader` that can be used to synchronously read parquet data as [`RecordBatch`]
+///
+/// See [`crate::arrow::async_reader`] for an asynchronous interface
 pub struct ParquetFileArrowReader {
     file_reader: Arc<dyn FileReader>,
 
@@ -178,21 +159,13 @@ impl ArrowReader for ParquetFileArrowReader {
         mask: ProjectionMask,
         batch_size: usize,
     ) -> Result<ParquetRecordBatchReader> {
-        let array_reader = build_array_reader(
-            self.file_reader
-                .metadata()
-                .file_metadata()
-                .schema_descr_ptr(),
-            Arc::new(self.get_schema()?),
-            mask,
-            Box::new(self.file_reader.clone()),
-        )?;
+        let array_reader =
+            build_array_reader(Arc::new(self.get_schema()?), mask, &self.file_reader)?;
 
-        let selection = self.options.selection.clone().map(Into::into);
         Ok(ParquetRecordBatchReader::new(
             batch_size,
             array_reader,
-            selection,
+            self.options.selection.clone(),
         ))
     }
 }
@@ -279,11 +252,13 @@ impl ParquetFileArrowReader {
     }
 }
 
+/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
+/// read from a parquet data source
 pub struct ParquetRecordBatchReader {
     batch_size: usize,
     array_reader: Box<dyn ArrayReader>,
     schema: SchemaRef,
-    selection: Option<VecDeque<RowSelection>>,
+    selection: Option<VecDeque<RowSelector>>,
 }
 
 impl Iterator for ParquetRecordBatchReader {
@@ -319,7 +294,7 @@ impl Iterator for ParquetRecordBatchReader {
                         Some(remaining) if remaining != 0 => {
                             // If the page row count is less than batch_size we must set
                             // the batch size to the page row count to avoid an infinite loop
-                            selection.push_front(RowSelection::select(remaining));
+                            selection.push_front(RowSelector::select(remaining));
                             need_read
                         }
                         _ => front.row_count,
@@ -364,22 +339,13 @@ impl RecordBatchReader for ParquetRecordBatchReader {
 }
 
 impl ParquetRecordBatchReader {
-    pub fn try_new(
-        batch_size: usize,
-        array_reader: Box<dyn ArrayReader>,
-    ) -> Result<Self> {
-        Ok(Self::new(batch_size, array_reader, None))
-    }
-
     /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
     /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
     /// all rows will be returned
-    ///
-    /// TODO: Make public once row selection fully implemented (#1792)
     pub(crate) fn new(
         batch_size: usize,
         array_reader: Box<dyn ArrayReader>,
-        selection: Option<VecDeque<RowSelection>>,
+        selection: Option<RowSelection>,
     ) -> Self {
         let schema = match array_reader.get_data_type() {
             ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
@@ -390,11 +356,41 @@ impl ParquetRecordBatchReader {
             batch_size,
             array_reader,
             schema: Arc::new(schema),
-            selection,
+            selection: selection.map(Into::into),
         }
     }
 }
 
+/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
+/// the rows that match
+///
+/// If an `input_selection` is provided, the returned [`RowSelection`] will be
+/// the conjunction of it and the rows selected by `predicate`
+#[allow(unused)]
+pub(crate) fn evaluate_predicate(
+    batch_size: usize,
+    array_reader: Box<dyn ArrayReader>,
+    input_selection: Option<RowSelection>,
+    predicate: &mut dyn ArrowPredicate,
+) -> Result<RowSelection> {
+    let reader =
+        ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
+    let mut filters = vec![];
+    for maybe_batch in reader {
+        let filter = predicate.evaluate(maybe_batch?)?;
+        match filter.null_count() {
+            0 => filters.push(filter),
+            _ => filters.push(prep_null_mask_filter(&filter)),
+        };
+    }
+
+    let raw = RowSelection::from_filters(&filters);
+    Ok(match input_selection {
+        Some(selection) => selection.and_then(&raw),
+        None => raw,
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use bytes::Bytes;
@@ -417,7 +413,7 @@ mod tests {
 
     use crate::arrow::arrow_reader::{
         ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
-        ParquetRecordBatchReader, RowSelection,
+        ParquetRecordBatchReader, RowSelection, RowSelector,
     };
     use crate::arrow::buffer::converter::{
         Converter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter,
@@ -893,7 +889,7 @@ mod tests {
         /// Encoding
         encoding: Encoding,
        // Row selection and total selected row count
-        row_selections: Option<(Vec<RowSelection>, usize)>,
+        row_selections: Option<(RowSelection, usize)>,
     }
 
     impl Default for TestOptions {
@@ -1187,6 +1183,7 @@ mod tests {
             let mut without_skip_data = gen_expected_data::<T>(&def_levels, &values);
 
             let mut skip_data: Vec<Option<T::T>> = vec![];
+            let selections: VecDeque<RowSelector> = selections.into();
             for select in selections {
                 if select.skip {
                     without_skip_data.drain(0..select.row_count);
@@ -1763,12 +1760,12 @@ mod tests {
     /// a `batch_size` and `selection`
     fn get_expected_batches(
         column: &RecordBatch,
-        selection: &[RowSelection],
+        selection: &RowSelection,
         batch_size: usize,
     ) -> Vec<RecordBatch> {
         let mut expected_batches = vec![];
 
-        let mut selection: VecDeque<_> = selection.iter().cloned().collect();
+        let mut selection: VecDeque<_> = selection.clone().into();
         let mut row_offset = 0;
         let mut last_start = None;
         while row_offset < column.num_rows() && !selection.is_empty() {
@@ -1820,7 +1817,7 @@ mod tests {
         step_len: usize,
         total_len: usize,
         skip_first: bool,
-    ) -> (Vec<RowSelection>, usize) {
+    ) -> (RowSelection, usize) {
         let mut remaining = total_len;
         let mut skip = skip_first;
         let mut vec = vec![];
@@ -1831,7 +1828,7 @@ mod tests {
             } else {
                 remaining
             };
-            vec.push(RowSelection {
+            vec.push(RowSelector {
                 row_count: step,
                 skip,
             });
@@ -1841,7 +1838,7 @@ mod tests {
             }
             skip = !skip;
         }
-        (vec, selected_count)
+        (vec.into(), selected_count)
     }
 
     #[test]
@@ -1890,7 +1887,7 @@ mod tests {
         fn create_skip_reader(
             test_file: &File,
             batch_size: usize,
-            selections: Vec<RowSelection>,
+            selections: RowSelection,
         ) -> ParquetRecordBatchReader {
             let arrow_reader_options =
                 ArrowReaderOptions::new().with_row_selection(selections);
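
The null handling in `evaluate_predicate` above means a `NULL` predicate result
drops the row. A small sketch of the mask transformation it relies on
(illustrative, not from this commit):

    use arrow::array::BooleanArray;
    use arrow::compute::prep_null_mask_filter;

    let mask = BooleanArray::from(vec![Some(true), None, Some(false)]);
    // Nulls are folded to `false` before the RowSelection is built
    let prepped = prep_null_mask_filter(&mask);
    assert_eq!(prepped, BooleanArray::from(vec![true, false, false]));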
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
new file mode 100644
index 000000000..8e129f566
--- /dev/null
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -0,0 +1,426 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, BooleanArray};
+use arrow::compute::SlicesIterator;
+use std::cmp::Ordering;
+use std::collections::VecDeque;
+use std::ops::Range;
+
+/// [`RowSelector`] represents a range of rows to scan from a parquet file
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct RowSelector {
+    /// The number of rows
+    pub row_count: usize,
+
+    /// If true, skip `row_count` rows
+    pub skip: bool,
+}
+
+impl RowSelector {
+    /// Select `row_count` rows
+    pub fn select(row_count: usize) -> Self {
+        Self {
+            row_count,
+            skip: false,
+        }
+    }
+
+    /// Skip `row_count` rows
+    pub fn skip(row_count: usize) -> Self {
+        Self {
+            row_count,
+            skip: true,
+        }
+    }
+}
+
+/// [`RowSelection`] allows selecting or skipping a sequence of rows
+/// when scanning the parquet file.
+///
+/// This is applied prior to reading column data, and can therefore
+/// be used to skip the IO required to fetch that data into memory
+///
+/// A typical use-case would be using the [`PageIndex`] to filter out rows
+/// that don't satisfy a predicate
+///
+/// [`PageIndex`]: crate::file::page_index::index::PageIndex
+#[derive(Debug, Clone, Default, Eq, PartialEq)]
+pub struct RowSelection {
+    selectors: Vec<RowSelector>,
+}
+
+impl RowSelection {
+    /// Creates a [`RowSelection`] from a slice of [`BooleanArray`]
+    ///
+    /// # Panics
+    ///
+    /// Panics if any of the [`BooleanArray`] contain nulls
+    pub fn from_filters(filters: &[BooleanArray]) -> Self {
+        let mut next_offset = 0;
+        let total_rows = filters.iter().map(|x| x.len()).sum();
+
+        let iter = filters.iter().flat_map(|filter| {
+            let offset = next_offset;
+            next_offset += filter.len();
+            assert_eq!(filter.null_count(), 0);
+            SlicesIterator::new(filter)
+                .map(move |(start, end)| start + offset..end + offset)
+        });
+
+        Self::from_consecutive_ranges(iter, total_rows)
+    }
+
+    /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep
+    fn from_consecutive_ranges<I: Iterator<Item = Range<usize>>>(
+        ranges: I,
+        total_rows: usize,
+    ) -> Self {
+        let mut selectors: Vec<RowSelector> = Vec::with_capacity(ranges.size_hint().0);
+        let mut last_end = 0;
+        for range in ranges {
+            let len = range.end - range.start;
+
+            match range.start.cmp(&last_end) {
+                Ordering::Equal => match selectors.last_mut() {
+                    Some(last) => last.row_count += len,
+                    None => selectors.push(RowSelector::select(len)),
+                },
+                Ordering::Greater => {
+                    selectors.push(RowSelector::skip(range.start - last_end));
+                    selectors.push(RowSelector::select(len))
+                }
+                Ordering::Less => panic!("out of order"),
+            }
+            last_end = range.end;
+        }
+
+        if last_end != total_rows {
+            selectors.push(RowSelector::skip(total_rows - last_end))
+        }
+
+        Self { selectors }
+    }
+
+    /// Splits off the first `row_count` from this [`RowSelection`]
+    pub fn split_off(&mut self, row_count: usize) -> Self {
+        let mut total_count = 0;
+
+        // Find the index where the cumulative row count exceeds `row_count`
+        let find = self.selectors.iter().enumerate().find(|(_, selector)| {
+            total_count += selector.row_count;
+            total_count > row_count
+        });
+
+        let split_idx = match find {
+            Some((idx, _)) => idx,
+            None => {
+                let selectors = std::mem::take(&mut self.selectors);
+                return Self { selectors };
+            }
+        };
+
+        let mut remaining = self.selectors.split_off(split_idx);
+
+        // Always present as `split_idx < self.selectors.len()`
+        let next = remaining.first_mut().unwrap();
+        let overflow = total_count - row_count;
+
+        if next.row_count != overflow {
+            self.selectors.push(RowSelector {
+                row_count: next.row_count - overflow,
+                skip: next.skip,
+            })
+        }
+        next.row_count = overflow;
+
+        std::mem::swap(&mut remaining, &mut self.selectors);
+        Self {
+            selectors: remaining,
+        }
+    }
+
+    /// Given a [`RowSelection`] computed under `self`, returns the [`RowSelection`]
+    /// representing their conjunction
+    ///
+    /// For example:
+    ///
+    /// ```text
+    /// self:     NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
+    /// other:                YYYYYNNNNYYYYYYYYYYYYY   YYNNN
+    ///
+    /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYYNNYNNNN
+    /// ```
+    pub fn and_then(&self, other: &Self) -> Self {
+        let mut selectors = vec![];
+        let mut first = self.selectors.iter().cloned().peekable();
+        let mut second = other.selectors.iter().cloned().peekable();
+
+        let mut to_skip = 0;
+        while let Some(b) = second.peek_mut() {
+            let a = first.peek_mut().unwrap();
+
+            if b.row_count == 0 {
+                second.next().unwrap();
+                continue;
+            }
+
+            if a.row_count == 0 {
+                first.next().unwrap();
+                continue;
+            }
+
+            if a.skip {
+                // Records were skipped when producing second
+                to_skip += a.row_count;
+                first.next().unwrap();
+                continue;
+            }
+
+            let skip = b.skip;
+            let to_process = a.row_count.min(b.row_count);
+
+            a.row_count -= to_process;
+            b.row_count -= to_process;
+
+            match skip {
+                true => to_skip += to_process,
+                false => {
+                    if to_skip != 0 {
+                        selectors.push(RowSelector::skip(to_skip));
+                        to_skip = 0;
+                    }
+                    selectors.push(RowSelector::select(to_process))
+                }
+            }
+        }
+
+        for v in first {
+            if v.row_count != 0 {
+                assert!(v.skip);
+                to_skip += v.row_count
+            }
+        }
+
+        if to_skip != 0 {
+            selectors.push(RowSelector::skip(to_skip));
+        }
+
+        Self { selectors }
+    }
+
+    /// Returns `true` if this [`RowSelection`] selects any rows
+    pub fn selects_any(&self) -> bool {
+        self.selectors.iter().any(|x| !x.skip)
+    }
+}
+
+impl From<Vec<RowSelector>> for RowSelection {
+    fn from(selectors: Vec<RowSelector>) -> Self {
+        Self { selectors }
+    }
+}
+
+impl From<RowSelection> for VecDeque<RowSelector> {
+    fn from(r: RowSelection) -> Self {
+        r.selectors.into()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use rand::{thread_rng, Rng};
+
+    #[test]
+    fn test_from_filters() {
+        let filters = vec![
+            BooleanArray::from(vec![false, false, false, true, true, true, true]),
+            BooleanArray::from(vec![true, true, false, false, true, true, true]),
+            BooleanArray::from(vec![false, false, false, false]),
+            BooleanArray::from(Vec::<bool>::new()),
+        ];
+
+        let selection = RowSelection::from_filters(&filters[..1]);
+        assert!(selection.selects_any());
+        assert_eq!(
+            selection.selectors,
+            vec![RowSelector::skip(3), RowSelector::select(4)]
+        );
+
+        let selection = RowSelection::from_filters(&filters[..2]);
+        assert!(selection.selects_any());
+        assert_eq!(
+            selection.selectors,
+            vec![
+                RowSelector::skip(3),
+                RowSelector::select(6),
+                RowSelector::skip(2),
+                RowSelector::select(3)
+            ]
+        );
+
+        let selection = RowSelection::from_filters(&filters);
+        assert!(selection.selects_any());
+        assert_eq!(
+            selection.selectors,
+            vec![
+                RowSelector::skip(3),
+                RowSelector::select(6),
+                RowSelector::skip(2),
+                RowSelector::select(3),
+                RowSelector::skip(4)
+            ]
+        );
+
+        let selection = RowSelection::from_filters(&filters[2..3]);
+        assert!(!selection.selects_any());
+        assert_eq!(selection.selectors, vec![RowSelector::skip(4)]);
+    }
+
+    #[test]
+    fn test_split_off() {
+        let mut selection = RowSelection::from(vec![
+            RowSelector::skip(34),
+            RowSelector::select(12),
+            RowSelector::skip(3),
+            RowSelector::select(35),
+        ]);
+
+        let split = selection.split_off(34);
+        assert_eq!(split.selectors, vec![RowSelector::skip(34)]);
+        assert_eq!(
+            selection.selectors,
+            vec![
+                RowSelector::select(12),
+                RowSelector::skip(3),
+                RowSelector::select(35)
+            ]
+        );
+
+        let split = selection.split_off(5);
+        assert_eq!(split.selectors, vec![RowSelector::select(5)]);
+        assert_eq!(
+            selection.selectors,
+            vec![
+                RowSelector::select(7),
+                RowSelector::skip(3),
+                RowSelector::select(35)
+            ]
+        );
+
+        let split = selection.split_off(8);
+        assert_eq!(
+            split.selectors,
+            vec![RowSelector::select(7), RowSelector::skip(1)]
+        );
+        assert_eq!(
+            selection.selectors,
+            vec![RowSelector::skip(2), RowSelector::select(35)]
+        );
+
+        let split = selection.split_off(200);
+        assert_eq!(
+            split.selectors,
+            vec![RowSelector::skip(2), RowSelector::select(35)]
+        );
+        assert!(selection.selectors.is_empty());
+    }
+
+    #[test]
+    fn test_and() {
+        let mut a = RowSelection::from(vec![
+            RowSelector::skip(12),
+            RowSelector::select(23),
+            RowSelector::skip(3),
+            RowSelector::select(5),
+        ]);
+
+        let b = RowSelection::from(vec![
+            RowSelector::select(5),
+            RowSelector::skip(4),
+            RowSelector::select(15),
+            RowSelector::skip(4),
+        ]);
+
+        let mut expected = RowSelection::from(vec![
+            RowSelector::skip(12),
+            RowSelector::select(5),
+            RowSelector::skip(4),
+            RowSelector::select(14),
+            RowSelector::skip(3),
+            RowSelector::select(1),
+            RowSelector::skip(4),
+        ]);
+
+        assert_eq!(a.and_then(&b), expected);
+
+        a.split_off(7);
+        expected.split_off(7);
+        assert_eq!(a.and_then(&b), expected);
+
+        let a = RowSelection::from(vec![RowSelector::select(5), RowSelector::skip(3)]);
+
+        let b = RowSelection::from(vec![
+            RowSelector::select(2),
+            RowSelector::skip(1),
+            RowSelector::select(1),
+            RowSelector::skip(1),
+        ]);
+
+        assert_eq!(
+            a.and_then(&b).selectors,
+            vec![
+                RowSelector::select(2),
+                RowSelector::skip(1),
+                RowSelector::select(1),
+                RowSelector::skip(4)
+            ]
+        );
+    }
+
+    #[test]
+    fn test_and_fuzz() {
+        let mut rand = thread_rng();
+        for _ in 0..100 {
+            let a_len = rand.gen_range(10..100);
+            let a_bools: Vec<_> = (0..a_len).map(|_| rand.gen_bool(0.2)).collect();
+            let a = RowSelection::from_filters(&[BooleanArray::from(a_bools.clone())]);
+
+            let b_len: usize = a_bools.iter().map(|x| *x as usize).sum();
+            let b_bools: Vec<_> = (0..b_len).map(|_| rand.gen_bool(0.8)).collect();
+            let b = RowSelection::from_filters(&[BooleanArray::from(b_bools.clone())]);
+
+            let mut expected_bools = vec![false; a_len];
+
+            let mut iter_b = b_bools.iter();
+            for (idx, b) in a_bools.iter().enumerate() {
+                if *b && *iter_b.next().unwrap() {
+                    expected_bools[idx] = true;
+                }
+            }
+
+            let expected =
+                RowSelection::from_filters(&[BooleanArray::from(expected_bools)]);
+
+            let total_rows: usize = expected.selectors.iter().map(|s| s.row_count).sum();
+            assert_eq!(a_len, total_rows);
+
+            assert_eq!(a.and_then(&b), expected);
+        }
+    }
+}
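
A worked example of the above (illustrative, not from this commit): `a` selects
rows from the file, and `b` is computed over only the rows `a` selected:

    use arrow::array::BooleanArray;

    // [t, t, f, t] => select 2, skip 1, select 1
    let a = RowSelection::from_filters(&[BooleanArray::from(vec![true, true, false, true])]);
    // Evaluated over the 3 rows `a` selected
    let b = RowSelection::from_filters(&[BooleanArray::from(vec![false, true, true])]);
    // Conjunction in file coordinates: skip 1, select 1, skip 1, select 1
    let expected = RowSelection::from(vec![
        RowSelector::skip(1),
        RowSelector::select(1),
        RowSelector::skip(1),
        RowSelector::select(1),
    ]);
    assert_eq!(a.and_then(&b), expected);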
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 3770ed265..5c186d7aa 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -86,6 +86,7 @@ use std::task::{Context, Poll};
 
 use bytes::Bytes;
 use futures::future::{BoxFuture, FutureExt};
+use futures::ready;
 use futures::stream::Stream;
 use parquet_format::{PageHeader, PageType};
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
@@ -94,7 +95,9 @@ use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 
 use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
-use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+use crate::arrow::arrow_reader::{
+    evaluate_predicate, ParquetRecordBatchReader, RowFilter, RowSelection,
+};
 use crate::arrow::schema::parquet_to_arrow_schema;
 use crate::arrow::ProjectionMask;
 use crate::basic::Compression;
@@ -102,7 +105,7 @@ use crate::column::page::{Page, PageIterator, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
-use crate::file::metadata::ParquetMetaData;
+use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use crate::file::serialized_reader::{decode_page, read_page_header};
 use crate::file::FOOTER_SIZE;
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
@@ -209,9 +212,13 @@ pub struct ParquetRecordBatchStreamBuilder<T> {
     row_groups: Option<Vec<usize>>,
 
     projection: ProjectionMask,
+
+    filter: Option<RowFilter>,
+
+    selection: Option<RowSelection>,
 }
 
-impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
+impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
     pub async fn new(mut input: T) -> Result<Self> {
         let metadata = input.get_metadata().await?;
@@ -228,6 +235,8 @@ impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
             batch_size: 1024,
             row_groups: None,
             projection: ProjectionMask::all(),
+            filter: None,
+            selection: None,
         })
     }
 
@@ -267,6 +276,32 @@ impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
         }
     }
 
+    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
+    /// data into memory
+    ///
+    /// Row group filtering is applied prior to this, and rows from skipped
+    /// row groups should not be included in the [`RowSelection`]
+    ///
+    /// TODO: Make public once stable (#1792)
+    #[allow(unused)]
+    pub(crate) fn with_row_selection(self, selection: RowSelection) -> Self {
+        Self {
+            selection: Some(selection),
+            ..self
+        }
+    }
+
+    /// Provide a [`RowFilter`] to skip decoding rows
+    ///
+    /// TODO: Make public once stable (#1792)
+    #[allow(unused)]
+    pub(crate) fn with_row_filter(self, filter: RowFilter) -> Self {
+        Self {
+            filter: Some(filter),
+            ..self
+        }
+    }
+
     /// Build a new [`ParquetRecordBatchStream`]
     pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
         let num_row_groups = self.metadata.row_groups().len();
@@ -285,25 +320,122 @@ impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
             None => (0..self.metadata.row_groups().len()).collect(),
         };
 
+        let reader = ReaderFactory {
+            input: self.input,
+            filter: self.filter,
+            metadata: self.metadata.clone(),
+            schema: self.schema.clone(),
+        };
+
         Ok(ParquetRecordBatchStream {
+            metadata: self.metadata,
+            batch_size: self.batch_size,
             row_groups,
             projection: self.projection,
-            batch_size: self.batch_size,
-            metadata: self.metadata,
+            selection: self.selection,
             schema: self.schema,
-            input: Some(self.input),
+            reader: Some(reader),
             state: StreamState::Init,
         })
     }
 }
 
+type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
+
+/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
+/// [`ParquetRecordBatchReader`]
+struct ReaderFactory<T> {
+    metadata: Arc<ParquetMetaData>,
+
+    schema: SchemaRef,
+
+    input: T,
+
+    filter: Option<RowFilter>,
+}
+
+impl<T> ReaderFactory<T>
+where
+    T: AsyncFileReader + Send,
+{
+    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
+    ///
+    /// Note: this captures self so that the resulting future has a static lifetime
+    async fn read_row_group(
+        mut self,
+        row_group_idx: usize,
+        mut selection: Option<RowSelection>,
+        projection: ProjectionMask,
+        batch_size: usize,
+    ) -> ReadResult<T> {
+        // TODO: calling build_array_reader multiple times is wasteful
+        let selects_any = |selection: Option<&RowSelection>| {
+            selection.map(|x| x.selects_any()).unwrap_or(true)
+        };
+
+        let meta = self.metadata.row_group(row_group_idx);
+        let mut row_group = InMemoryRowGroup {
+            schema: meta.schema_descr_ptr(),
+            row_count: meta.num_rows() as usize,
+            column_chunks: vec![None; meta.columns().len()],
+        };
+
+        if let Some(filter) = self.filter.as_mut() {
+            for predicate in filter.predicates.iter_mut() {
+                if !selects_any(selection.as_ref()) {
+                    return Ok((self, None));
+                }
+
+                let predicate_projection = predicate.projection().clone();
+                row_group
+                    .fetch(
+                        &mut self.input,
+                        meta,
+                        &predicate_projection,
+                        selection.as_ref(),
+                    )
+                    .await?;
+
+                let array_reader = build_array_reader(
+                    self.schema.clone(),
+                    predicate_projection,
+                    &row_group,
+                )?;
+
+                selection = Some(evaluate_predicate(
+                    batch_size,
+                    array_reader,
+                    selection,
+                    predicate.as_mut(),
+                )?);
+            }
+        }
+
+        if !selects_any(selection.as_ref()) {
+            return Ok((self, None));
+        }
+
+        row_group
+            .fetch(&mut self.input, meta, &projection, selection.as_ref())
+            .await?;
+
+        let reader = ParquetRecordBatchReader::new(
+            batch_size,
+            build_array_reader(self.schema.clone(), projection, &row_group)?,
+            selection,
+        );
+
+        Ok((self, Some(reader)))
+    }
+}
+
 enum StreamState<T> {
     /// At the start of a new row group, or the end of the parquet stream
     Init,
     /// Decoding a batch
     Decoding(ParquetRecordBatchReader),
     /// Reading data from input
-    Reading(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>),
+    Reading(BoxFuture<'static, ReadResult<T>>),
     /// Error
     Error,
 }
@@ -319,20 +451,23 @@ impl<T> std::fmt::Debug for StreamState<T> {
     }
 }
 
-/// An asynchronous [`Stream`] of [`RecordBatch`] for a parquet file
+/// An asynchronous [`Stream`] of [`RecordBatch`] for a parquet file that can be
+/// constructed using [`ParquetRecordBatchStreamBuilder`]
 pub struct ParquetRecordBatchStream<T> {
     metadata: Arc<ParquetMetaData>,
 
     schema: SchemaRef,
 
-    batch_size: usize,
+    row_groups: VecDeque<usize>,
 
     projection: ProjectionMask,
 
-    row_groups: VecDeque<usize>,
+    batch_size: usize,
+
+    selection: Option<RowSelection>,
 
     /// This is an option so it can be moved into a future
-    input: Option<T>,
+    reader: Option<ReaderFactory<T>>,
 
     state: StreamState<T>,
 }
@@ -384,101 +519,40 @@ where
                         None => return Poll::Ready(None),
                     };
 
-                    let metadata = self.metadata.clone();
-                    let mut input = match self.input.take() {
-                        Some(input) => input,
-                        None => {
-                            self.state = StreamState::Error;
-                            return Poll::Ready(Some(Err(general_err!(
-                                "input stream lost"
-                            ))));
-                        }
-                    };
+                    let reader = self.reader.take().expect("lost reader");
 
-                    let projection = self.projection.clone();
-                    self.state = StreamState::Reading(
-                        async move {
-                            let row_group_metadata = metadata.row_group(row_group_idx);
-                            let mut column_chunks =
-                                vec![None; row_group_metadata.columns().len()];
-
-                            // TODO: Combine consecutive ranges
-                            let fetch_ranges = (0..column_chunks.len())
-                                .into_iter()
-                                .filter_map(|idx| {
-                                    if !projection.leaf_included(idx) {
-                                        None
-                                    } else {
-                                        let column = row_group_metadata.column(idx);
-                                        let (start, length) = column.byte_range();
-
-                                        Some(start as usize..(start + length) as usize)
-                                    }
-                                })
-                                .collect();
-
-                            let mut chunk_data =
-                                input.get_byte_ranges(fetch_ranges).await?.into_iter();
-
-                            for (idx, chunk) in column_chunks.iter_mut().enumerate() {
-                                if !projection.leaf_included(idx) {
-                                    continue;
-                                }
-
-                                let column = row_group_metadata.column(idx);
-
-                                if let Some(data) = chunk_data.next() {
-                                    *chunk = Some(InMemoryColumnChunk {
-                                        num_values: column.num_values(),
-                                        compression: column.compression(),
-                                        physical_type: column.column_type(),
-                                        data,
-                                    });
-                                }
-                            }
-
-                            Ok((
-                                input,
-                                InMemoryRowGroup {
-                                    schema: metadata.file_metadata().schema_descr_ptr(),
-                                    row_count: row_group_metadata.num_rows() as usize,
-                                    column_chunks,
-                                },
-                            ))
-                        }
-                        .boxed(),
-                    )
-                }
-                StreamState::Reading(f) => {
-                    let result = futures::ready!(f.poll_unpin(cx));
-                    self.state = StreamState::Init;
-
-                    let row_group: Box<dyn RowGroupCollection> = match result {
-                        Ok((input, row_group)) => {
-                            self.input = Some(input);
-                            Box::new(row_group)
-                        }
-                        Err(e) => {
-                            self.state = StreamState::Error;
-                            return Poll::Ready(Some(Err(e)));
-                        }
-                    };
+                    let row_count =
+                        self.metadata.row_group(row_group_idx).num_rows() as usize;
 
-                    let parquet_schema = self.metadata.file_metadata().schema_descr_ptr();
+                    let selection =
+                        self.selection.as_mut().map(|s| s.split_off(row_count));
 
-                    let array_reader = build_array_reader(
-                        parquet_schema,
-                        self.schema.clone(),
-                        self.projection.clone(),
-                        row_group,
-                    )?;
-
-                    let batch_reader =
-                        ParquetRecordBatchReader::try_new(self.batch_size, array_reader)
-                            .expect("reader");
+                    let fut = reader
+                        .read_row_group(
+                            row_group_idx,
+                            selection,
+                            self.projection.clone(),
+                            self.batch_size,
+                        )
+                        .boxed();
 
-                    self.state = StreamState::Decoding(batch_reader)
+                    self.state = StreamState::Reading(fut)
                 }
+                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
+                    Ok((reader_factory, maybe_reader)) => {
+                        self.reader = Some(reader_factory);
+                        match maybe_reader {
+                            // Read records from [`ParquetRecordBatchReader`]
+                            Some(reader) => self.state = StreamState::Decoding(reader),
+                            // All rows skipped, read next row group
+                            None => self.state = StreamState::Init,
+                        }
+                    }
+                    Err(e) => {
+                        self.state = StreamState::Error;
+                        return Poll::Ready(Some(Err(e)));
+                    }
+                },
                 StreamState::Error => return Poll::Pending,
             }
         }
@@ -492,9 +566,56 @@ struct InMemoryRowGroup {
     row_count: usize,
 }
 
+impl InMemoryRowGroup {
+    /// Fetches the necessary column data into memory
+    async fn fetch<T: AsyncFileReader + Send>(
+        &mut self,
+        input: &mut T,
+        metadata: &RowGroupMetaData,
+        projection: &ProjectionMask,
+        _selection: Option<&RowSelection>,
+    ) -> Result<()> {
+        // TODO: Use OffsetIndex and selection to prune pages
+
+        let fetch_ranges = self
+            .column_chunks
+            .iter()
+            .enumerate()
+            .filter_map(|(idx, chunk)| {
+                (chunk.is_none() && projection.leaf_included(idx)).then(|| {
+                    let column = metadata.column(idx);
+                    let (start, length) = column.byte_range();
+                    start as usize..(start + length) as usize
+                })
+            })
+            .collect();
+
+        let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+
+        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+            if chunk.is_some() || !projection.leaf_included(idx) {
+                continue;
+            }
+
+            let column = metadata.column(idx);
+
+            if let Some(data) = chunk_data.next() {
+                *chunk = Some(InMemoryColumnChunk {
+                    num_values: column.num_values(),
+                    compression: column.compression(),
+                    physical_type: column.column_type(),
+                    data,
+                });
+            }
+        }
+        Ok(())
+    }
+}
+
 impl RowGroupCollection for InMemoryRowGroup {
-    fn schema(&self) -> Result<SchemaDescPtr> {
-        Ok(self.schema.clone())
+    fn schema(&self) -> SchemaDescPtr {
+        self.schema.clone()
     }
 
     fn num_rows(&self) -> usize {
@@ -685,7 +806,10 @@ impl PageIterator for ColumnChunkIterator {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::arrow::{ArrowReader, ParquetFileArrowReader};
+    use crate::arrow::arrow_reader::ArrowPredicateFn;
+    use crate::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader};
+    use crate::file::footer::parse_metadata;
+    use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
     use arrow::error::Result as ArrowResult;
     use futures::TryStreamExt;
     use std::sync::Mutex;
@@ -858,4 +982,73 @@ mod tests {
         assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
         assert_eq!(second_page.num_values(), 8);
     }
+
+    #[tokio::test]
+    async fn test_row_filter() {
+        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
+        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
+        let c = Int32Array::from_iter(0..6);
+        let data = RecordBatch::try_from_iter([
+            ("a", Arc::new(a) as ArrayRef),
+            ("b", Arc::new(b) as ArrayRef),
+            ("c", Arc::new(c) as ArrayRef),
+        ])
+        .unwrap();
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
+        writer.write(&data).unwrap();
+        writer.close().unwrap();
+
+        let data: Bytes = buf.into();
+        let metadata = parse_metadata(&data).unwrap();
+        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+
+        let test = TestReader {
+            data,
+            metadata: Arc::new(metadata),
+            requests: Default::default(),
+        };
+        let requests = test.requests.clone();
+
+        let a_filter = ArrowPredicateFn::new(
+            ProjectionMask::leaves(&parquet_schema, vec![0]),
+            |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "b"),
+        );
+
+        let b_filter = ArrowPredicateFn::new(
+            ProjectionMask::leaves(&parquet_schema, vec![1]),
+            |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "4"),
+        );
+
+        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
+
+        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
+        let stream = ParquetRecordBatchStreamBuilder::new(test)
+            .await
+            .unwrap()
+            .with_projection(mask.clone())
+            .with_batch_size(1024)
+            .with_row_filter(filter)
+            .build()
+            .unwrap();
+
+        let batches: Vec<_> = stream.try_collect().await.unwrap();
+        assert_eq!(batches.len(), 1);
+
+        let batch = &batches[0];
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 2);
+
+        let col = batch.column(0);
+        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
+        assert_eq!(val, "b");
+
+        let col = batch.column(1);
+        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
+        assert_eq!(val, 3);
+
+        // Should only have made 3 requests
+        assert_eq!(requests.lock().unwrap().len(), 3);
+    }
 }
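
As the stream advances between row groups, `poll_next` carves the global
`RowSelection` into per-row-group selections using `split_off`. A sketch of that
bookkeeping (illustrative, not from this commit):

    // A selection spanning a first row group of 100 rows and a second of 50
    let mut selection = RowSelection::from(vec![
        RowSelector::select(50),
        RowSelector::skip(50),
        RowSelector::select(50),
    ]);

    // Rows for the first row group: select 50, skip 50
    let first = selection.split_off(100);
    // `selection` now holds the remainder for the second row group: select 50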