You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/15 16:44:37 UTC
[arrow-rs] branch master updated: Support RowFilter within ParquetRecordBatchReader (#2431) (#2452)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 569c78129 Support RowFilter within ParquetRecordBatchReader (#2431) (#2452)
569c78129 is described below
commit 569c781290dc12a8e2d86069df3827225a53d563
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Aug 15 17:44:33 2022 +0100
Support RowFilter within ParquetRecordBatchReader (#2431) (#2452)
* Support RowFilter within ParquetRecordBatchReader (#2431)
* Fix doc
* Review feedback
---
parquet/src/arrow/arrow_reader/mod.rs | 143 ++++++++++++++++++++++++++++++----
parquet/src/arrow/async_reader.rs | 7 +-
2 files changed, 129 insertions(+), 21 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index e96b5d8fa..052ef40ee 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -400,22 +400,46 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
Self::new_builder(SyncReader(reader), metadata, options)
}
+ /// Build a [`ParquetRecordBatchReader`]
+ ///
+ /// Note: this will eagerly evaluate any `RowFilter` before returning
pub fn build(self) -> Result<ParquetRecordBatchReader> {
let reader =
FileReaderRowGroupCollection::new(Arc::new(self.input.0), self.row_groups);
+
+ let mut filter = self.filter;
+ let mut selection = self.selection;
+
+ if let Some(filter) = filter.as_mut() {
+ for predicate in filter.predicates.iter_mut() {
+ if !selects_any(selection.as_ref()) {
+ break;
+ }
+
+ let projection = predicate.projection().clone();
+ let array_reader =
+ build_array_reader(Arc::clone(&self.schema), projection, &reader)?;
+
+ selection = Some(evaluate_predicate(
+ self.batch_size,
+ array_reader,
+ selection,
+ predicate.as_mut(),
+ )?);
+ }
+ }
+
let array_reader = build_array_reader(self.schema, self.projection, &reader)?;
- if self.filter.is_some() {
- // TODO: Support RowFilter within sync interface (#2431)
- return Err(nyi_err!(
- "RowFilter is currently not supported within the sync interface"
- ));
+ // If selection is empty, truncate
+ if !selects_any(selection.as_ref()) {
+ selection = Some(RowSelection::from(vec![]));
}
Ok(ParquetRecordBatchReader::new(
self.batch_size,
array_reader,
- self.selection,
+ selection,
))
}
}
@@ -541,12 +565,16 @@ impl ParquetRecordBatchReader {
}
}
+/// Returns `true` if `selection` is `None` or selects some rows
+pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
+ selection.map(|x| x.selects_any()).unwrap_or(true)
+}
+
/// Evaluates an [`ArrowPredicate`] returning the [`RowSelection`]
///
/// If this [`ParquetRecordBatchReader`] has a [`RowSelection`], the
/// returned [`RowSelection`] will be the conjunction of this and
/// the rows selected by `predicate`
-#[allow(unused)]
pub(crate) fn evaluate_predicate(
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
@@ -576,6 +604,7 @@ mod tests {
use bytes::Bytes;
use std::cmp::min;
use std::collections::VecDeque;
+ use std::fmt::Formatter;
use std::fs::File;
use std::io::Seek;
use std::path::PathBuf;
@@ -591,8 +620,8 @@ mod tests {
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use crate::arrow::arrow_reader::{
- ArrowReaderOptions, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder,
- RowSelection, RowSelector,
+ ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReader,
+ ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
};
use crate::arrow::buffer::converter::{
Converter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter,
@@ -1021,7 +1050,7 @@ mod tests {
}
/// Parameters for single_column_reader_test
- #[derive(Debug, Clone)]
+ #[derive(Clone)]
struct TestOptions {
/// Number of row group to write to parquet (row group size =
/// num_row_groups / num_rows)
@@ -1047,8 +1076,30 @@ mod tests {
enabled_statistics: EnabledStatistics,
/// Encoding
encoding: Encoding,
- //row selections and total selected row count
+ /// row selections and total selected row count
row_selections: Option<(RowSelection, usize)>,
+ /// row filter
+ row_filter: Option<Vec<bool>>,
+ }
+
+ /// Manually implement this to avoid printing entire contents of row_selections and row_filter
+ impl std::fmt::Debug for TestOptions {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("TestOptions")
+ .field("num_row_groups", &self.num_row_groups)
+ .field("num_rows", &self.num_rows)
+ .field("record_batch_size", &self.record_batch_size)
+ .field("null_percent", &self.null_percent)
+ .field("write_batch_size", &self.write_batch_size)
+ .field("max_data_page_size", &self.max_data_page_size)
+ .field("max_dict_page_size", &self.max_dict_page_size)
+ .field("writer_version", &self.writer_version)
+ .field("enabled_statistics", &self.enabled_statistics)
+ .field("encoding", &self.encoding)
+ .field("row_selections", &self.row_selections.is_some())
+ .field("row_filter", &self.row_filter.is_some())
+ .finish()
+ }
}
impl Default for TestOptions {
@@ -1065,6 +1116,7 @@ mod tests {
enabled_statistics: EnabledStatistics::Page,
encoding: Encoding::PLAIN,
row_selections: None,
+ row_filter: None,
}
}
}
@@ -1108,6 +1160,8 @@ mod tests {
}
fn with_row_selections(self) -> Self {
+ assert!(self.row_filter.is_none(), "Must set row selection first");
+
let mut rng = thread_rng();
let step = rng.gen_range(self.record_batch_size..self.num_rows);
let row_selections = create_test_selection(
@@ -1121,6 +1175,19 @@ mod tests {
}
}
+ fn with_row_filter(self) -> Self {
+ let row_count = match &self.row_selections {
+ Some((_, count)) => *count,
+ None => self.num_row_groups * self.num_rows,
+ };
+
+ let mut rng = thread_rng();
+ Self {
+ row_filter: Some((0..row_count).map(|_| rng.gen_bool(0.9)).collect()),
+ ..self
+ }
+ }
+
fn writer_props(&self) -> WriterProperties {
let builder = WriterProperties::builder()
.set_data_pagesize_limit(self.max_data_page_size)
@@ -1158,7 +1225,7 @@ mod tests {
G: RandGen<T>,
F: Fn(&[Option<T::T>]) -> ArrayRef,
{
- let mut all_options = vec![
+ let all_options = vec![
// choose record_batch_size (15) so batches cross row
// group boundaries (50 rows in 2 row groups) cases.
TestOptions::new(2, 100, 15),
@@ -1187,9 +1254,8 @@ mod tests {
TestOptions::new(2, 256, 91)
.with_null_percent(25)
.with_enabled_statistics(EnabledStatistics::None),
- ];
+ // Test skip
- let skip_options = vec![
// choose record_batch_size (15) so batches cross row
// group boundaries (50 rows in 2 row groups) cases.
TestOptions::new(2, 100, 15).with_row_selections(),
@@ -1218,10 +1284,25 @@ mod tests {
TestOptions::new(2, 256, 93)
.with_null_percent(25)
.with_row_selections(),
+ // Test filter
+
+ // Test with row filter
+ TestOptions::new(4, 100, 25).with_row_filter(),
+ // Test with row selection and row filter
+ TestOptions::new(4, 100, 25)
+ .with_row_selections()
+ .with_row_filter(),
+ // Test with nulls and row filter
+ TestOptions::new(2, 256, 93)
+ .with_null_percent(25)
+ .with_row_filter(),
+ // Test with nulls, row selections and row filter
+ TestOptions::new(2, 256, 93)
+ .with_null_percent(25)
+ .with_row_selections()
+ .with_row_filter(),
];
- all_options.extend(skip_options);
-
all_options.into_iter().for_each(|opts| {
for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0]
{
@@ -1365,6 +1446,36 @@ mod tests {
}
};
+ let expected_data = match opts.row_filter {
+ Some(filter) => {
+ let expected_data = expected_data
+ .into_iter()
+ .zip(filter.iter())
+ .filter_map(|(d, f)| f.then(|| d))
+ .collect();
+
+ let mut filter_offset = 0;
+ let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
+ ProjectionMask::all(),
+ move |b| {
+ let array = BooleanArray::from_iter(
+ filter
+ .iter()
+ .skip(filter_offset)
+ .take(b.num_rows())
+ .map(|x| Some(*x)),
+ );
+ filter_offset += b.num_rows();
+ Ok(array)
+ },
+ ))]);
+
+ builder = builder.with_row_filter(filter);
+ expected_data
+ }
+ None => expected_data,
+ };
+
let mut record_reader = builder
.with_batch_size(opts.record_batch_size)
.build()
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index abe34cf1e..6c449bef4 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -96,8 +96,8 @@ use arrow::record_batch::RecordBatch;
use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
use crate::arrow::arrow_reader::{
- evaluate_predicate, ArrowReaderBuilder, ParquetRecordBatchReader, RowFilter,
- RowSelection,
+ evaluate_predicate, selects_any, ArrowReaderBuilder, ParquetRecordBatchReader,
+ RowFilter, RowSelection,
};
use crate::arrow::ProjectionMask;
use crate::basic::Compression;
@@ -283,9 +283,6 @@ where
batch_size: usize,
) -> ReadResult<T> {
// TODO: calling build_array multiple times is wasteful
- let selects_any = |selection: Option<&RowSelection>| {
- selection.map(|x| x.selects_any()).unwrap_or(true)
- };
let meta = self.metadata.row_group(row_group_idx);
let mut row_group = InMemoryRowGroup {