You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/05/20 21:29:32 UTC

[arrow-datafusion] branch master updated: Refactor: move RowGroupPredicateBuilder into its own module, rename to PruningPredicateBuilder (#365)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 913bf86  Refactor: move RowGroupPredicateBuilder into its own module, rename to PruningPredicateBuilder (#365)
913bf86 is described below

commit 913bf86147773863ec96b76f1f7cfc1cfbb84a6e
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu May 20 17:29:24 2021 -0400

    Refactor: move RowGroupPredicateBuilder into its own module, rename to PruningPredicateBuilder (#365)
    
    * Move row group pruning logic into pruning.rs
    
    * Rename RowGroup Predicates --> Pruning Predicates, touch up comments
    
    * add rat
    
    * Remove commented out println! and code
    
    * use .any instead of filter/count
---
 datafusion/src/physical_optimizer/mod.rs     |   1 +
 datafusion/src/physical_optimizer/pruning.rs | 751 ++++++++++++++++++++++++++
 datafusion/src/physical_plan/parquet.rs      | 781 ++-------------------------
 3 files changed, 784 insertions(+), 749 deletions(-)

diff --git a/datafusion/src/physical_optimizer/mod.rs b/datafusion/src/physical_optimizer/mod.rs
index eca63db..8e79fe9 100644
--- a/datafusion/src/physical_optimizer/mod.rs
+++ b/datafusion/src/physical_optimizer/mod.rs
@@ -21,4 +21,5 @@
 pub mod coalesce_batches;
 pub mod merge_exec;
 pub mod optimizer;
+pub mod pruning;
 pub mod repartition;
diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs
new file mode 100644
index 0000000..a13ca56
--- /dev/null
+++ b/datafusion/src/physical_optimizer/pruning.rs
@@ -0,0 +1,751 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains code to rule out row groups / partitions /
+//! etc based on statistics prior in order to skip evaluating entire
+//! swaths of rows.
+//!
+//! This code is currently specific to Parquet, but soon (TM), via
+//! https://github.com/apache/arrow-datafusion/issues/363 it will
+//! be genericized.
+
+use std::{collections::HashSet, sync::Arc};
+
+use arrow::{
+    array::{
+        make_array, new_null_array, ArrayData, ArrayRef, BooleanArray,
+        BooleanBufferBuilder,
+    },
+    buffer::MutableBuffer,
+    datatypes::{DataType, Field, Schema},
+    record_batch::RecordBatch,
+};
+
+use parquet::file::{
+    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
+};
+
+use crate::{
+    error::{DataFusionError, Result},
+    execution::context::ExecutionContextState,
+    logical_plan::{Expr, Operator},
+    optimizer::utils,
+    physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
+};
+
+#[derive(Debug, Clone)]
+/// Builder used for generating predicate functions that can be used
+/// to prune data based on statistics (e.g. parquet row group metadata)
+pub struct PruningPredicateBuilder {
+    schema: Schema,
+    predicate_expr: Arc<dyn PhysicalExpr>,
+    stat_column_req: Vec<(String, StatisticsType, Field)>,
+}
+
+impl PruningPredicateBuilder {
+    /// Try to create a new instance of [`PruningPredicateBuilder`]
+    ///
+    /// This will translate the filter expression into a statistics predicate expression
+    ///
+    /// For example,  `(column / 2) = 4` becomes `(column_min / 2) <= 4 && 4 <= (column_max / 2))`
+    pub fn try_new(expr: &Expr, schema: Schema) -> Result<Self> {
+        // build predicate expression once
+        let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new();
+        let logical_predicate_expr =
+            build_predicate_expression(expr, &schema, &mut stat_column_req)?;
+        let stat_fields = stat_column_req
+            .iter()
+            .map(|(_, _, f)| f.clone())
+            .collect::<Vec<_>>();
+        let stat_schema = Schema::new(stat_fields);
+        let execution_context_state = ExecutionContextState::new();
+        let predicate_expr = DefaultPhysicalPlanner::default().create_physical_expr(
+            &logical_predicate_expr,
+            &stat_schema,
+            &execution_context_state,
+        )?;
+        Ok(Self {
+            schema,
+            predicate_expr,
+            stat_column_req,
+        })
+    }
+
+    /// Generate a predicate function used to filter based on
+    /// statistics
+    ///
+    /// This function takes a slice of statistics as parameter, so
+    /// that DataFusion's physical expressions can be executed once
+    /// against a single RecordBatch, containing statistics arrays, on
+    /// which the physical predicate expression is executed to
+    /// generate a row group filter array.
+    ///
+    /// The generated filter function is then used in the returned
+    /// closure to filter row groups. NOTE this is parquet specific at the moment
+    pub fn build_pruning_predicate(
+        &self,
+        row_group_metadata: &[RowGroupMetaData],
+    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+        // build statistics record batch
+        let predicate_result = build_statistics_record_batch(
+            row_group_metadata,
+            &self.schema,
+            &self.stat_column_req,
+        )
+        .and_then(|statistics_batch| {
+            // execute predicate expression
+            self.predicate_expr.evaluate(&statistics_batch)
+        })
+        .and_then(|v| match v {
+            ColumnarValue::Array(array) => Ok(array),
+            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
+                "predicate expression didn't return an array".to_string(),
+            )),
+        });
+
+        let predicate_array = match predicate_result {
+            Ok(array) => array,
+            // row group filter array could not be built
+            // return a closure which will not filter out any row groups
+            _ => return Box::new(|_r, _i| true),
+        };
+
+        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
+        match predicate_array {
+            // return row group predicate function
+            Some(array) => {
+                // when the result of the predicate expression for a row group is null / undefined,
+                // e.g. due to missing statistics, this row group can't be filtered out,
+                // so replace with true
+                let predicate_values =
+                    array.iter().map(|x| x.unwrap_or(true)).collect::<Vec<_>>();
+                Box::new(move |_, i| predicate_values[i])
+            }
+            // predicate result is not a BooleanArray
+            // return a closure which will not filter out any row groups
+            _ => Box::new(|_r, _i| true),
+        }
+    }
+}
+
+/// Build a RecordBatch from a list of statistics (currently parquet
+/// [`RowGroupMetadata`] structs), creating arrays, one for each
+/// statistics column, as requested in the stat_column_req parameter.
+fn build_statistics_record_batch(
+    row_groups: &[RowGroupMetaData],
+    schema: &Schema,
+    stat_column_req: &[(String, StatisticsType, Field)],
+) -> Result<RecordBatch> {
+    let mut fields = Vec::<Field>::new();
+    let mut arrays = Vec::<ArrayRef>::new();
+    for (column_name, statistics_type, stat_field) in stat_column_req {
+        if let Some((column_index, _)) = schema.column_with_name(column_name) {
+            let statistics = row_groups
+                .iter()
+                .map(|g| g.column(column_index).statistics())
+                .collect::<Vec<_>>();
+            let array = build_statistics_array(
+                &statistics,
+                *statistics_type,
+                stat_field.data_type(),
+            );
+            fields.push(stat_field.clone());
+            arrays.push(array);
+        }
+    }
+    let schema = Arc::new(Schema::new(fields));
+    RecordBatch::try_new(schema, arrays)
+        .map_err(|err| DataFusionError::Plan(err.to_string()))
+}
+
+struct StatisticsExpressionBuilder<'a> {
+    column_name: String,
+    column_expr: &'a Expr,
+    scalar_expr: &'a Expr,
+    field: &'a Field,
+    stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    reverse_operator: bool,
+}
+
+impl<'a> StatisticsExpressionBuilder<'a> {
+    fn try_new(
+        left: &'a Expr,
+        right: &'a Expr,
+        schema: &'a Schema,
+        stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    ) -> Result<Self> {
+        // find column name; input could be a more complicated expression
+        let mut left_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(left, &mut left_columns)?;
+        let mut right_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(right, &mut right_columns)?;
+        let (column_expr, scalar_expr, column_names, reverse_operator) =
+            match (left_columns.len(), right_columns.len()) {
+                (1, 0) => (left, right, left_columns, false),
+                (0, 1) => (right, left, right_columns, true),
+                _ => {
+                    // if more than one column used in expression - not supported
+                    return Err(DataFusionError::Plan(
+                        "Multi-column expressions are not currently supported"
+                            .to_string(),
+                    ));
+                }
+            };
+        let column_name = column_names.iter().next().unwrap().clone();
+        let field = match schema.column_with_name(&column_name) {
+            Some((_, f)) => f,
+            _ => {
+                return Err(DataFusionError::Plan(
+                    "Field not found in schema".to_string(),
+                ));
+            }
+        };
+
+        Ok(Self {
+            column_name,
+            column_expr,
+            scalar_expr,
+            field,
+            stat_column_req,
+            reverse_operator,
+        })
+    }
+
+    fn correct_operator(&self, op: Operator) -> Operator {
+        if !self.reverse_operator {
+            return op;
+        }
+
+        match op {
+            Operator::Lt => Operator::Gt,
+            Operator::Gt => Operator::Lt,
+            Operator::LtEq => Operator::GtEq,
+            Operator::GtEq => Operator::LtEq,
+            _ => op,
+        }
+    }
+
+    fn scalar_expr(&self) -> &Expr {
+        self.scalar_expr
+    }
+
+    fn is_stat_column_missing(&self, statistics_type: StatisticsType) -> bool {
+        !self
+            .stat_column_req
+            .iter()
+            .any(|(c, t, _f)| c == &self.column_name && t == &statistics_type)
+    }
+
+    fn stat_column_expr(
+        &mut self,
+        stat_type: StatisticsType,
+        suffix: &str,
+    ) -> Result<Expr> {
+        let stat_column_name = format!("{}_{}", self.column_name, suffix);
+        let stat_field = Field::new(
+            stat_column_name.as_str(),
+            self.field.data_type().clone(),
+            self.field.is_nullable(),
+        );
+        if self.is_stat_column_missing(stat_type) {
+            // only add statistics column if not previously added
+            self.stat_column_req
+                .push((self.column_name.clone(), stat_type, stat_field));
+        }
+        rewrite_column_expr(
+            self.column_expr,
+            self.column_name.as_str(),
+            stat_column_name.as_str(),
+        )
+    }
+
+    fn min_column_expr(&mut self) -> Result<Expr> {
+        self.stat_column_expr(StatisticsType::Min, "min")
+    }
+
+    fn max_column_expr(&mut self) -> Result<Expr> {
+        self.stat_column_expr(StatisticsType::Max, "max")
+    }
+}
+
+/// replaces a column with an old name with a new name in an expression
+fn rewrite_column_expr(
+    expr: &Expr,
+    column_old_name: &str,
+    column_new_name: &str,
+) -> Result<Expr> {
+    let expressions = utils::expr_sub_expressions(&expr)?;
+    let expressions = expressions
+        .iter()
+        .map(|e| rewrite_column_expr(e, column_old_name, column_new_name))
+        .collect::<Result<Vec<_>>>()?;
+
+    if let Expr::Column(name) = expr {
+        if name == column_old_name {
+            return Ok(Expr::Column(column_new_name.to_string()));
+        }
+    }
+    utils::rewrite_expression(&expr, &expressions)
+}
+
+/// Translate logical filter expression into statistics predicate expression
+fn build_predicate_expression(
+    expr: &Expr,
+    schema: &Schema,
+    stat_column_req: &mut Vec<(String, StatisticsType, Field)>,
+) -> Result<Expr> {
+    use crate::logical_plan;
+    // predicate expression can only be a binary expression
+    let (left, op, right) = match expr {
+        Expr::BinaryExpr { left, op, right } => (left, *op, right),
+        _ => {
+            // unsupported expression - replace with TRUE
+            // this can still be useful when multiple conditions are joined using AND
+            // such as: column > 10 AND TRUE
+            return Ok(logical_plan::lit(true));
+        }
+    };
+
+    if op == Operator::And || op == Operator::Or {
+        let left_expr = build_predicate_expression(left, schema, stat_column_req)?;
+        let right_expr = build_predicate_expression(right, schema, stat_column_req)?;
+        return Ok(logical_plan::binary_expr(left_expr, op, right_expr));
+    }
+
+    let expr_builder =
+        StatisticsExpressionBuilder::try_new(left, right, schema, stat_column_req);
+    let mut expr_builder = match expr_builder {
+        Ok(builder) => builder,
+        // allow partial failure in predicate expression generation
+        // this can still produce a useful predicate when multiple conditions are joined using AND
+        Err(_) => {
+            return Ok(logical_plan::lit(true));
+        }
+    };
+    let corrected_op = expr_builder.correct_operator(op);
+    let statistics_expr = match corrected_op {
+        Operator::Eq => {
+            // column = literal => (min, max) = literal => min <= literal && literal <= max
+            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
+            let min_column_expr = expr_builder.min_column_expr()?;
+            let max_column_expr = expr_builder.max_column_expr()?;
+            min_column_expr
+                .lt_eq(expr_builder.scalar_expr().clone())
+                .and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr))
+        }
+        Operator::Gt => {
+            // column > literal => (min, max) > literal => max > literal
+            expr_builder
+                .max_column_expr()?
+                .gt(expr_builder.scalar_expr().clone())
+        }
+        Operator::GtEq => {
+            // column >= literal => (min, max) >= literal => max >= literal
+            expr_builder
+                .max_column_expr()?
+                .gt_eq(expr_builder.scalar_expr().clone())
+        }
+        Operator::Lt => {
+            // column < literal => (min, max) < literal => min < literal
+            expr_builder
+                .min_column_expr()?
+                .lt(expr_builder.scalar_expr().clone())
+        }
+        Operator::LtEq => {
+            // column <= literal => (min, max) <= literal => min <= literal
+            expr_builder
+                .min_column_expr()?
+                .lt_eq(expr_builder.scalar_expr().clone())
+        }
+        // other expressions are not supported
+        _ => logical_plan::lit(true),
+    };
+    Ok(statistics_expr)
+}
+
+#[derive(Debug, Copy, Clone, PartialEq)]
+enum StatisticsType {
+    Min,
+    Max,
+}
+
+fn build_statistics_array(
+    statistics: &[Option<&ParquetStatistics>],
+    statistics_type: StatisticsType,
+    data_type: &DataType,
+) -> ArrayRef {
+    let statistics_count = statistics.len();
+    let first_group_stats = statistics.iter().find(|s| s.is_some());
+    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
+        // found first row group with statistics defined
+        statistics
+    } else {
+        // no row group has statistics defined
+        return new_null_array(data_type, statistics_count);
+    };
+
+    let (data_size, arrow_type) = match first_group_stats {
+        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
+        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
+        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
+        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
+        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
+            (0, DataType::Utf8)
+        }
+        _ => {
+            // type of statistics not supported
+            return new_null_array(data_type, statistics_count);
+        }
+    };
+
+    let statistics = statistics.iter().map(|s| {
+        s.filter(|s| s.has_min_max_set())
+            .map(|s| match statistics_type {
+                StatisticsType::Min => s.min_bytes(),
+                StatisticsType::Max => s.max_bytes(),
+            })
+    });
+
+    if arrow_type == DataType::Utf8 {
+        let data_size = statistics
+            .clone()
+            .map(|x| x.map(|b| b.len()).unwrap_or(0))
+            .sum();
+        let mut builder =
+            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
+        let string_statistics =
+            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
+        for maybe_string in string_statistics {
+            match maybe_string {
+                Some(string_value) => builder.append_value(string_value).unwrap(),
+                None => builder.append_null().unwrap(),
+            };
+        }
+        return Arc::new(builder.finish());
+    }
+
+    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
+    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
+    let mut null_count = 0;
+    for s in statistics {
+        if let Some(stat_data) = s {
+            bitmap_builder.append(true);
+            data_buffer.extend_from_slice(stat_data);
+        } else {
+            bitmap_builder.append(false);
+            data_buffer.resize(data_buffer.len() + data_size, 0);
+            null_count += 1;
+        }
+    }
+
+    let mut builder = ArrayData::builder(arrow_type)
+        .len(statistics_count)
+        .add_buffer(data_buffer.into());
+    if null_count > 0 {
+        builder = builder.null_bit_buffer(bitmap_builder.finish());
+    }
+    let array_data = builder.build();
+    let statistics_array = make_array(array_data);
+    if statistics_array.data_type() == data_type {
+        return statistics_array;
+    }
+    // cast statistics array to required data type
+    arrow::compute::cast(&statistics_array, data_type)
+        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::physical_optimizer::pruning::StatisticsType;
+    use arrow::{
+        array::{Int32Array, StringArray},
+        datatypes::DataType,
+    };
+    use parquet::file::statistics::Statistics as ParquetStatistics;
+
+    #[test]
+    fn build_statistics_array_int32() {
+        // build row group metadata array
+        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
+        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
+        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
+        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
+
+        let statistics_array =
+            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
+        let int32_array = statistics_array
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
+        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
+
+        let statistics_array =
+            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
+        let int32_array = statistics_array
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
+        // here the first max value is None and not the Some(10) value which was actually set
+        // because the min value is None
+        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    }
+
+    #[test]
+    fn build_statistics_array_utf8() {
+        // build row group metadata array
+        let s1 = ParquetStatistics::byte_array(None, Some("10".into()), None, 0, false);
+        let s2 = ParquetStatistics::byte_array(
+            Some("2".into()),
+            Some("20".into()),
+            None,
+            0,
+            false,
+        );
+        let s3 = ParquetStatistics::byte_array(
+            Some("3".into()),
+            Some("30".into()),
+            None,
+            0,
+            false,
+        );
+        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
+
+        let statistics_array =
+            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Utf8);
+        let string_array = statistics_array
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let string_vec = string_array.into_iter().collect::<Vec<_>>();
+        assert_eq!(string_vec, vec![None, Some("2"), Some("3")]);
+
+        let statistics_array =
+            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Utf8);
+        let string_array = statistics_array
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let string_vec = string_array.into_iter().collect::<Vec<_>>();
+        // here the first max value is None and not the Some("10") value which was actually set
+        // because the min value is None
+        assert_eq!(string_vec, vec![None, Some("20"), Some("30")]);
+    }
+
+    #[test]
+    fn build_statistics_array_empty_stats() {
+        let data_type = DataType::Int32;
+        let statistics = vec![];
+        let statistics_array =
+            build_statistics_array(&statistics, StatisticsType::Min, &data_type);
+        assert_eq!(statistics_array.len(), 0);
+
+        let statistics = vec![None, None];
+        let statistics_array =
+            build_statistics_array(&statistics, StatisticsType::Min, &data_type);
+        assert_eq!(statistics_array.len(), statistics.len());
+        assert_eq!(statistics_array.data_type(), &data_type);
+        for i in 0..statistics_array.len() {
+            assert_eq!(statistics_array.is_null(i), true);
+            assert_eq!(statistics_array.is_valid(i), false);
+        }
+    }
+
+    #[test]
+    fn build_statistics_array_unsupported_type() {
+        // boolean is not currently a supported type for statistics
+        let s1 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false);
+        let s2 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false);
+        let statistics = vec![Some(&s1), Some(&s2)];
+        let data_type = DataType::Boolean;
+        let statistics_array =
+            build_statistics_array(&statistics, StatisticsType::Min, &data_type);
+        assert_eq!(statistics_array.len(), statistics.len());
+        assert_eq!(statistics_array.data_type(), &data_type);
+        for i in 0..statistics_array.len() {
+            assert_eq!(statistics_array.is_null(i), true);
+            assert_eq!(statistics_array.is_valid(i), false);
+        }
+    }
+
+    #[test]
+    fn row_group_predicate_eq() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let expected_expr = "#c1_min LtEq Int32(1) And Int32(1) LtEq #c1_max";
+
+        // test column on the left
+        let expr = col("c1").eq(lit(1));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        // test column on the right
+        let expr = lit(1).eq(col("c1"));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_gt() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let expected_expr = "#c1_max Gt Int32(1)";
+
+        // test column on the left
+        let expr = col("c1").gt(lit(1));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        // test column on the right
+        let expr = lit(1).lt(col("c1"));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_gt_eq() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let expected_expr = "#c1_max GtEq Int32(1)";
+
+        // test column on the left
+        let expr = col("c1").gt_eq(lit(1));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+        // test column on the right
+        let expr = lit(1).lt_eq(col("c1"));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_lt() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let expected_expr = "#c1_min Lt Int32(1)";
+
+        // test column on the left
+        let expr = col("c1").lt(lit(1));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        // test column on the right
+        let expr = lit(1).gt(col("c1"));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_lt_eq() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let expected_expr = "#c1_min LtEq Int32(1)";
+
+        // test column on the left
+        let expr = col("c1").lt_eq(lit(1));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+        // test column on the right
+        let expr = lit(1).gt_eq(col("c1"));
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_and() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Int32, false),
+            Field::new("c3", DataType::Int32, false),
+        ]);
+        // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression
+        let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
+        let expected_expr = "#c1_min Lt Int32(1) And Boolean(true)";
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_or() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Int32, false),
+        ]);
+        // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 expression
+        let expr = col("c1").lt(lit(1)).or(col("c2").modulus(lit(2)));
+        let expected_expr = "#c1_min Lt Int32(1) Or Boolean(true)";
+        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_stat_column_req() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        let schema = Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Int32, false),
+        ]);
+        let mut stat_column_req = vec![];
+        // c1 < 1 and (c2 = 2 or c2 = 3)
+        let expr = col("c1")
+            .lt(lit(1))
+            .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
+        let expected_expr = "#c1_min Lt Int32(1) And #c2_min LtEq Int32(2) And Int32(2) LtEq #c2_max Or #c2_min LtEq Int32(3) And Int32(3) LtEq #c2_max";
+        let predicate_expr =
+            build_predicate_expression(&expr, &schema, &mut stat_column_req)?;
+        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+        // c1 < 1 should add c1_min
+        let c1_min_field = Field::new("c1_min", DataType::Int32, false);
+        assert_eq!(
+            stat_column_req[0],
+            ("c1".to_owned(), StatisticsType::Min, c1_min_field)
+        );
+        // c2 = 2 should add c2_min and c2_max
+        let c2_min_field = Field::new("c2_min", DataType::Int32, false);
+        assert_eq!(
+            stat_column_req[1],
+            ("c2".to_owned(), StatisticsType::Min, c2_min_field)
+        );
+        let c2_max_field = Field::new("c2_max", DataType::Int32, false);
+        assert_eq!(
+            stat_column_req[2],
+            ("c2".to_owned(), StatisticsType::Max, c2_max_field)
+        );
+        // c2 = 3 shouldn't add any new statistics fields
+        assert_eq!(stat_column_req.len(), 3);
+
+        Ok(())
+    }
+}
diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index dd5e77b..66b1253 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -17,38 +17,28 @@
 
 //! Execution plan for reading Parquet files
 
+use std::any::Any;
 use std::fmt;
 use std::fs::File;
 use std::sync::Arc;
 use std::task::{Context, Poll};
-use std::{any::Any, collections::HashSet};
 
-use super::{
-    planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr, RecordBatchStream,
-    SendableRecordBatchStream,
-};
-use crate::physical_plan::{common, DisplayFormatType, ExecutionPlan, Partitioning};
 use crate::{
     error::{DataFusionError, Result},
-    execution::context::ExecutionContextState,
-    logical_plan::{Expr, Operator},
-    optimizer::utils,
+    logical_plan::Expr,
+    physical_optimizer::pruning::PruningPredicateBuilder,
+    physical_plan::{
+        common, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+        SendableRecordBatchStream,
+    },
 };
-use arrow::record_batch::RecordBatch;
+
 use arrow::{
-    array::new_null_array,
+    datatypes::{Schema, SchemaRef},
     error::{ArrowError, Result as ArrowResult},
+    record_batch::RecordBatch,
 };
-use arrow::{
-    array::{make_array, ArrayData, ArrayRef, BooleanArray, BooleanBufferBuilder},
-    buffer::MutableBuffer,
-    datatypes::{DataType, Field, Schema, SchemaRef},
-};
-use parquet::file::{
-    metadata::RowGroupMetaData,
-    reader::{FileReader, SerializedFileReader},
-    statistics::Statistics as ParquetStatistics,
-};
+use parquet::file::reader::{FileReader, SerializedFileReader};
 
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
@@ -76,7 +66,7 @@ pub struct ParquetExec {
     /// Statistics for the data set (sum of statistics for all partitions)
     statistics: Statistics,
     /// Optional predicate builder
-    predicate_builder: Option<RowGroupPredicateBuilder>,
+    predicate_builder: Option<PruningPredicateBuilder>,
     /// Optional limit of the number of rows
     limit: Option<usize>,
 }
@@ -229,7 +219,7 @@ impl ParquetExec {
         }
         let schema = schemas[0].clone();
         let predicate_builder = predicate.and_then(|predicate_expr| {
-            RowGroupPredicateBuilder::try_new(&predicate_expr, schema.clone()).ok()
+            PruningPredicateBuilder::try_new(&predicate_expr, schema.clone()).ok()
         });
 
         Ok(Self::new(
@@ -247,7 +237,7 @@ impl ParquetExec {
         partitions: Vec<ParquetPartition>,
         schema: Schema,
         projection: Option<Vec<usize>>,
-        predicate_builder: Option<RowGroupPredicateBuilder>,
+        predicate_builder: Option<PruningPredicateBuilder>,
         batch_size: usize,
         limit: Option<usize>,
     ) -> Self {
@@ -358,445 +348,6 @@ impl ParquetPartition {
     }
 }
 
-#[derive(Debug, Clone)]
-/// Predicate builder used for generating of predicate functions, used to filter row group metadata
-pub struct RowGroupPredicateBuilder {
-    parquet_schema: Schema,
-    predicate_expr: Arc<dyn PhysicalExpr>,
-    stat_column_req: Vec<(String, StatisticsType, Field)>,
-}
-
-impl RowGroupPredicateBuilder {
-    /// Try to create a new instance of PredicateExpressionBuilder.
-    /// This will translate the filter expression into a statistics predicate expression
-    /// (for example (column / 2) = 4 becomes (column_min / 2) <= 4 && 4 <= (column_max / 2)),
-    /// then convert it to a DataFusion PhysicalExpression and cache it for later use by build_row_group_predicate.
-    pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result<Self> {
-        // build predicate expression once
-        let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new();
-        let logical_predicate_expr =
-            build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?;
-        // println!(
-        //     "RowGroupPredicateBuilder::try_new, logical_predicate_expr: {:?}",
-        //     logical_predicate_expr
-        // );
-        // build physical predicate expression
-        let stat_fields = stat_column_req
-            .iter()
-            .map(|(_, _, f)| f.clone())
-            .collect::<Vec<_>>();
-        let stat_schema = Schema::new(stat_fields);
-        let execution_context_state = ExecutionContextState::new();
-        let predicate_expr = DefaultPhysicalPlanner::default().create_physical_expr(
-            &logical_predicate_expr,
-            &stat_schema,
-            &execution_context_state,
-        )?;
-        // println!(
-        //     "RowGroupPredicateBuilder::try_new, predicate_expr: {:?}",
-        //     predicate_expr
-        // );
-        Ok(Self {
-            parquet_schema,
-            predicate_expr,
-            stat_column_req,
-        })
-    }
-
-    /// Generate a predicate function used to filter row group metadata.
-    /// This function takes a list of all row groups as parameter,
-    /// so that DataFusion's physical expressions can be re-used by
-    /// generating a RecordBatch, containing statistics arrays,
-    /// on which the physical predicate expression is executed to generate a row group filter array.
-    /// The generated filter array is then used in the returned closure to filter row groups.
-    pub fn build_row_group_predicate(
-        &self,
-        row_group_metadata: &[RowGroupMetaData],
-    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
-        // build statistics record batch
-        let predicate_result = build_statistics_record_batch(
-            row_group_metadata,
-            &self.parquet_schema,
-            &self.stat_column_req,
-        )
-        .and_then(|statistics_batch| {
-            // execute predicate expression
-            self.predicate_expr.evaluate(&statistics_batch)
-        })
-        .and_then(|v| match v {
-            ColumnarValue::Array(array) => Ok(array),
-            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
-                "predicate expression didn't return an array".to_string(),
-            )),
-        });
-
-        let predicate_array = match predicate_result {
-            Ok(array) => array,
-            // row group filter array could not be built
-            // return a closure which will not filter out any row groups
-            _ => return Box::new(|_r, _i| true),
-        };
-
-        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
-        match predicate_array {
-            // return row group predicate function
-            Some(array) => {
-                // when the result of the predicate expression for a row group is null / undefined,
-                // e.g. due to missing statistics, this row group can't be filtered out,
-                // so replace with true
-                let predicate_values =
-                    array.iter().map(|x| x.unwrap_or(true)).collect::<Vec<_>>();
-                Box::new(move |_, i| predicate_values[i])
-            }
-            // predicate result is not a BooleanArray
-            // return a closure which will not filter out any row groups
-            _ => Box::new(|_r, _i| true),
-        }
-    }
-}
-
-/// Build a RecordBatch from a list of RowGroupMetadata structs,
-/// creating arrays, one for each statistics column,
-/// as requested in the stat_column_req parameter.
-fn build_statistics_record_batch(
-    row_groups: &[RowGroupMetaData],
-    parquet_schema: &Schema,
-    stat_column_req: &[(String, StatisticsType, Field)],
-) -> Result<RecordBatch> {
-    let mut fields = Vec::<Field>::new();
-    let mut arrays = Vec::<ArrayRef>::new();
-    for (column_name, statistics_type, stat_field) in stat_column_req {
-        if let Some((column_index, _)) = parquet_schema.column_with_name(column_name) {
-            let statistics = row_groups
-                .iter()
-                .map(|g| g.column(column_index).statistics())
-                .collect::<Vec<_>>();
-            let array = build_statistics_array(
-                &statistics,
-                *statistics_type,
-                stat_field.data_type(),
-            );
-            fields.push(stat_field.clone());
-            arrays.push(array);
-        }
-    }
-    let schema = Arc::new(Schema::new(fields));
-    RecordBatch::try_new(schema, arrays)
-        .map_err(|err| DataFusionError::Plan(err.to_string()))
-}
-
-struct StatisticsExpressionBuilder<'a> {
-    column_name: String,
-    column_expr: &'a Expr,
-    scalar_expr: &'a Expr,
-    parquet_field: &'a Field,
-    stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
-    reverse_operator: bool,
-}
-
-impl<'a> StatisticsExpressionBuilder<'a> {
-    fn try_new(
-        left: &'a Expr,
-        right: &'a Expr,
-        parquet_schema: &'a Schema,
-        stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
-    ) -> Result<Self> {
-        // find column name; input could be a more complicated expression
-        let mut left_columns = HashSet::<String>::new();
-        utils::expr_to_column_names(left, &mut left_columns)?;
-        let mut right_columns = HashSet::<String>::new();
-        utils::expr_to_column_names(right, &mut right_columns)?;
-        let (column_expr, scalar_expr, column_names, reverse_operator) =
-            match (left_columns.len(), right_columns.len()) {
-                (1, 0) => (left, right, left_columns, false),
-                (0, 1) => (right, left, right_columns, true),
-                _ => {
-                    // if more than one column used in expression - not supported
-                    return Err(DataFusionError::Plan(
-                        "Multi-column expressions are not currently supported"
-                            .to_string(),
-                    ));
-                }
-            };
-        let column_name = column_names.iter().next().unwrap().clone();
-        let field = match parquet_schema.column_with_name(&column_name) {
-            Some((_, f)) => f,
-            _ => {
-                // field not found in parquet schema
-                return Err(DataFusionError::Plan(
-                    "Field not found in parquet schema".to_string(),
-                ));
-            }
-        };
-
-        Ok(Self {
-            column_name,
-            column_expr,
-            scalar_expr,
-            parquet_field: field,
-            stat_column_req,
-            reverse_operator,
-        })
-    }
-
-    fn correct_operator(&self, op: Operator) -> Operator {
-        if !self.reverse_operator {
-            return op;
-        }
-
-        match op {
-            Operator::Lt => Operator::Gt,
-            Operator::Gt => Operator::Lt,
-            Operator::LtEq => Operator::GtEq,
-            Operator::GtEq => Operator::LtEq,
-            _ => op,
-        }
-    }
-
-    // fn column_expr(&self) -> &Expr {
-    //     self.column_expr
-    // }
-
-    fn scalar_expr(&self) -> &Expr {
-        self.scalar_expr
-    }
-
-    // fn column_name(&self) -> &String {
-    //     &self.column_name
-    // }
-
-    fn is_stat_column_missing(&self, statistics_type: StatisticsType) -> bool {
-        self.stat_column_req
-            .iter()
-            .filter(|(c, t, _f)| c == &self.column_name && t == &statistics_type)
-            .count()
-            == 0
-    }
-
-    fn stat_column_expr(
-        &mut self,
-        stat_type: StatisticsType,
-        suffix: &str,
-    ) -> Result<Expr> {
-        let stat_column_name = format!("{}_{}", self.column_name, suffix);
-        let stat_field = Field::new(
-            stat_column_name.as_str(),
-            self.parquet_field.data_type().clone(),
-            self.parquet_field.is_nullable(),
-        );
-        if self.is_stat_column_missing(stat_type) {
-            // only add statistics column if not previously added
-            self.stat_column_req
-                .push((self.column_name.clone(), stat_type, stat_field));
-        }
-        rewrite_column_expr(
-            self.column_expr,
-            self.column_name.as_str(),
-            stat_column_name.as_str(),
-        )
-    }
-
-    fn min_column_expr(&mut self) -> Result<Expr> {
-        self.stat_column_expr(StatisticsType::Min, "min")
-    }
-
-    fn max_column_expr(&mut self) -> Result<Expr> {
-        self.stat_column_expr(StatisticsType::Max, "max")
-    }
-}
-
-/// replaces a column with an old name with a new name in an expression
-fn rewrite_column_expr(
-    expr: &Expr,
-    column_old_name: &str,
-    column_new_name: &str,
-) -> Result<Expr> {
-    let expressions = utils::expr_sub_expressions(&expr)?;
-    let expressions = expressions
-        .iter()
-        .map(|e| rewrite_column_expr(e, column_old_name, column_new_name))
-        .collect::<Result<Vec<_>>>()?;
-
-    if let Expr::Column(name) = expr {
-        if name == column_old_name {
-            return Ok(Expr::Column(column_new_name.to_string()));
-        }
-    }
-    utils::rewrite_expression(&expr, &expressions)
-}
-
-/// Translate logical filter expression into parquet statistics predicate expression
-fn build_predicate_expression(
-    expr: &Expr,
-    parquet_schema: &Schema,
-    stat_column_req: &mut Vec<(String, StatisticsType, Field)>,
-) -> Result<Expr> {
-    use crate::logical_plan;
-    // predicate expression can only be a binary expression
-    let (left, op, right) = match expr {
-        Expr::BinaryExpr { left, op, right } => (left, *op, right),
-        _ => {
-            // unsupported expression - replace with TRUE
-            // this can still be useful when multiple conditions are joined using AND
-            // such as: column > 10 AND TRUE
-            return Ok(logical_plan::lit(true));
-        }
-    };
-
-    if op == Operator::And || op == Operator::Or {
-        let left_expr =
-            build_predicate_expression(left, parquet_schema, stat_column_req)?;
-        let right_expr =
-            build_predicate_expression(right, parquet_schema, stat_column_req)?;
-        return Ok(logical_plan::binary_expr(left_expr, op, right_expr));
-    }
-
-    let expr_builder = StatisticsExpressionBuilder::try_new(
-        left,
-        right,
-        parquet_schema,
-        stat_column_req,
-    );
-    let mut expr_builder = match expr_builder {
-        Ok(builder) => builder,
-        // allow partial failure in predicate expression generation
-        // this can still produce a useful predicate when multiple conditions are joined using AND
-        Err(_) => {
-            return Ok(logical_plan::lit(true));
-        }
-    };
-    let corrected_op = expr_builder.correct_operator(op);
-    let statistics_expr = match corrected_op {
-        Operator::Eq => {
-            // column = literal => (min, max) = literal => min <= literal && literal <= max
-            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
-            let min_column_expr = expr_builder.min_column_expr()?;
-            let max_column_expr = expr_builder.max_column_expr()?;
-            min_column_expr
-                .lt_eq(expr_builder.scalar_expr().clone())
-                .and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr))
-        }
-        Operator::Gt => {
-            // column > literal => (min, max) > literal => max > literal
-            expr_builder
-                .max_column_expr()?
-                .gt(expr_builder.scalar_expr().clone())
-        }
-        Operator::GtEq => {
-            // column >= literal => (min, max) >= literal => max >= literal
-            expr_builder
-                .max_column_expr()?
-                .gt_eq(expr_builder.scalar_expr().clone())
-        }
-        Operator::Lt => {
-            // column < literal => (min, max) < literal => min < literal
-            expr_builder
-                .min_column_expr()?
-                .lt(expr_builder.scalar_expr().clone())
-        }
-        Operator::LtEq => {
-            // column <= literal => (min, max) <= literal => min <= literal
-            expr_builder
-                .min_column_expr()?
-                .lt_eq(expr_builder.scalar_expr().clone())
-        }
-        // other expressions are not supported
-        _ => logical_plan::lit(true),
-    };
-    Ok(statistics_expr)
-}
-
-#[derive(Debug, Copy, Clone, PartialEq)]
-enum StatisticsType {
-    Min,
-    Max,
-}
-
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
-
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
-        }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
-        }
-    };
-
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
-        }
-        return Arc::new(builder.finish());
-    }
-
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
-        }
-    }
-
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
-    }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
 #[async_trait]
 impl ExecutionPlan for ParquetExec {
     /// Return a reference to Any that can be used for downcasting
@@ -906,7 +457,7 @@ fn send_result(
 fn read_files(
     filenames: &[String],
     projection: &[usize],
-    predicate_builder: &Option<RowGroupPredicateBuilder>,
+    predicate_builder: &Option<PruningPredicateBuilder>,
     batch_size: usize,
     response_tx: Sender<ArrowResult<RecordBatch>>,
     limit: Option<usize>,
@@ -917,7 +468,7 @@ fn read_files(
         let mut file_reader = SerializedFileReader::new(file)?;
         if let Some(predicate_builder) = predicate_builder {
             let row_group_predicate = predicate_builder
-                .build_row_group_predicate(file_reader.metadata().row_groups());
+                .build_pruning_predicate(file_reader.metadata().row_groups());
             file_reader.filter_row_groups(&row_group_predicate);
         }
         let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
@@ -992,10 +543,13 @@ impl RecordBatchStream for ParquetStream {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow::array::{Int32Array, StringArray};
+    use arrow::datatypes::{DataType, Field};
     use futures::StreamExt;
-    use parquet::basic::Type as PhysicalType;
-    use parquet::schema::types::SchemaDescPtr;
+    use parquet::{
+        basic::Type as PhysicalType,
+        file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
+        schema::types::SchemaDescPtr,
+    };
 
     #[test]
     fn test_split_files() {
@@ -1071,283 +625,12 @@ mod tests {
     }
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
-    }
-
-    #[test]
-    fn build_statistics_array_utf8() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::byte_array(None, Some("10".into()), None, 0, false);
-        let s2 = ParquetStatistics::byte_array(
-            Some("2".into()),
-            Some("20".into()),
-            None,
-            0,
-            false,
-        );
-        let s3 = ParquetStatistics::byte_array(
-            Some("3".into()),
-            Some("30".into()),
-            None,
-            0,
-            false,
-        );
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Utf8);
-        let string_array = statistics_array
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-        let string_vec = string_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(string_vec, vec![None, Some("2"), Some("3")]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Utf8);
-        let string_array = statistics_array
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-        let string_vec = string_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some("10") value which was actually set
-        // because the min value is None
-        assert_eq!(string_vec, vec![None, Some("20"), Some("30")]);
-    }
-
-    #[test]
-    fn build_statistics_array_empty_stats() {
-        let data_type = DataType::Int32;
-        let statistics = vec![];
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &data_type);
-        assert_eq!(statistics_array.len(), 0);
-
-        let statistics = vec![None, None];
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &data_type);
-        assert_eq!(statistics_array.len(), statistics.len());
-        assert_eq!(statistics_array.data_type(), &data_type);
-        for i in 0..statistics_array.len() {
-            assert_eq!(statistics_array.is_null(i), true);
-            assert_eq!(statistics_array.is_valid(i), false);
-        }
-    }
-
-    #[test]
-    fn build_statistics_array_unsupported_type() {
-        // boolean is not currently a supported type for statistics
-        let s1 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false);
-        let s2 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2)];
-        let data_type = DataType::Boolean;
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &data_type);
-        assert_eq!(statistics_array.len(), statistics.len());
-        assert_eq!(statistics_array.data_type(), &data_type);
-        for i in 0..statistics_array.len() {
-            assert_eq!(statistics_array.is_null(i), true);
-            assert_eq!(statistics_array.is_valid(i), false);
-        }
-    }
-
-    #[test]
-    fn row_group_predicate_eq() -> Result<()> {
-        use crate::logical_plan::{col, lit};
-        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
-        let expected_expr = "#c1_min LtEq Int32(1) And Int32(1) LtEq #c1_max";
-
-        // test column on the left
-        let expr = col("c1").eq(lit(1));
-        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
-        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
-
-        // test column on the right
-        let expr = lit(1).eq(col("c1"));
-        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
-        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
-
-        Ok(())
-    }
-
-    #[test]
-    fn row_group_predicate_gt() -> Result<()> {
-        use crate::logical_plan::{col, lit};
-        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
-        let expected_expr = "#c1_max Gt Int32(1)";
-
-        // test column on the left
-        let expr = col("c1").gt(lit(1));
-        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
-        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
-
-        // test column on the right
-        let expr = lit(1).lt(col("c1"));
-        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
-        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
-
-        Ok(())
-    }
-
-    #[test]
-    fn row_group_predicate_gt_eq() -> Result<()> {
-        use crate::logical_plan::{col, lit};
-        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
-        let expected_expr = "#c1_max GtEq Int32(1)";
-
-        // test column on the left
-        let expr = col("c1").gt_eq(lit(1));
-        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
-        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
-        // test column on the right
-        let expr = lit(1).lt_eq(col("c1"));
-        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
-        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
-
-        Ok(())
-    }
-
-    #[test]
-    fn row_group_predicate_lt() -> Result<()> {
-        use crate::logical_plan::{col, lit};
-        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
-        let expected_expr = "#c1_min Lt Int32(1)";
-
-        // test column on the left
-        let expr = col("c1").lt(lit(1));
-        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
-        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
-
-        // test column on the right
-        let expr = lit(1).gt(col("c1"));
-        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
-        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
-
-        Ok(())
-    }
-
-    #[test]
-    fn row_group_predicate_lt_eq() -> Result<()> {
-        use crate::logical_plan::{col, lit};
-        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
-        let expected_expr = "#c1_min LtEq Int32(1)";
-
-        // test column on the left
-        let expr = col("c1").lt_eq(lit(1));
-        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
-        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
-        // test column on the right
-        let expr = lit(1).gt_eq(col("c1"));
-        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
-        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
-
-        Ok(())
-    }
-
-    #[test]
-    fn row_group_predicate_and() -> Result<()> {
-        use crate::logical_plan::{col, lit};
-        let schema = Schema::new(vec![
-            Field::new("c1", DataType::Int32, false),
-            Field::new("c2", DataType::Int32, false),
-            Field::new("c3", DataType::Int32, false),
-        ]);
-        // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression
-        let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
-        let expected_expr = "#c1_min Lt Int32(1) And Boolean(true)";
-        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
-        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
-
-        Ok(())
-    }
-
-    #[test]
-    fn row_group_predicate_or() -> Result<()> {
-        use crate::logical_plan::{col, lit};
-        let schema = Schema::new(vec![
-            Field::new("c1", DataType::Int32, false),
-            Field::new("c2", DataType::Int32, false),
-        ]);
-        // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 expression
-        let expr = col("c1").lt(lit(1)).or(col("c2").modulus(lit(2)));
-        let expected_expr = "#c1_min Lt Int32(1) Or Boolean(true)";
-        let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?;
-        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
-
-        Ok(())
-    }
-
-    #[test]
-    fn row_group_predicate_stat_column_req() -> Result<()> {
-        use crate::logical_plan::{col, lit};
-        let schema = Schema::new(vec![
-            Field::new("c1", DataType::Int32, false),
-            Field::new("c2", DataType::Int32, false),
-        ]);
-        let mut stat_column_req = vec![];
-        // c1 < 1 and (c2 = 2 or c2 = 3)
-        let expr = col("c1")
-            .lt(lit(1))
-            .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
-        let expected_expr = "#c1_min Lt Int32(1) And #c2_min LtEq Int32(2) And Int32(2) LtEq #c2_max Or #c2_min LtEq Int32(3) And Int32(3) LtEq #c2_max";
-        let predicate_expr =
-            build_predicate_expression(&expr, &schema, &mut stat_column_req)?;
-        assert_eq!(format!("{:?}", predicate_expr), expected_expr);
-        // c1 < 1 should add c1_min
-        let c1_min_field = Field::new("c1_min", DataType::Int32, false);
-        assert_eq!(
-            stat_column_req[0],
-            ("c1".to_owned(), StatisticsType::Min, c1_min_field)
-        );
-        // c2 = 2 should add c2_min and c2_max
-        let c2_min_field = Field::new("c2_min", DataType::Int32, false);
-        assert_eq!(
-            stat_column_req[1],
-            ("c2".to_owned(), StatisticsType::Min, c2_min_field)
-        );
-        let c2_max_field = Field::new("c2_max", DataType::Int32, false);
-        assert_eq!(
-            stat_column_req[2],
-            ("c2".to_owned(), StatisticsType::Max, c2_max_field)
-        );
-        // c2 = 3 shouldn't add any new statistics fields
-        assert_eq!(stat_column_req.len(), 3);
-
-        Ok(())
-    }
-
-    #[test]
     fn row_group_predicate_builder_simple_expr() -> Result<()> {
         use crate::logical_plan::{col, lit};
         // int > 1 => c1_max > 1
         let expr = col("c1").gt(lit(15));
         let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
-        let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?;
+        let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema)?;
 
         let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]);
         let rgm1 = get_row_group_meta_data(
@@ -1360,7 +643,7 @@ mod tests {
         );
         let row_group_metadata = vec![rgm1, rgm2];
         let row_group_predicate =
-            predicate_builder.build_row_group_predicate(&row_group_metadata);
+            predicate_builder.build_pruning_predicate(&row_group_metadata);
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -1377,7 +660,7 @@ mod tests {
         // int > 1 => c1_max > 1
         let expr = col("c1").gt(lit(15));
         let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
-        let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?;
+        let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema)?;
 
         let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]);
         let rgm1 = get_row_group_meta_data(
@@ -1390,7 +673,7 @@ mod tests {
         );
         let row_group_metadata = vec![rgm1, rgm2];
         let row_group_predicate =
-            predicate_builder.build_row_group_predicate(&row_group_metadata);
+            predicate_builder.build_pruning_predicate(&row_group_metadata);
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -1413,7 +696,7 @@ mod tests {
             Field::new("c1", DataType::Int32, false),
             Field::new("c2", DataType::Int32, false),
         ]);
-        let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema.clone())?;
+        let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema.clone())?;
 
         let schema_descr = get_test_schema_descr(vec![
             ("c1", PhysicalType::INT32),
@@ -1435,7 +718,7 @@ mod tests {
         );
         let row_group_metadata = vec![rgm1, rgm2];
         let row_group_predicate =
-            predicate_builder.build_row_group_predicate(&row_group_metadata);
+            predicate_builder.build_pruning_predicate(&row_group_metadata);
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -1448,9 +731,9 @@ mod tests {
         // if conditions in predicate are joined with OR and an unsupported expression is used
         // this bypasses the entire predicate expression and no row groups are filtered out
         let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
-        let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?;
+        let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema)?;
         let row_group_predicate =
-            predicate_builder.build_row_group_predicate(&row_group_metadata);
+            predicate_builder.build_pruning_predicate(&row_group_metadata);
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -1472,7 +755,7 @@ mod tests {
             Field::new("c1", DataType::Int32, false),
             Field::new("c2", DataType::Boolean, false),
         ]);
-        let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?;
+        let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema)?;
 
         let schema_descr = get_test_schema_descr(vec![
             ("c1", PhysicalType::INT32),
@@ -1494,7 +777,7 @@ mod tests {
         );
         let row_group_metadata = vec![rgm1, rgm2];
         let row_group_predicate =
-            predicate_builder.build_row_group_predicate(&row_group_metadata);
+            predicate_builder.build_pruning_predicate(&row_group_metadata);
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()