You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/15 16:44:37 UTC

[arrow-rs] branch master updated: Support RowFilter within ParquetRecordBatchReader (#2431) (#2452)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 569c78129 Support RowFilter within ParquetRecordBatchReader (#2431) (#2452)
569c78129 is described below

commit 569c781290dc12a8e2d86069df3827225a53d563
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Aug 15 17:44:33 2022 +0100

    Support RowFilter within ParquetRecordBatchReader (#2431) (#2452)
    
    * Support RowFilter within ParquetRecordBatchReader (#2431)
    
    * Fix doc
    
    * Review feedback
---
 parquet/src/arrow/arrow_reader/mod.rs | 143 ++++++++++++++++++++++++++++++----
 parquet/src/arrow/async_reader.rs     |   7 +-
 2 files changed, 129 insertions(+), 21 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index e96b5d8fa..052ef40ee 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -400,22 +400,46 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
         Self::new_builder(SyncReader(reader), metadata, options)
     }
 
+    /// Build a [`ParquetRecordBatchReader`]
+    ///
+    /// Note: this will eagerly evaluate any `RowFilter` before returning
     pub fn build(self) -> Result<ParquetRecordBatchReader> {
         let reader =
             FileReaderRowGroupCollection::new(Arc::new(self.input.0), self.row_groups);
+
+        let mut filter = self.filter;
+        let mut selection = self.selection;
+
+        if let Some(filter) = filter.as_mut() {
+            for predicate in filter.predicates.iter_mut() {
+                if !selects_any(selection.as_ref()) {
+                    break;
+                }
+
+                let projection = predicate.projection().clone();
+                let array_reader =
+                    build_array_reader(Arc::clone(&self.schema), projection, &reader)?;
+
+                selection = Some(evaluate_predicate(
+                    self.batch_size,
+                    array_reader,
+                    selection,
+                    predicate.as_mut(),
+                )?);
+            }
+        }
+
         let array_reader = build_array_reader(self.schema, self.projection, &reader)?;
 
-        if self.filter.is_some() {
-            // TODO: Support RowFilter within sync interface (#2431)
-            return Err(nyi_err!(
-                "RowFilter is currently not supported within the sync interface"
-            ));
+        // If selection is empty, truncate
+        if !selects_any(selection.as_ref()) {
+            selection = Some(RowSelection::from(vec![]));
         }
 
         Ok(ParquetRecordBatchReader::new(
             self.batch_size,
             array_reader,
-            self.selection,
+            selection,
         ))
     }
 }
@@ -541,12 +565,16 @@ impl ParquetRecordBatchReader {
     }
 }
 
+/// Returns `true` if `selection` is `None` or selects some rows
+pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
+    selection.map(|x| x.selects_any()).unwrap_or(true)
+}
+
 /// Evaluates an [`ArrowPredicate`] returning the [`RowSelection`]
 ///
 /// If this [`ParquetRecordBatchReader`] has a [`RowSelection`], the
 /// returned [`RowSelection`] will be the conjunction of this and
 /// the rows selected by `predicate`
-#[allow(unused)]
 pub(crate) fn evaluate_predicate(
     batch_size: usize,
     array_reader: Box<dyn ArrayReader>,
@@ -576,6 +604,7 @@ mod tests {
     use bytes::Bytes;
     use std::cmp::min;
     use std::collections::VecDeque;
+    use std::fmt::Formatter;
     use std::fs::File;
     use std::io::Seek;
     use std::path::PathBuf;
@@ -591,8 +620,8 @@ mod tests {
     use arrow::record_batch::{RecordBatch, RecordBatchReader};
 
     use crate::arrow::arrow_reader::{
-        ArrowReaderOptions, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder,
-        RowSelection, RowSelector,
+        ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReader,
+        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
     };
     use crate::arrow::buffer::converter::{
         Converter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter,
@@ -1021,7 +1050,7 @@ mod tests {
     }
 
     /// Parameters for single_column_reader_test
-    #[derive(Debug, Clone)]
+    #[derive(Clone)]
     struct TestOptions {
         /// Number of row group to write to parquet (row group size =
         /// num_row_groups / num_rows)
@@ -1047,8 +1076,30 @@ mod tests {
         enabled_statistics: EnabledStatistics,
         /// Encoding
         encoding: Encoding,
-        //row selections and total selected row count
+        /// row selections and total selected row count
         row_selections: Option<(RowSelection, usize)>,
+        /// row filter
+        row_filter: Option<Vec<bool>>,
+    }
+
+    /// Manually implement this to avoid printing entire contents of row_selections and row_filter
+    impl std::fmt::Debug for TestOptions {
+        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+            f.debug_struct("TestOptions")
+                .field("num_row_groups", &self.num_row_groups)
+                .field("num_rows", &self.num_rows)
+                .field("record_batch_size", &self.record_batch_size)
+                .field("null_percent", &self.null_percent)
+                .field("write_batch_size", &self.write_batch_size)
+                .field("max_data_page_size", &self.max_data_page_size)
+                .field("max_dict_page_size", &self.max_dict_page_size)
+                .field("writer_version", &self.writer_version)
+                .field("enabled_statistics", &self.enabled_statistics)
+                .field("encoding", &self.encoding)
+                .field("row_selections", &self.row_selections.is_some())
+                .field("row_filter", &self.row_filter.is_some())
+                .finish()
+        }
     }
 
     impl Default for TestOptions {
@@ -1065,6 +1116,7 @@ mod tests {
                 enabled_statistics: EnabledStatistics::Page,
                 encoding: Encoding::PLAIN,
                 row_selections: None,
+                row_filter: None,
             }
         }
     }
@@ -1108,6 +1160,8 @@ mod tests {
         }
 
         fn with_row_selections(self) -> Self {
+            assert!(self.row_filter.is_none(), "Must set row selection first");
+
             let mut rng = thread_rng();
             let step = rng.gen_range(self.record_batch_size..self.num_rows);
             let row_selections = create_test_selection(
@@ -1121,6 +1175,19 @@ mod tests {
             }
         }
 
+        fn with_row_filter(self) -> Self {
+            let row_count = match &self.row_selections {
+                Some((_, count)) => *count,
+                None => self.num_row_groups * self.num_rows,
+            };
+
+            let mut rng = thread_rng();
+            Self {
+                row_filter: Some((0..row_count).map(|_| rng.gen_bool(0.9)).collect()),
+                ..self
+            }
+        }
+
         fn writer_props(&self) -> WriterProperties {
             let builder = WriterProperties::builder()
                 .set_data_pagesize_limit(self.max_data_page_size)
@@ -1158,7 +1225,7 @@ mod tests {
         G: RandGen<T>,
         F: Fn(&[Option<T::T>]) -> ArrayRef,
     {
-        let mut all_options = vec![
+        let all_options = vec![
             // choose record_batch_batch (15) so batches cross row
             // group boundaries (50 rows in 2 row groups) cases.
             TestOptions::new(2, 100, 15),
@@ -1187,9 +1254,8 @@ mod tests {
             TestOptions::new(2, 256, 91)
                 .with_null_percent(25)
                 .with_enabled_statistics(EnabledStatistics::None),
-        ];
+            // Test skip
 
-        let skip_options = vec![
             // choose record_batch_batch (15) so batches cross row
             // group boundaries (50 rows in 2 row groups) cases.
             TestOptions::new(2, 100, 15).with_row_selections(),
@@ -1218,10 +1284,25 @@ mod tests {
             TestOptions::new(2, 256, 93)
                 .with_null_percent(25)
                 .with_row_selections(),
+            // Test filter
+
+            // Test with row filter
+            TestOptions::new(4, 100, 25).with_row_filter(),
+            // Test with row selection and row filter
+            TestOptions::new(4, 100, 25)
+                .with_row_selections()
+                .with_row_filter(),
+            // Test with nulls and row filter
+            TestOptions::new(2, 256, 93)
+                .with_null_percent(25)
+                .with_row_filter(),
+            // Test with nulls, row selection and row filter
+            TestOptions::new(2, 256, 93)
+                .with_null_percent(25)
+                .with_row_selections()
+                .with_row_filter(),
         ];
 
-        all_options.extend(skip_options);
-
         all_options.into_iter().for_each(|opts| {
             for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0]
             {
@@ -1365,6 +1446,36 @@ mod tests {
             }
         };
 
+        let expected_data = match opts.row_filter {
+            Some(filter) => {
+                let expected_data = expected_data
+                    .into_iter()
+                    .zip(filter.iter())
+                    .filter_map(|(d, f)| f.then(|| d))
+                    .collect();
+
+                let mut filter_offset = 0;
+                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
+                    ProjectionMask::all(),
+                    move |b| {
+                        let array = BooleanArray::from_iter(
+                            filter
+                                .iter()
+                                .skip(filter_offset)
+                                .take(b.num_rows())
+                                .map(|x| Some(*x)),
+                        );
+                        filter_offset += b.num_rows();
+                        Ok(array)
+                    },
+                ))]);
+
+                builder = builder.with_row_filter(filter);
+                expected_data
+            }
+            None => expected_data,
+        };
+
         let mut record_reader = builder
             .with_batch_size(opts.record_batch_size)
             .build()
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index abe34cf1e..6c449bef4 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -96,8 +96,8 @@ use arrow::record_batch::RecordBatch;
 
 use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
 use crate::arrow::arrow_reader::{
-    evaluate_predicate, ArrowReaderBuilder, ParquetRecordBatchReader, RowFilter,
-    RowSelection,
+    evaluate_predicate, selects_any, ArrowReaderBuilder, ParquetRecordBatchReader,
+    RowFilter, RowSelection,
 };
 use crate::arrow::ProjectionMask;
 use crate::basic::Compression;
@@ -283,9 +283,6 @@ where
         batch_size: usize,
     ) -> ReadResult<T> {
         // TODO: calling build_array multiple times is wasteful
-        let selects_any = |selection: Option<&RowSelection>| {
-            selection.map(|x| x.selects_any()).unwrap_or(true)
-        };
 
         let meta = self.metadata.row_group(row_group_idx);
         let mut row_group = InMemoryRowGroup {