Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/02 17:21:56 UTC
[arrow-datafusion] branch master updated: Support pushdown multi-columns in PageIndex pruning. (#3967)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 1a5f6ab39 Support pushdown multi-columns in PageIndex pruning. (#3967)
1a5f6ab39 is described below
commit 1a5f6ab399259513109a0a6d74c412f912c5b20c
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Thu Nov 3 01:21:50 2022 +0800
Support pushdown multi-columns in PageIndex pruning. (#3967)
* Support pushdown multi-columns in PageIndex pruning.
Signed-off-by: yangjiang <ya...@ebay.com>
* fix test
Signed-off-by: yangjiang <ya...@ebay.com>
* fix comments
Signed-off-by: yangjiang <ya...@ebay.com>
* avoid extract predicates when enable is false
Signed-off-by: yangjiang <ya...@ebay.com>
Signed-off-by: yangjiang <ya...@ebay.com>
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
.../core/src/physical_plan/file_format/parquet.rs | 621 +++++++++++++--------
.../core/tests/parquet_page_index_pruning.rs | 205 +++++++
2 files changed, 582 insertions(+), 244 deletions(-)
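
In short: the pruning predicate is split into its conjuncts, the conjuncts that
reference exactly one column are kept, each is evaluated against that column's
page index to produce a per-column row selection, and the per-column selections
are intersected to form the final selection. The toy program below illustrates
only the intersection step; `Selector` and the `expand`/`encode`/`intersect`
helpers are hypothetical stand-ins for parquet's `RowSelector` machinery, not
code from this commit (see `intersect_row_selection` in the diff for the real
implementation, which walks the two run-length lists directly instead of
materializing per-row booleans).

// Toy model of intersecting two per-column row selections.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Selector {
    row_count: usize,
    skip: bool,
}

fn expand(sel: &[Selector]) -> Vec<bool> {
    // One bool per row: true = read, false = skip.
    sel.iter()
        .flat_map(|s| std::iter::repeat(!s.skip).take(s.row_count))
        .collect()
}

fn encode(rows: &[bool]) -> Vec<Selector> {
    // Run-length encode, merging adjacent rows with the same decision.
    let mut out: Vec<Selector> = Vec::new();
    for &read in rows {
        match out.last_mut() {
            Some(last) if last.skip != read => last.row_count += 1,
            _ => out.push(Selector { row_count: 1, skip: !read }),
        }
    }
    out
}

fn intersect(left: &[Selector], right: &[Selector]) -> Vec<Selector> {
    // A row is read only if *both* columns read it.
    let merged: Vec<bool> = expand(left)
        .into_iter()
        .zip(expand(right))
        .map(|(l, r)| l && r)
        .collect();
    encode(&merged)
}

fn main() {
    // Per-column selections for 'A > 35' and 'B = "F"' over 300 rows
    // (matching the worked example in the diff below):
    let a = vec![
        Selector { row_count: 200, skip: true },
        Selector { row_count: 100, skip: false },
    ];
    let b = vec![
        Selector { row_count: 100, skip: true },
        Selector { row_count: 150, skip: false },
        Selector { row_count: 50, skip: true },
    ];
    // Prints the intersection:
    // [Selector { row_count: 200, skip: true },
    //  Selector { row_count: 50, skip: false },
    //  Selector { row_count: 50, skip: true }]
    println!("{:?}", intersect(&a, &b));
}
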
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 7573a263b..aa74437da 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -18,6 +18,7 @@
//! Execution plan for reading Parquet files
use fmt::Debug;
+use std::collections::{HashSet, VecDeque};
use std::fmt;
use std::fs;
use std::ops::Range;
@@ -56,12 +57,14 @@ use arrow::{
};
use bytes::Bytes;
use datafusion_common::Column;
+use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::Expr;
+use datafusion_optimizer::utils::split_conjunction;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
-use log::debug;
+use log::{debug, error};
use object_store::{ObjectMeta, ObjectStore};
-use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector};
+use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelector};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::basic::{ConvertedType, LogicalType};
@@ -497,37 +500,67 @@ impl FileOpener for ParquetOpener {
&file_metrics,
);
- if enable_page_index && check_page_index_push_down_valid(&pruning_predicate) {
- let file_offset_indexes = file_metadata.offset_indexes();
- let file_page_indexes = file_metadata.page_indexes();
- if let (Some(file_offset_indexes), Some(file_page_indexes)) =
- (file_offset_indexes, file_page_indexes)
- {
- let mut selectors = Vec::with_capacity(row_groups.len());
- for r in &row_groups {
- selectors.extend(
- prune_pages_in_one_row_group(
- &groups[*r],
- pruning_predicate.clone(),
- file_offset_indexes.get(*r),
- file_page_indexes.get(*r),
- &file_metrics,
- )
- .map_err(|e| {
- ArrowError::ParquetError(format!(
- "Fail in prune_pages_in_one_row_group: {}",
- e
- ))
- }),
+ if enable_page_index {
+ let page_index_predicates = extract_page_index_push_down_predicates(
+ &pruning_predicate,
+ builder.schema().clone(),
+ )?;
+ if !page_index_predicates.is_empty() {
+ let file_offset_indexes = file_metadata.offset_indexes();
+ let file_page_indexes = file_metadata.page_indexes();
+ if let (Some(file_offset_indexes), Some(file_page_indexes)) =
+ (file_offset_indexes, file_page_indexes)
+ {
+ let mut row_selections =
+ VecDeque::with_capacity(page_index_predicates.len());
+ for predicate in page_index_predicates {
+ // `extract_page_index_push_down_predicates` only returns predicates referencing a single column.
+ let col_id = *predicate
+ .need_input_columns_ids()
+ .iter()
+ .next()
+ .unwrap();
+ let mut selectors = Vec::with_capacity(row_groups.len());
+ for r in &row_groups {
+ let rg_offset_indexes = file_offset_indexes.get(*r);
+ let rg_page_indexes = file_page_indexes.get(*r);
+ if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
+ (rg_page_indexes, rg_offset_indexes)
+ {
+ selectors.extend(
+ prune_pages_in_one_row_group(
+ &groups[*r],
+ &predicate,
+ rg_offset_indexes.get(col_id),
+ rg_page_indexes.get(col_id),
+ &file_metrics,
+ )
+ .map_err(|e| {
+ ArrowError::ParquetError(format!(
+ "Fail in prune_pages_in_one_row_group: {}",
+ e
+ ))
+ }),
+ );
+ } else {
+ // fall back to selecting all rows
+ let all_selected = vec![RowSelector::select(
+ groups[*r].num_rows() as usize,
+ )];
+ selectors.push(all_selected);
+ }
+ }
+ debug!(
+ "Created RowSelection {:?} from predicate {:?} using the page index",
+ &selectors, predicate
);
+ row_selections.push_back(
+ selectors.into_iter().flatten().collect::<Vec<_>>(),
+ );
+ }
+ let final_selection = combine_multi_col_selection(row_selections);
+ builder = builder.with_row_selection(final_selection.into());
}
- debug!(
- "Use filter and page index create RowSelection {:?} ",
- &selectors
- );
- builder = builder.with_row_selection(RowSelection::from(
- selectors.into_iter().flatten().collect::<Vec<_>>(),
- ));
}
}
@@ -552,18 +585,164 @@ impl FileOpener for ParquetOpener {
}
}
-// Check PruningPredicates just work on one column.
-fn check_page_index_push_down_valid(predicate: &Option<PruningPredicate>) -> bool {
- if let Some(predicate) = predicate {
- // for now we only support pushDown on one col, because each col may have different page numbers, its hard to get
- // `num_containers` from `PruningStatistics`.
- let cols = predicate.need_input_columns_ids();
- //Todo more specific rules
- if cols.len() == 1 {
- return true;
+// The final selection is the intersection of the per-column `RowSelection`s.
+// For example:
+// > ┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━
+// > ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃
+// > ┃ ┌──────────────┐ │ ┌──────────────┐ │ ┃
+// > ┃ │ │ │ │ │ │ ┃
+// > ┃ │ │ │ │ Page │ │
+// > │ │ │ │ │ 3 │ ┃
+// > ┃ │ │ │ │ min: "A" │ │ ┃
+// > ┃ │ │ │ │ │ max: "C" │ ┃
+// > ┃ │ Page │ │ │ first_row: 0 │ │
+// > │ │ 1 │ │ │ │ ┃
+// > ┃ │ min: 10 │ │ └──────────────┘ │ ┃
+// > ┃ │ │ max: 20 │ │ ┌──────────────┐ ┃
+// > ┃ │ first_row: 0 │ │ │ │ │
+// > │ │ │ │ │ Page │ ┃
+// > ┃ │ │ │ │ 4 │ │ ┃
+// > ┃ │ │ │ │ │ min: "D" │ ┃
+// > ┃ │ │ │ │ max: "G" │ │
+// > │ │ │ │ │first_row: 100│ ┃
+// > ┃ └──────────────┘ │ │ │ │ ┃
+// > ┃ │ ┌──────────────┐ │ │ │ ┃
+// > ┃ │ │ │ └──────────────┘ │
+// > │ │ Page │ │ ┌──────────────┐ ┃
+// > ┃ │ 2 │ │ │ │ │ ┃
+// > ┃ │ │ min: 30 │ │ │ Page │ ┃
+// > ┃ │ max: 40 │ │ │ 5 │ │
+// > │ │first_row: 200│ │ │ min: "H" │ ┃
+// > ┃ │ │ │ │ max: "Z" │ │ ┃
+// > ┃ │ │ │ │ │first_row: 250│ ┃
+// > ┃ └──────────────┘ │ │ │ │
+// > │ │ └──────────────┘ ┃
+// > ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃
+// > ┃ ColumnChunk ColumnChunk ┃
+// > ┃ A B
+// > ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━┛
+// >
+// > Total rows: 300
+//
+// Given the predicate 'A > 35 AND B = "F"':
+// `extract_page_index_push_down_predicates` yields two single-column predicates:
+// evaluating 'A > 35' gives RowSelection1: [ Skip(0~199), Read(200~299)]
+// evaluating 'B = "F"' gives RowSelection2: [ Skip(0~99), Read(100~249), Skip(250~299)]
+//
+// The final selection is the intersection of each column's RowSelection:
+// final_selection: [ Skip(0~199), Read(200~249), Skip(250~299)]
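+//
+// In `RowSelector` run lengths (rather than row ranges) that is:
+//   'A > 35':   [skip(200), select(100)]
+//   'B = "F"':  [skip(100), select(150), skip(50)]
+//   final:      [skip(200), select(50), skip(50)]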
+fn combine_multi_col_selection(
+ row_selections: VecDeque<Vec<RowSelector>>,
+) -> Vec<RowSelector> {
+ row_selections
+ .into_iter()
+ .reduce(intersect_row_selection)
+ .unwrap()
+}
+
+// Combine two `RowSelection`s, returning their intersection.
+// For example:
+// self: NNYYYYNNY
+// other: NYNNNNNNY
+//
+// returned: NNNNNNNNY
+// Adjacent selectors of the same type in the result are merged:
+// Select(2) + Select(1) + Skip(2) -> Select(3) + Skip(2)
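+//
+// In `RowSelector` terms, the example above reads:
+//   self:   [skip(2), select(4), skip(2), select(1)]
+//   other:  [skip(1), select(1), skip(6), select(1)]
+//   result: [skip(8), select(1)]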
+pub(crate) fn intersect_row_selection(
+ left: Vec<RowSelector>,
+ right: Vec<RowSelector>,
+) -> Vec<RowSelector> {
+ let mut res = vec![];
+ let mut l_iter = left.into_iter().peekable();
+ let mut r_iter = right.into_iter().peekable();
+
+ while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
+ if a.row_count == 0 {
+ l_iter.next().unwrap();
+ continue;
+ }
+ if b.row_count == 0 {
+ r_iter.next().unwrap();
+ continue;
+ }
+ match (a.skip, b.skip) {
+ // Keep both ranges
+ (false, false) => {
+ if a.row_count < b.row_count {
+ res.push(RowSelector::select(a.row_count));
+ b.row_count -= a.row_count;
+ l_iter.next().unwrap();
+ } else {
+ res.push(RowSelector::select(b.row_count));
+ a.row_count -= b.row_count;
+ r_iter.next().unwrap();
+ }
+ }
+ // skip at least one
+ _ => {
+ if a.row_count < b.row_count {
+ res.push(RowSelector::skip(a.row_count));
+ b.row_count -= a.row_count;
+ l_iter.next().unwrap();
+ } else {
+ res.push(RowSelector::skip(b.row_count));
+ a.row_count -= b.row_count;
+ r_iter.next().unwrap();
+ }
+ }
+ }
+ }
+ if l_iter.peek().is_some() {
+ res.extend(l_iter);
+ }
+ if r_iter.peek().is_some() {
+ res.extend(r_iter);
+ }
+ // merge adjacent selectors of the same type and drop a trailing selector
+ // with a zero row count
+ // TODO: remove when https://github.com/apache/arrow-rs/pull/2994 is released
+
+ let mut pre = res[0];
+ let mut after_combine = vec![];
+ for selector in res.iter_mut().skip(1) {
+ if selector.skip == pre.skip {
+ pre.row_count += selector.row_count;
+ } else {
+ after_combine.push(pre);
+ pre = *selector;
}
}
- false
+ if pre.row_count != 0 {
+ after_combine.push(pre);
+ }
+ after_combine
+}
+
+// Extract single-column `PruningPredicate`s from the input predicate for
+// evaluating the page index.
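+//
+// For example, `a > 5 AND b = 2 AND (c = 1 OR a = 3)` yields one predicate
+// on `a` (`a > 5`) and one on `b` (`b = 2`); the conjunct `(c = 1 OR a = 3)`
+// references two columns and is not pushed down.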
+fn extract_page_index_push_down_predicates(
+ predicate: &Option<PruningPredicate>,
+ schema: SchemaRef,
+) -> Result<Vec<PruningPredicate>> {
+ let mut one_col_predicates = vec![];
+ if let Some(predicate) = predicate {
+ let expr = predicate.logical_expr();
+ // TODO: try using a CNF rewrite when it is ready
+ let predicates = split_conjunction(expr);
+ let mut one_col_expr = vec![];
+ predicates
+ .into_iter()
+ .try_for_each::<_, Result<()>>(|predicate| {
+ let mut columns: HashSet<Column> = HashSet::new();
+ expr_to_columns(predicate, &mut columns)?;
+ if columns.len() == 1 {
+ one_col_expr.push(predicate);
+ }
+ Ok(())
+ })?;
+ one_col_predicates = one_col_expr
+ .into_iter()
+ .map(|e| PruningPredicate::try_new(e.clone(), schema.clone()))
+ .collect::<Result<Vec<_>>>()
+ .unwrap_or_default();
+ }
+ Ok(one_col_predicates)
}
/// Factory of parquet file readers.
@@ -723,14 +902,11 @@ struct RowGroupPruningStatistics<'a> {
parquet_schema: &'a Schema,
}
-/// Wraps page_index statistics in a way
+/// Wraps the page index of a single column within one row group in a way
/// that implements [`PruningStatistics`]
struct PagesPruningStatistics<'a> {
- //row_group_metadata: &'a RowGroupMetaData,
- page_indexes: &'a Vec<Index>,
- offset_indexes: &'a Vec<Vec<PageLocation>>,
- parquet_schema: &'a Schema,
- col_id: usize,
+ col_page_indexes: &'a Index,
+ col_offset_indexes: &'a Vec<PageLocation>,
}
// TODO: consolidate code with arrow-rs
@@ -867,55 +1043,43 @@ macro_rules! get_null_count_values {
// Extract the min or max value calling `func` from the page index
macro_rules! get_min_max_values_for_page_index {
- ($self:expr, $column:expr, $func:ident) => {{
- if let Some((col_id_index, _field)) =
- $self.parquet_schema.column_with_name(&$column.name)
- {
- if let Some(page_index) = $self.page_indexes.get(col_id_index) {
- match page_index {
- Index::NONE => None,
- Index::INT32(index) => {
- let vec = &index.indexes;
- Some(Arc::new(Int32Array::from_iter(
- vec.iter().map(|x| x.$func().cloned()),
- )))
- }
- Index::INT64(index) => {
- let vec = &index.indexes;
- Some(Arc::new(Int64Array::from_iter(
- vec.iter().map(|x| x.$func().cloned()),
- )))
- }
- Index::FLOAT(index) => {
- let vec = &index.indexes;
- Some(Arc::new(Float32Array::from_iter(
- vec.iter().map(|x| x.$func().cloned()),
- )))
- }
- Index::DOUBLE(index) => {
- let vec = &index.indexes;
- Some(Arc::new(Float64Array::from_iter(
- vec.iter().map(|x| x.$func().cloned()),
- )))
- }
- Index::BOOLEAN(index) => {
- let vec = &index.indexes;
- Some(Arc::new(BooleanArray::from_iter(
- vec.iter().map(|x| x.$func().cloned()),
- )))
- }
- Index::INT96(_)
- | Index::BYTE_ARRAY(_)
- | Index::FIXED_LEN_BYTE_ARRAY(_) => {
- //Todo support these type
- None
- }
- }
- } else {
+ ($self:expr, $func:ident) => {{
+ match $self.col_page_indexes {
+ Index::NONE => None,
+ Index::INT32(index) => {
+ let vec = &index.indexes;
+ Some(Arc::new(Int32Array::from_iter(
+ vec.iter().map(|x| x.$func().cloned()),
+ )))
+ }
+ Index::INT64(index) => {
+ let vec = &index.indexes;
+ Some(Arc::new(Int64Array::from_iter(
+ vec.iter().map(|x| x.$func().cloned()),
+ )))
+ }
+ Index::FLOAT(index) => {
+ let vec = &index.indexes;
+ Some(Arc::new(Float32Array::from_iter(
+ vec.iter().map(|x| x.$func().cloned()),
+ )))
+ }
+ Index::DOUBLE(index) => {
+ let vec = &index.indexes;
+ Some(Arc::new(Float64Array::from_iter(
+ vec.iter().map(|x| x.$func().cloned()),
+ )))
+ }
+ Index::BOOLEAN(index) => {
+ let vec = &index.indexes;
+ Some(Arc::new(BooleanArray::from_iter(
+ vec.iter().map(|x| x.$func().cloned()),
+ )))
+ }
+ Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+ // TODO: support these types
None
}
- } else {
- None
}
}};
}
@@ -957,52 +1121,40 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
}
impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
- fn min_values(&self, column: &Column) -> Option<ArrayRef> {
- get_min_max_values_for_page_index!(self, column, min)
+ fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
+ get_min_max_values_for_page_index!(self, min)
}
- fn max_values(&self, column: &Column) -> Option<ArrayRef> {
- get_min_max_values_for_page_index!(self, column, max)
+ fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
+ get_min_max_values_for_page_index!(self, max)
}
fn num_containers(&self) -> usize {
- self.offset_indexes.get(self.col_id).unwrap().len()
- }
-
- fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
- if let Some((col_id_index, _field)) =
- self.parquet_schema.column_with_name(&column.name)
- {
- if let Some(page_index) = self.page_indexes.get(col_id_index) {
- match page_index {
- Index::NONE => None,
- Index::BOOLEAN(index) => Some(Arc::new(Int64Array::from_iter(
- index.indexes.iter().map(|x| x.null_count),
- ))),
- Index::INT32(index) => Some(Arc::new(Int64Array::from_iter(
- index.indexes.iter().map(|x| x.null_count),
- ))),
- Index::INT64(index) => Some(Arc::new(Int64Array::from_iter(
- index.indexes.iter().map(|x| x.null_count),
- ))),
- Index::FLOAT(index) => Some(Arc::new(Int64Array::from_iter(
- index.indexes.iter().map(|x| x.null_count),
- ))),
- Index::DOUBLE(index) => Some(Arc::new(Int64Array::from_iter(
- index.indexes.iter().map(|x| x.null_count),
- ))),
- Index::INT96(_)
- | Index::BYTE_ARRAY(_)
- | Index::FIXED_LEN_BYTE_ARRAY(_) => {
- // Todo support these types
- None
- }
- }
- } else {
+ self.col_offset_indexes.len()
+ }
+
+ fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+ match self.col_page_indexes {
+ Index::NONE => None,
+ Index::BOOLEAN(index) => Some(Arc::new(Int64Array::from_iter(
+ index.indexes.iter().map(|x| x.null_count),
+ ))),
+ Index::INT32(index) => Some(Arc::new(Int64Array::from_iter(
+ index.indexes.iter().map(|x| x.null_count),
+ ))),
+ Index::INT64(index) => Some(Arc::new(Int64Array::from_iter(
+ index.indexes.iter().map(|x| x.null_count),
+ ))),
+ Index::FLOAT(index) => Some(Arc::new(Int64Array::from_iter(
+ index.indexes.iter().map(|x| x.null_count),
+ ))),
+ Index::DOUBLE(index) => Some(Arc::new(Int64Array::from_iter(
+ index.indexes.iter().map(|x| x.null_count),
+ ))),
+ Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+ // TODO: support these types
None
}
- } else {
- None
}
}
}
@@ -1052,72 +1204,57 @@ fn prune_row_groups(
fn prune_pages_in_one_row_group(
group: &RowGroupMetaData,
- predicate: Option<PruningPredicate>,
- offset_indexes: Option<&Vec<Vec<PageLocation>>>,
- page_indexes: Option<&Vec<Index>>,
+ predicate: &PruningPredicate,
+ col_offset_indexes: Option<&Vec<PageLocation>>,
+ col_page_indexes: Option<&Index>,
metrics: &ParquetFileMetrics,
) -> Result<Vec<RowSelector>> {
let num_rows = group.num_rows() as usize;
- if let (Some(predicate), Some(offset_indexes), Some(page_indexes)) =
- (&predicate, offset_indexes, page_indexes)
+ if let (Some(col_offset_indexes), Some(col_page_indexes)) =
+ (col_offset_indexes, col_page_indexes)
{
let pruning_stats = PagesPruningStatistics {
- page_indexes,
- offset_indexes,
- parquet_schema: predicate.schema().as_ref(),
- // now we assume only support one col.
- col_id: *predicate
- .need_input_columns_ids()
- .iter()
- .take(1)
- .next()
- .unwrap(),
+ col_page_indexes,
+ col_offset_indexes,
};
match predicate.prune(&pruning_stats) {
Ok(values) => {
let mut vec = Vec::with_capacity(values.len());
- if let Some(cols_offset_indexes) =
- offset_indexes.get(pruning_stats.col_id)
- {
- let row_vec =
- create_row_count_in_each_page(cols_offset_indexes, num_rows);
- assert_eq!(row_vec.len(), values.len());
- let mut sum_row = *row_vec.first().unwrap();
- let mut selected = *values.first().unwrap();
-
- for (i, &f) in values.iter().skip(1).enumerate() {
- if f == selected {
- sum_row += *row_vec.get(i).unwrap();
+ let row_vec = create_row_count_in_each_page(col_offset_indexes, num_rows);
+ assert_eq!(row_vec.len(), values.len());
+ let mut sum_row = *row_vec.first().unwrap();
+ let mut selected = *values.first().unwrap();
+
+ for (i, &f) in values.iter().skip(1).enumerate() {
+ if f == selected {
+ sum_row += *row_vec.get(i).unwrap();
+ } else {
+ let selector = if selected {
+ RowSelector::select(sum_row)
} else {
- let selector = if selected {
- RowSelector::select(sum_row)
- } else {
- RowSelector::skip(sum_row)
- };
- vec.push(selector);
- sum_row = *row_vec.get(i).unwrap();
- selected = f;
- }
+ RowSelector::skip(sum_row)
+ };
+ vec.push(selector);
+ sum_row = *row_vec.get(i).unwrap();
+ selected = f;
}
+ }
- let selector = if selected {
- RowSelector::select(sum_row)
- } else {
- RowSelector::skip(sum_row)
- };
- vec.push(selector);
- return Ok(vec);
+ let selector = if selected {
+ RowSelector::select(sum_row)
} else {
- debug!("Error evaluating page index predicate values missing page index col_id is{}", pruning_stats.col_id);
- metrics.predicate_evaluation_errors.add(1);
- }
+ RowSelector::skip(sum_row)
+ };
+ vec.push(selector);
+ return Ok(vec);
}
// stats filter array could not be built
- // return a closure which will not filter out any row groups
+ // return a result which will not filter out any pages
Err(e) => {
- debug!("Error evaluating page index predicate values {}", e);
+ error!("Error evaluating page index predicate values {}", e);
metrics.predicate_evaluation_errors.add(1);
+ return Ok(vec![RowSelector::select(group.num_rows() as usize)]);
}
}
}
@@ -2052,6 +2189,65 @@ mod tests {
ParquetFileMetrics::new(0, "file.parquet", &metrics)
}
+ #[test]
+ fn test_combine_row_selection() {
+ // a size equal b size
+ let a = vec![
+ RowSelector::select(5),
+ RowSelector::skip(4),
+ RowSelector::select(1),
+ ];
+ let b = vec![
+ RowSelector::select(8),
+ RowSelector::skip(1),
+ RowSelector::select(1),
+ ];
+
+ let res = intersect_row_selection(a, b);
+ assert_eq!(
+ res,
+ vec![
+ RowSelector::select(5),
+ RowSelector::skip(4),
+ RowSelector::select(1)
+ ],
+ );
+
+ // a size larger than b size
+ let a = vec![
+ RowSelector::select(3),
+ RowSelector::skip(33),
+ RowSelector::select(3),
+ RowSelector::skip(33),
+ ];
+ let b = vec![RowSelector::select(36), RowSelector::skip(36)];
+ let res = intersect_row_selection(a, b);
+ assert_eq!(res, vec![RowSelector::select(3), RowSelector::skip(69)]);
+
+ // a size less than b size
+ let a = vec![RowSelector::select(3), RowSelector::skip(7)];
+ let b = vec![
+ RowSelector::select(2),
+ RowSelector::skip(2),
+ RowSelector::select(2),
+ RowSelector::skip(2),
+ RowSelector::select(2),
+ ];
+ let res = intersect_row_selection(a, b);
+ assert_eq!(res, vec![RowSelector::select(2), RowSelector::skip(8)]);
+ }
+
#[test]
fn row_group_pruning_predicate_simple_expr() {
use datafusion_expr::{col, lit};
@@ -2614,67 +2810,4 @@ mod tests {
Ok(())
}
-
- #[tokio::test]
- async fn parquet_exec_with_page_index_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
-
- let object_store_url = ObjectStoreUrl::local_filesystem();
- let store = session_ctx
- .runtime_env()
- .object_store(&object_store_url)
- .unwrap();
-
- let testdata = crate::test_util::parquet_test_data();
- let filename = format!("{}/alltypes_tiny_pages.parquet", testdata);
-
- let meta = local_unpartitioned_file(filename);
-
- let schema = ParquetFormat::default()
- .infer_schema(&store, &[meta.clone()])
- .await
- .unwrap();
-
- let partitioned_file = PartitionedFile {
- object_meta: meta,
- partition_values: vec![],
- range: None,
- extensions: None,
- };
-
- // create filter month == 1;
- let filter = col("month").eq(lit(1_i32));
- let parquet_exec = ParquetExec::new(
- FileScanConfig {
- object_store_url,
- file_groups: vec![vec![partitioned_file]],
- file_schema: schema,
- statistics: Statistics::default(),
- // file has 10 cols so index 12 should be month
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- config_options: ConfigOptions::new().into_shareable(),
- },
- Some(filter),
- None,
- );
-
- let parquet_exec_page_index = parquet_exec.clone().with_enable_page_index(true);
-
- let mut results = parquet_exec_page_index.execute(0, task_ctx)?;
-
- let batch = results.next().await.unwrap()?;
-
- // from the page index should create below RowSelection
- // vec.push(RowSelector::select(312));
- // vec.push(RowSelector::skip(3330));
- // vec.push(RowSelector::select(333));
- // vec.push(RowSelector::skip(3330));
- // total 645 row
-
- assert_eq!(batch.num_rows(), 645);
- Ok(())
- }
}
diff --git a/datafusion/core/tests/parquet_page_index_pruning.rs b/datafusion/core/tests/parquet_page_index_pruning.rs
new file mode 100644
index 000000000..2a8791b69
--- /dev/null
+++ b/datafusion/core/tests/parquet_page_index_pruning.rs
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::config::ConfigOptions;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::file_format::FileFormat;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
+use datafusion_common::Statistics;
+use datafusion_expr::{col, lit, Expr};
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use tokio_stream::StreamExt;
+
+async fn get_parquet_exec(filter: Expr, session_ctx: SessionContext) -> ParquetExec {
+ let object_store_url = ObjectStoreUrl::local_filesystem();
+ let store = session_ctx
+ .runtime_env()
+ .object_store(&object_store_url)
+ .unwrap();
+
+ let testdata = datafusion::test_util::parquet_test_data();
+ let filename = format!("{}/alltypes_tiny_pages.parquet", testdata);
+
+ let location = Path::from_filesystem_path(filename.as_str()).unwrap();
+ let metadata = std::fs::metadata(filename).expect("Local file metadata");
+ let meta = ObjectMeta {
+ location,
+ last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
+ size: metadata.len() as usize,
+ };
+
+ let schema = ParquetFormat::default()
+ .infer_schema(&store, &[meta.clone()])
+ .await
+ .unwrap();
+
+ let partitioned_file = PartitionedFile {
+ object_meta: meta,
+ partition_values: vec![],
+ range: None,
+ extensions: None,
+ };
+
+ let parquet_exec = ParquetExec::new(
+ FileScanConfig {
+ object_store_url,
+ file_groups: vec![vec![partitioned_file]],
+ file_schema: schema,
+ statistics: Statistics::default(),
+ // the file has 13 cols, so index 12 is the month column
+ projection: None,
+ limit: None,
+ table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
+ },
+ Some(filter),
+ None,
+ );
+ parquet_exec.with_enable_page_index(true)
+}
+
+#[tokio::test]
+async fn page_index_filter_one_col() {
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+
+ // 1.create filter month == 1;
+ let filter = col("month").eq(lit(1_i32));
+
+ let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+ let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+ let batch = results.next().await.unwrap().unwrap();
+
+ // `month = 1` from the page index should create the RowSelection below
+ // vec.push(RowSelector::select(312));
+ // vec.push(RowSelector::skip(3330));
+ // vec.push(RowSelector::select(333));
+ // vec.push(RowSelector::skip(3330));
+ // 645 rows in total
+ assert_eq!(batch.num_rows(), 645);
+
+ // 2. create filter month == 1 or month == 2;
+ let filter = col("month").eq(lit(1_i32)).or(col("month").eq(lit(2_i32)));
+
+ let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+ let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+ let batch = results.next().await.unwrap().unwrap();
+
+ // `month = 1 OR month = 2` from the page index should create the RowSelection below
+ // vec.push(RowSelector::skip(894));
+ // vec.push(RowSelector::select(339));
+ // vec.push(RowSelector::skip(3330));
+ // vec.push(RowSelector::select(312));
+ // vec.push(RowSelector::skip(2430));
+ // 1275 rows selected in total
+ assert_eq!(batch.num_rows(), 1275);
+
+ // 3. create filter month == 1 and month == 12;
+ let filter = col("month")
+ .eq(lit(1_i32))
+ .and(col("month").eq(lit(12_i32)));
+
+ let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+ let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+ let batch = results.next().await;
+
+ assert!(batch.is_none());
+
+ // 4. create filter 0 < month < 2;
+ let filter = col("month").gt(lit(0_i32)).and(col("month").lt(lit(2_i32)));
+
+ let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+ let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+ let batch = results.next().await.unwrap().unwrap();
+
+ // should match the `month = 1` result
+ assert_eq!(batch.num_rows(), 645);
+}
+
+#[tokio::test]
+async fn page_index_filter_multi_col() {
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+
+ // create filter month == 1 and year = 2009;
+ let filter = col("month").eq(lit(1_i32)).and(col("year").eq(lit(2009)));
+
+ let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+ let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+ let batch = results.next().await.unwrap().unwrap();
+
+ // `year = 2009` from the page index should create the RowSelection below
+ // vec.push(RowSelector::select(3663));
+ // vec.push(RowSelector::skip(3642));
+ // intersected with the `month = 1` selection, 333 rows in total
+ assert_eq!(batch.num_rows(), 333);
+
+ // create filter (year = 2009 or id = 1) and month = 1;
+ // this should only use `month = 1` to evaluate the page index.
+ let filter = col("month")
+ .eq(lit(1_i32))
+ .and(col("year").eq(lit(2009)).or(col("id").eq(lit(1))));
+
+ let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+ let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+ let batch = results.next().await.unwrap().unwrap();
+ assert_eq!(batch.num_rows(), 645);
+
+ // create filter (year = 2009 or id = 1)
+ // this filter uses two columns, so it is not pushed down
+ let filter = col("year").eq(lit(2009)).or(col("id").eq(lit(1)));
+
+ let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+ let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+ let batch = results.next().await.unwrap().unwrap();
+ assert_eq!(batch.num_rows(), 7300);
+
+ // create filter (year = 2009 and id = 1) or (year = 2010)
+ // this filter uses two columns, so it is not pushed down
+ // TODO: after a CNF rewrite it could become (year = 2009 or year = 2010) and (id = 1 or year = 2010),
+ // which would allow pushing (year = 2009 or year = 2010) down.
+ let filter = col("year")
+ .eq(lit(2009))
+ .and(col("id").eq(lit(1)))
+ .or(col("year").eq(lit(2010)));
+
+ let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+ let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+ let batch = results.next().await.unwrap().unwrap();
+ assert_eq!(batch.num_rows(), 7300);
+}