You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/02 17:21:56 UTC

[arrow-datafusion] branch master updated: Support pushdown multi-columns in PageIndex pruning. (#3967)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a5f6ab39 Support pushdown multi-columns in PageIndex pruning. (#3967)
1a5f6ab39 is described below

commit 1a5f6ab399259513109a0a6d74c412f912c5b20c
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Thu Nov 3 01:21:50 2022 +0800

    Support pushdown multi-columns in PageIndex pruning. (#3967)
    
    * Support pushdown multi-columns in PageIndex pruning.
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    * fix test
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    * fix comments
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    * avoid extract predicates when enable is false
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 .../core/src/physical_plan/file_format/parquet.rs  | 621 +++++++++++++--------
 .../core/tests/parquet_page_index_pruning.rs       | 205 +++++++
 2 files changed, 582 insertions(+), 244 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 7573a263b..aa74437da 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -18,6 +18,7 @@
 //! Execution plan for reading Parquet files
 
 use fmt::Debug;
+use std::collections::{HashSet, VecDeque};
 use std::fmt;
 use std::fs;
 use std::ops::Range;
@@ -56,12 +57,14 @@ use arrow::{
 };
 use bytes::Bytes;
 use datafusion_common::Column;
+use datafusion_expr::utils::expr_to_columns;
 use datafusion_expr::Expr;
+use datafusion_optimizer::utils::split_conjunction;
 use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
-use log::debug;
+use log::{debug, error};
 use object_store::{ObjectMeta, ObjectStore};
-use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector};
+use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelector};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::basic::{ConvertedType, LogicalType};
@@ -497,37 +500,67 @@ impl FileOpener for ParquetOpener {
                 &file_metrics,
             );
 
-            if enable_page_index && check_page_index_push_down_valid(&pruning_predicate) {
-                let file_offset_indexes = file_metadata.offset_indexes();
-                let file_page_indexes = file_metadata.page_indexes();
-                if let (Some(file_offset_indexes), Some(file_page_indexes)) =
-                    (file_offset_indexes, file_page_indexes)
-                {
-                    let mut selectors = Vec::with_capacity(row_groups.len());
-                    for r in &row_groups {
-                        selectors.extend(
-                            prune_pages_in_one_row_group(
-                                &groups[*r],
-                                pruning_predicate.clone(),
-                                file_offset_indexes.get(*r),
-                                file_page_indexes.get(*r),
-                                &file_metrics,
-                            )
-                            .map_err(|e| {
-                                ArrowError::ParquetError(format!(
-                                    "Fail in prune_pages_in_one_row_group: {}",
-                                    e
-                                ))
-                            }),
+            if enable_page_index {
+                let page_index_predicates = extract_page_index_push_down_predicates(
+                    &pruning_predicate,
+                    builder.schema().clone(),
+                )?;
+                if !page_index_predicates.is_empty() {
+                    let file_offset_indexes = file_metadata.offset_indexes();
+                    let file_page_indexes = file_metadata.page_indexes();
+                    if let (Some(file_offset_indexes), Some(file_page_indexes)) =
+                        (file_offset_indexes, file_page_indexes)
+                    {
+                        let mut row_selections =
+                            VecDeque::with_capacity(page_index_predicates.len());
+                        for predicate in page_index_predicates {
+                            // `extract_page_index_push_down_predicates` only return predicate with one col.
+                            let col_id = *predicate
+                                .need_input_columns_ids()
+                                .iter()
+                                .next()
+                                .unwrap();
+                            let mut selectors = Vec::with_capacity(row_groups.len());
+                            for r in &row_groups {
+                                let rg_offset_indexes = file_offset_indexes.get(*r);
+                                let rg_page_indexes = file_page_indexes.get(*r);
+                                if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
+                                    (rg_page_indexes, rg_offset_indexes)
+                                {
+                                    selectors.extend(
+                                        prune_pages_in_one_row_group(
+                                            &groups[*r],
+                                            &predicate,
+                                            rg_offset_indexes.get(col_id),
+                                            rg_page_indexes.get(col_id),
+                                            &file_metrics,
+                                        )
+                                            .map_err(|e| {
+                                                ArrowError::ParquetError(format!(
+                                                    "Fail in prune_pages_in_one_row_group: {}",
+                                                    e
+                                                ))
+                                            }),
+                                    );
+                                } else {
+                                    // fallback select all rows
+                                    let all_selected = vec![RowSelector::select(
+                                        groups[*r].num_rows() as usize,
+                                    )];
+                                    selectors.push(all_selected);
+                                }
+                            }
+                            debug!(
+                            "Use filter and page index create RowSelection {:?} from predicate:{:?}",
+                            &selectors, predicate
                         );
+                            row_selections.push_back(
+                                selectors.into_iter().flatten().collect::<Vec<_>>(),
+                            );
+                        }
+                        let final_selection = combine_multi_col_selection(row_selections);
+                        builder = builder.with_row_selection(final_selection.into());
                     }
-                    debug!(
-                        "Use filter and page index create RowSelection {:?} ",
-                        &selectors
-                    );
-                    builder = builder.with_row_selection(RowSelection::from(
-                        selectors.into_iter().flatten().collect::<Vec<_>>(),
-                    ));
                 }
             }
 
@@ -552,18 +585,164 @@ impl FileOpener for ParquetOpener {
     }
 }
 
-// Check PruningPredicates just work on one column.
-fn check_page_index_push_down_valid(predicate: &Option<PruningPredicate>) -> bool {
-    if let Some(predicate) = predicate {
-        // for now we only support pushDown on one col, because each col may have different page numbers, its hard to get
-        // `num_containers` from `PruningStatistics`.
-        let cols = predicate.need_input_columns_ids();
-        //Todo more specific rules
-        if cols.len() == 1 {
-            return true;
+// For example:
+// > ┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━
+// >    ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┃
+// > ┃     ┌──────────────┐  │     ┌──────────────┐  │  ┃
+// > ┃  │  │              │     │  │              │     ┃
+// > ┃     │              │  │     │     Page     │  │
+// >    │  │              │     │  │      3       │     ┃
+// > ┃     │              │  │     │   min: "A"   │  │  ┃
+// > ┃  │  │              │     │  │   max: "C"   │     ┃
+// > ┃     │     Page     │  │     │ first_row: 0 │  │
+// >    │  │      1       │     │  │              │     ┃
+// > ┃     │   min: 10    │  │     └──────────────┘  │  ┃
+// > ┃  │  │   max: 20    │     │  ┌──────────────┐     ┃
+// > ┃     │ first_row: 0 │  │     │              │  │
+// >    │  │              │     │  │     Page     │     ┃
+// > ┃     │              │  │     │      4       │  │  ┃
+// > ┃  │  │              │     │  │   min: "D"   │     ┃
+// > ┃     │              │  │     │   max: "G"   │  │
+// >    │  │              │     │  │first_row: 100│     ┃
+// > ┃     └──────────────┘  │     │              │  │  ┃
+// > ┃  │  ┌──────────────┐     │  │              │     ┃
+// > ┃     │              │  │     └──────────────┘  │
+// >    │  │     Page     │     │  ┌──────────────┐     ┃
+// > ┃     │      2       │  │     │              │  │  ┃
+// > ┃  │  │   min: 30    │     │  │     Page     │     ┃
+// > ┃     │   max: 40    │  │     │      5       │  │
+// >    │  │first_row: 200│     │  │   min: "H"   │     ┃
+// > ┃     │              │  │     │   max: "Z"   │  │  ┃
+// > ┃  │  │              │     │  │first_row: 250│     ┃
+// > ┃     └──────────────┘  │     │              │  │
+// >    │                       │  └──────────────┘     ┃
+// > ┃   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
+// > ┃       ColumnChunk            ColumnChunk         ┃
+// > ┃            A                      B
+// >  ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━┛
+// >
+// >   Total rows: 300
+//
+// Given the predicate 'A > 35 AND B = "F"':
+// using `extract_page_index_push_down_predicates` get two single column predicate:
+// Using 'A > 35': could get RowSelector1: [ Skip(0~199), Read(200~299)]
+// Using  B = "F": could get RowSelector2: [ Skip(0~99), Read(100~249), Skip(250~299)]
+//
+// As the Final selection is the intersection of each columns RowSelectors:
+// final_selection:[ Skip(0~199), Read(200~249), Skip(250~299)]
+fn combine_multi_col_selection(
+    row_selections: VecDeque<Vec<RowSelector>>,
+) -> Vec<RowSelector> {
+    row_selections
+        .into_iter()
+        .reduce(intersect_row_selection)
+        .unwrap()
+}
+
+// combine two `RowSelection` return the intersection
+// For example:
+// self:     NNYYYYNNY
+// other:    NYNNNNNNY
+//
+// returned: NNNNNNNNY
+// set `need_combine` true will combine result: Select(2) + Select(1) + Skip(2) -> Select(3) + Skip(2)
+pub(crate) fn intersect_row_selection(
+    left: Vec<RowSelector>,
+    right: Vec<RowSelector>,
+) -> Vec<RowSelector> {
+    let mut res = vec![];
+    let mut l_iter = left.into_iter().peekable();
+    let mut r_iter = right.into_iter().peekable();
+
+    while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
+        if a.row_count == 0 {
+            l_iter.next().unwrap();
+            continue;
+        }
+        if b.row_count == 0 {
+            r_iter.next().unwrap();
+            continue;
+        }
+        match (a.skip, b.skip) {
+            // Keep both ranges
+            (false, false) => {
+                if a.row_count < b.row_count {
+                    res.push(RowSelector::select(a.row_count));
+                    b.row_count -= a.row_count;
+                    l_iter.next().unwrap();
+                } else {
+                    res.push(RowSelector::select(b.row_count));
+                    a.row_count -= b.row_count;
+                    r_iter.next().unwrap();
+                }
+            }
+            // skip at least one
+            _ => {
+                if a.row_count < b.row_count {
+                    res.push(RowSelector::skip(a.row_count));
+                    b.row_count -= a.row_count;
+                    l_iter.next().unwrap();
+                } else {
+                    res.push(RowSelector::skip(b.row_count));
+                    a.row_count -= b.row_count;
+                    r_iter.next().unwrap();
+                }
+            }
+        }
+    }
+    if l_iter.peek().is_some() {
+        res.extend(l_iter);
+    }
+    if r_iter.peek().is_some() {
+        res.extend(r_iter);
+    }
+    // combine the adjacent same operators and last zero row count
+    // TODO: remove when https://github.com/apache/arrow-rs/pull/2994 is released~
+
+    let mut pre = res[0];
+    let mut after_combine = vec![];
+    for selector in res.iter_mut().skip(1) {
+        if selector.skip == pre.skip {
+            pre.row_count += selector.row_count;
+        } else {
+            after_combine.push(pre);
+            pre = *selector;
         }
     }
-    false
+    if pre.row_count != 0 {
+        after_combine.push(pre);
+    }
+    after_combine
+}
+
+// Extract single col pruningPredicate from input predicate for evaluating page Index.
+fn extract_page_index_push_down_predicates(
+    predicate: &Option<PruningPredicate>,
+    schema: SchemaRef,
+) -> Result<Vec<PruningPredicate>> {
+    let mut one_col_predicates = vec![];
+    if let Some(predicate) = predicate {
+        let expr = predicate.logical_expr();
+        // todo try use CNF rewrite when ready
+        let predicates = split_conjunction(expr);
+        let mut one_col_expr = vec![];
+        predicates
+            .into_iter()
+            .try_for_each::<_, Result<()>>(|predicate| {
+                let mut columns: HashSet<Column> = HashSet::new();
+                expr_to_columns(predicate, &mut columns)?;
+                if columns.len() == 1 {
+                    one_col_expr.push(predicate);
+                }
+                Ok(())
+            })?;
+        one_col_predicates = one_col_expr
+            .into_iter()
+            .map(|e| PruningPredicate::try_new(e.clone(), schema.clone()))
+            .collect::<Result<Vec<_>>>()
+            .unwrap_or_default();
+    }
+    Ok(one_col_predicates)
 }
 
 /// Factory of parquet file readers.
@@ -723,14 +902,11 @@ struct RowGroupPruningStatistics<'a> {
     parquet_schema: &'a Schema,
 }
 
-/// Wraps page_index statistics in a way
+/// Wraps one col page_index in one rowGroup statistics in a way
 /// that implements [`PruningStatistics`]
 struct PagesPruningStatistics<'a> {
-    //row_group_metadata: &'a RowGroupMetaData,
-    page_indexes: &'a Vec<Index>,
-    offset_indexes: &'a Vec<Vec<PageLocation>>,
-    parquet_schema: &'a Schema,
-    col_id: usize,
+    col_page_indexes: &'a Index,
+    col_offset_indexes: &'a Vec<PageLocation>,
 }
 
 // TODO: consolidate code with arrow-rs
@@ -867,55 +1043,43 @@ macro_rules! get_null_count_values {
 
 // Extract the min or max value calling `func` from page idex
 macro_rules! get_min_max_values_for_page_index {
-    ($self:expr, $column:expr, $func:ident) => {{
-        if let Some((col_id_index, _field)) =
-            $self.parquet_schema.column_with_name(&$column.name)
-        {
-            if let Some(page_index) = $self.page_indexes.get(col_id_index) {
-                match page_index {
-                    Index::NONE => None,
-                    Index::INT32(index) => {
-                        let vec = &index.indexes;
-                        Some(Arc::new(Int32Array::from_iter(
-                            vec.iter().map(|x| x.$func().cloned()),
-                        )))
-                    }
-                    Index::INT64(index) => {
-                        let vec = &index.indexes;
-                        Some(Arc::new(Int64Array::from_iter(
-                            vec.iter().map(|x| x.$func().cloned()),
-                        )))
-                    }
-                    Index::FLOAT(index) => {
-                        let vec = &index.indexes;
-                        Some(Arc::new(Float32Array::from_iter(
-                            vec.iter().map(|x| x.$func().cloned()),
-                        )))
-                    }
-                    Index::DOUBLE(index) => {
-                        let vec = &index.indexes;
-                        Some(Arc::new(Float64Array::from_iter(
-                            vec.iter().map(|x| x.$func().cloned()),
-                        )))
-                    }
-                    Index::BOOLEAN(index) => {
-                        let vec = &index.indexes;
-                        Some(Arc::new(BooleanArray::from_iter(
-                            vec.iter().map(|x| x.$func().cloned()),
-                        )))
-                    }
-                    Index::INT96(_)
-                    | Index::BYTE_ARRAY(_)
-                    | Index::FIXED_LEN_BYTE_ARRAY(_) => {
-                        //Todo support these type
-                        None
-                    }
-                }
-            } else {
+    ($self:expr, $func:ident) => {{
+        match $self.col_page_indexes {
+            Index::NONE => None,
+            Index::INT32(index) => {
+                let vec = &index.indexes;
+                Some(Arc::new(Int32Array::from_iter(
+                    vec.iter().map(|x| x.$func().cloned()),
+                )))
+            }
+            Index::INT64(index) => {
+                let vec = &index.indexes;
+                Some(Arc::new(Int64Array::from_iter(
+                    vec.iter().map(|x| x.$func().cloned()),
+                )))
+            }
+            Index::FLOAT(index) => {
+                let vec = &index.indexes;
+                Some(Arc::new(Float32Array::from_iter(
+                    vec.iter().map(|x| x.$func().cloned()),
+                )))
+            }
+            Index::DOUBLE(index) => {
+                let vec = &index.indexes;
+                Some(Arc::new(Float64Array::from_iter(
+                    vec.iter().map(|x| x.$func().cloned()),
+                )))
+            }
+            Index::BOOLEAN(index) => {
+                let vec = &index.indexes;
+                Some(Arc::new(BooleanArray::from_iter(
+                    vec.iter().map(|x| x.$func().cloned()),
+                )))
+            }
+            Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+                //Todo support these type
                 None
             }
-        } else {
-            None
         }
     }};
 }
@@ -957,52 +1121,40 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
 }
 
 impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
-    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
-        get_min_max_values_for_page_index!(self, column, min)
+    fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
+        get_min_max_values_for_page_index!(self, min)
     }
 
-    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
-        get_min_max_values_for_page_index!(self, column, max)
+    fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
+        get_min_max_values_for_page_index!(self, max)
     }
 
     fn num_containers(&self) -> usize {
-        self.offset_indexes.get(self.col_id).unwrap().len()
-    }
-
-    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
-        if let Some((col_id_index, _field)) =
-            self.parquet_schema.column_with_name(&column.name)
-        {
-            if let Some(page_index) = self.page_indexes.get(col_id_index) {
-                match page_index {
-                    Index::NONE => None,
-                    Index::BOOLEAN(index) => Some(Arc::new(Int64Array::from_iter(
-                        index.indexes.iter().map(|x| x.null_count),
-                    ))),
-                    Index::INT32(index) => Some(Arc::new(Int64Array::from_iter(
-                        index.indexes.iter().map(|x| x.null_count),
-                    ))),
-                    Index::INT64(index) => Some(Arc::new(Int64Array::from_iter(
-                        index.indexes.iter().map(|x| x.null_count),
-                    ))),
-                    Index::FLOAT(index) => Some(Arc::new(Int64Array::from_iter(
-                        index.indexes.iter().map(|x| x.null_count),
-                    ))),
-                    Index::DOUBLE(index) => Some(Arc::new(Int64Array::from_iter(
-                        index.indexes.iter().map(|x| x.null_count),
-                    ))),
-                    Index::INT96(_)
-                    | Index::BYTE_ARRAY(_)
-                    | Index::FIXED_LEN_BYTE_ARRAY(_) => {
-                        // Todo support these types
-                        None
-                    }
-                }
-            } else {
+        self.col_offset_indexes.len()
+    }
+
+    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        match self.col_page_indexes {
+            Index::NONE => None,
+            Index::BOOLEAN(index) => Some(Arc::new(Int64Array::from_iter(
+                index.indexes.iter().map(|x| x.null_count),
+            ))),
+            Index::INT32(index) => Some(Arc::new(Int64Array::from_iter(
+                index.indexes.iter().map(|x| x.null_count),
+            ))),
+            Index::INT64(index) => Some(Arc::new(Int64Array::from_iter(
+                index.indexes.iter().map(|x| x.null_count),
+            ))),
+            Index::FLOAT(index) => Some(Arc::new(Int64Array::from_iter(
+                index.indexes.iter().map(|x| x.null_count),
+            ))),
+            Index::DOUBLE(index) => Some(Arc::new(Int64Array::from_iter(
+                index.indexes.iter().map(|x| x.null_count),
+            ))),
+            Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+                // Todo support these types
                 None
             }
-        } else {
-            None
         }
     }
 }
@@ -1052,72 +1204,57 @@ fn prune_row_groups(
 
 fn prune_pages_in_one_row_group(
     group: &RowGroupMetaData,
-    predicate: Option<PruningPredicate>,
-    offset_indexes: Option<&Vec<Vec<PageLocation>>>,
-    page_indexes: Option<&Vec<Index>>,
+    predicate: &PruningPredicate,
+    col_offset_indexes: Option<&Vec<PageLocation>>,
+    col_page_indexes: Option<&Index>,
     metrics: &ParquetFileMetrics,
 ) -> Result<Vec<RowSelector>> {
     let num_rows = group.num_rows() as usize;
-    if let (Some(predicate), Some(offset_indexes), Some(page_indexes)) =
-        (&predicate, offset_indexes, page_indexes)
+    if let (Some(col_offset_indexes), Some(col_page_indexes)) =
+        (col_offset_indexes, col_page_indexes)
     {
         let pruning_stats = PagesPruningStatistics {
-            page_indexes,
-            offset_indexes,
-            parquet_schema: predicate.schema().as_ref(),
-            // now we assume only support one col.
-            col_id: *predicate
-                .need_input_columns_ids()
-                .iter()
-                .take(1)
-                .next()
-                .unwrap(),
+            col_page_indexes,
+            col_offset_indexes,
         };
 
         match predicate.prune(&pruning_stats) {
             Ok(values) => {
                 let mut vec = Vec::with_capacity(values.len());
-                if let Some(cols_offset_indexes) =
-                    offset_indexes.get(pruning_stats.col_id)
-                {
-                    let row_vec =
-                        create_row_count_in_each_page(cols_offset_indexes, num_rows);
-                    assert_eq!(row_vec.len(), values.len());
-                    let mut sum_row = *row_vec.first().unwrap();
-                    let mut selected = *values.first().unwrap();
-
-                    for (i, &f) in values.iter().skip(1).enumerate() {
-                        if f == selected {
-                            sum_row += *row_vec.get(i).unwrap();
+                let row_vec = create_row_count_in_each_page(col_offset_indexes, num_rows);
+                assert_eq!(row_vec.len(), values.len());
+                let mut sum_row = *row_vec.first().unwrap();
+                let mut selected = *values.first().unwrap();
+
+                for (i, &f) in values.iter().skip(1).enumerate() {
+                    if f == selected {
+                        sum_row += *row_vec.get(i).unwrap();
+                    } else {
+                        let selector = if selected {
+                            RowSelector::select(sum_row)
                         } else {
-                            let selector = if selected {
-                                RowSelector::select(sum_row)
-                            } else {
-                                RowSelector::skip(sum_row)
-                            };
-                            vec.push(selector);
-                            sum_row = *row_vec.get(i).unwrap();
-                            selected = f;
-                        }
+                            RowSelector::skip(sum_row)
+                        };
+                        vec.push(selector);
+                        sum_row = *row_vec.get(i).unwrap();
+                        selected = f;
                     }
+                }
 
-                    let selector = if selected {
-                        RowSelector::select(sum_row)
-                    } else {
-                        RowSelector::skip(sum_row)
-                    };
-                    vec.push(selector);
-                    return Ok(vec);
+                let selector = if selected {
+                    RowSelector::select(sum_row)
                 } else {
-                    debug!("Error evaluating page index predicate values missing page index col_id is{}", pruning_stats.col_id);
-                    metrics.predicate_evaluation_errors.add(1);
-                }
+                    RowSelector::skip(sum_row)
+                };
+                vec.push(selector);
+                return Ok(vec);
             }
             // stats filter array could not be built
-            // return a closure which will not filter out any row groups
+            // return a result which will not filter out any pages
             Err(e) => {
-                debug!("Error evaluating page index predicate values {}", e);
+                error!("Error evaluating page index predicate values {}", e);
                 metrics.predicate_evaluation_errors.add(1);
+                return Ok(vec![RowSelector::select(group.num_rows() as usize)]);
             }
         }
     }
@@ -2052,6 +2189,65 @@ mod tests {
         ParquetFileMetrics::new(0, "file.parquet", &metrics)
     }
 
+    #[test]
+    fn test_combine_row_selection() {
+        // a size equal b size
+        let a = vec![
+            RowSelector::select(5),
+            RowSelector::skip(4),
+            RowSelector::select(1),
+        ];
+        let b = vec![
+            RowSelector::select(8),
+            RowSelector::skip(1),
+            RowSelector::select(1),
+        ];
+
+        let res = intersect_row_selection(a, b);
+        assert_eq!(
+            res,
+            vec![
+                RowSelector::select(5),
+                RowSelector::skip(4),
+                RowSelector::select(1)
+            ],
+        );
+
+        // a size larger than b size
+        let a = vec![
+            RowSelector::select(3),
+            RowSelector::skip(33),
+            RowSelector::select(3),
+            RowSelector::skip(33),
+        ];
+        let b = vec![RowSelector::select(36), RowSelector::skip(36)];
+        let res = intersect_row_selection(a, b);
+        assert_eq!(res, vec![RowSelector::select(3), RowSelector::skip(69)]);
+
+        // a size less than b size
+        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
+        let b = vec![
+            RowSelector::select(2),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+        ];
+        let res = intersect_row_selection(a, b);
+        assert_eq!(res, vec![RowSelector::select(2), RowSelector::skip(8)]);
+
+        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
+        let b = vec![
+            RowSelector::select(2),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+        ];
+        let res = intersect_row_selection(a, b);
+        assert_eq!(res, vec![RowSelector::select(2), RowSelector::skip(8),]);
+    }
+
     #[test]
     fn row_group_pruning_predicate_simple_expr() {
         use datafusion_expr::{col, lit};
@@ -2614,67 +2810,4 @@ mod tests {
 
         Ok(())
     }
-
-    #[tokio::test]
-    async fn parquet_exec_with_page_index_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
-
-        let object_store_url = ObjectStoreUrl::local_filesystem();
-        let store = session_ctx
-            .runtime_env()
-            .object_store(&object_store_url)
-            .unwrap();
-
-        let testdata = crate::test_util::parquet_test_data();
-        let filename = format!("{}/alltypes_tiny_pages.parquet", testdata);
-
-        let meta = local_unpartitioned_file(filename);
-
-        let schema = ParquetFormat::default()
-            .infer_schema(&store, &[meta.clone()])
-            .await
-            .unwrap();
-
-        let partitioned_file = PartitionedFile {
-            object_meta: meta,
-            partition_values: vec![],
-            range: None,
-            extensions: None,
-        };
-
-        // create filter month == 1;
-        let filter = col("month").eq(lit(1_i32));
-        let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store_url,
-                file_groups: vec![vec![partitioned_file]],
-                file_schema: schema,
-                statistics: Statistics::default(),
-                // file has 10 cols so index 12 should be month
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                config_options: ConfigOptions::new().into_shareable(),
-            },
-            Some(filter),
-            None,
-        );
-
-        let parquet_exec_page_index = parquet_exec.clone().with_enable_page_index(true);
-
-        let mut results = parquet_exec_page_index.execute(0, task_ctx)?;
-
-        let batch = results.next().await.unwrap()?;
-
-        //  from the page index should create below RowSelection
-        //  vec.push(RowSelector::select(312));
-        //  vec.push(RowSelector::skip(3330));
-        //  vec.push(RowSelector::select(333));
-        //  vec.push(RowSelector::skip(3330));
-        // total 645 row
-
-        assert_eq!(batch.num_rows(), 645);
-        Ok(())
-    }
 }
diff --git a/datafusion/core/tests/parquet_page_index_pruning.rs b/datafusion/core/tests/parquet_page_index_pruning.rs
new file mode 100644
index 000000000..2a8791b69
--- /dev/null
+++ b/datafusion/core/tests/parquet_page_index_pruning.rs
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::config::ConfigOptions;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::file_format::FileFormat;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
+use datafusion_common::Statistics;
+use datafusion_expr::{col, lit, Expr};
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use tokio_stream::StreamExt;
+
+async fn get_parquet_exec(filter: Expr, session_ctx: SessionContext) -> ParquetExec {
+    let object_store_url = ObjectStoreUrl::local_filesystem();
+    let store = session_ctx
+        .runtime_env()
+        .object_store(&object_store_url)
+        .unwrap();
+
+    let testdata = datafusion::test_util::parquet_test_data();
+    let filename = format!("{}/alltypes_tiny_pages.parquet", testdata);
+
+    let location = Path::from_filesystem_path(filename.as_str()).unwrap();
+    let metadata = std::fs::metadata(filename).expect("Local file metadata");
+    let meta = ObjectMeta {
+        location,
+        last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
+        size: metadata.len() as usize,
+    };
+
+    let schema = ParquetFormat::default()
+        .infer_schema(&store, &[meta.clone()])
+        .await
+        .unwrap();
+
+    let partitioned_file = PartitionedFile {
+        object_meta: meta,
+        partition_values: vec![],
+        range: None,
+        extensions: None,
+    };
+
+    let parquet_exec = ParquetExec::new(
+        FileScanConfig {
+            object_store_url,
+            file_groups: vec![vec![partitioned_file]],
+            file_schema: schema,
+            statistics: Statistics::default(),
+            // file has 10 cols so index 12 should be month
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
+        },
+        Some(filter),
+        None,
+    );
+    parquet_exec.with_enable_page_index(true)
+}
+
+#[tokio::test]
+async fn page_index_filter_one_col() {
+    let session_ctx = SessionContext::new();
+    let task_ctx = session_ctx.task_ctx();
+
+    // 1.create filter month == 1;
+    let filter = col("month").eq(lit(1_i32));
+
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+    let batch = results.next().await.unwrap().unwrap();
+
+    // `month = 1` from the page index should create below RowSelection
+    //  vec.push(RowSelector::select(312));
+    //  vec.push(RowSelector::skip(3330));
+    //  vec.push(RowSelector::select(333));
+    //  vec.push(RowSelector::skip(3330));
+    // total 645 row
+    assert_eq!(batch.num_rows(), 645);
+
+    // 2. create filter month == 1 or month == 2;
+    let filter = col("month").eq(lit(1_i32)).or(col("month").eq(lit(2_i32)));
+
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+    let batch = results.next().await.unwrap().unwrap();
+
+    // `month = 12` from the page index should create below RowSelection
+    //  vec.push(RowSelector::skip(894));
+    //  vec.push(RowSelector::select(339));
+    //  vec.push(RowSelector::skip(3330));
+    //  vec.push(RowSelector::select(312));
+    //  vec.push(RowSelector::skip(2430));
+    //  combine with before filter total 1275 row
+    assert_eq!(batch.num_rows(), 1275);
+
+    // 3. create filter month == 1 and month == 12;
+    let filter = col("month")
+        .eq(lit(1_i32))
+        .and(col("month").eq(lit(12_i32)));
+
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+    let batch = results.next().await;
+
+    assert!(batch.is_none());
+
+    // 4.create filter 0 < month < 2 ;
+    let filter = col("month").gt(lit(0_i32)).and(col("month").lt(lit(2_i32)));
+
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+    let batch = results.next().await.unwrap().unwrap();
+
+    // should same with `month = 1`
+    assert_eq!(batch.num_rows(), 645);
+}
+
+#[tokio::test]
+async fn page_index_filter_multi_col() {
+    let session_ctx = SessionContext::new();
+    let task_ctx = session_ctx.task_ctx();
+
+    // create filter month == 1 and year = 2009;
+    let filter = col("month").eq(lit(1_i32)).and(col("year").eq(lit(2009)));
+
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+    let batch = results.next().await.unwrap().unwrap();
+
+    //  `year = 2009` from the page index should create below RowSelection
+    //  vec.push(RowSelector::select(3663));
+    //  vec.push(RowSelector::skip(3642));
+    //  combine with `month = 1` total 333 row
+    assert_eq!(batch.num_rows(), 333);
+
+    // create filter (year = 2009 or id = 1) and month = 1;
+    // this should only use `month = 1` to evaluate the page index.
+    let filter = col("month")
+        .eq(lit(1_i32))
+        .and(col("year").eq(lit(2009)).or(col("id").eq(lit(1))));
+
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+    let batch = results.next().await.unwrap().unwrap();
+    assert_eq!(batch.num_rows(), 645);
+
+    // create filter (year = 2009 or id = 1)
+    // this filter use two columns will not push down
+    let filter = col("year").eq(lit(2009)).or(col("id").eq(lit(1)));
+
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+    let batch = results.next().await.unwrap().unwrap();
+    assert_eq!(batch.num_rows(), 7300);
+
+    // create filter (year = 2009 and id = 1) or (year = 2010)
+    // this filter use two columns will not push down
+    // todo but after use CNF rewrite it could rewrite to (year = 2009 or  year = 2010) and (id = 1 or year = 2010)
+    // which could push (year = 2009 or year = 2010) down.
+    let filter = col("year")
+        .eq(lit(2009))
+        .and(col("id").eq(lit(1)))
+        .or(col("year").eq(lit(2010)));
+
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+
+    let batch = results.next().await.unwrap().unwrap();
+    assert_eq!(batch.num_rows(), 7300);
+}