You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/21 21:38:24 UTC

[GitHub] [arrow-datafusion] NGA-TRAN commented on a change in pull request #380: Support statistics pruning for formats other than parquet

NGA-TRAN commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r637235898



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -15,58 +15,88 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! This module contains code to rule out row groups / partitions /
-//! etc based on statistics prior in order to skip evaluating entire
-//! swaths of rows.
+//! This module contains code to prune "containers" of row groups
+//! based on statistics prior to execution. This can lead to
+//! significant performance improvements by avoiding the need
+//! to evaluate a plan on entire containers (e.g. an entire file)
+//!
+//! For example, it is used to prune (skip) row groups while reading
+//! parquet files if it can be determined from the predicate that
+//! nothing in the row group can match.
 //!
 //! This code is currently specific to Parquet, but soon (TM), via
 //! https://github.com/apache/arrow-datafusion/issues/363 it will
 //! be genericized.
 
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, convert::TryInto, sync::Arc};
 
 use arrow::{
-    array::{
-        make_array, new_null_array, ArrayData, ArrayRef, BooleanArray,
-        BooleanBufferBuilder,
-    },
-    buffer::MutableBuffer,
-    datatypes::{DataType, Field, Schema},
+    array::{ArrayRef, BooleanArray},
+    datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
 
-use parquet::file::{
-    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
-};
-
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
     logical_plan::{Expr, Operator},
     optimizer::utils,
     physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
+    scalar::ScalarValue,
 };
 
+/// Interface to pass statistics information to [`PruningPredicates`]
+pub trait PruningStatistics {
+    /// return the minimum value for the named column, if known
+    fn min_value(&self, column: &str) -> Option<ScalarValue>;
+
+    /// return the maximum value for the named column, if known
+    fn max_value(&self, column: &str) -> Option<ScalarValue>;
+}
+
+/// Evaluates filter expressions on statistics in order to
+/// prune data containers (e.g. parquet row group)
+///
+/// See [`try_new`] for more information.
 #[derive(Debug, Clone)]
-/// Builder used for generating predicate functions that can be used
-/// to prune data based on statistics (e.g. parquet row group metadata)
-pub struct PruningPredicateBuilder {
-    schema: Schema,
+pub struct PruningPredicate {
+    /// The input schema against which the predicate will be evaluated
+    schema: SchemaRef,
+    /// Actual pruning predicate (rewritten in terms of column min/max statistics)
     predicate_expr: Arc<dyn PhysicalExpr>,
+    /// The statistics required to evaluate this predicate:
+    /// * The column name in the input schema
+    /// * Statstics type (e.g. Min or Ma)

Review comment:
       ```suggestion
       /// * Statstics type (e.g. Min or Max)
   ```

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -85,94 +115,132 @@ impl PruningPredicateBuilder {
         })
     }
 
-    /// Generate a predicate function used to filter based on
-    /// statistics
+    /// For each set of statistics, evalates the pruning predicate
+    /// and returns a `bool` with the following meaning for a
+    /// all rows whose values match the statistics:
     ///
-    /// This function takes a slice of statistics as parameter, so
-    /// that DataFusion's physical expressions can be executed once
-    /// against a single RecordBatch, containing statistics arrays, on
-    /// which the physical predicate expression is executed to
-    /// generate a row group filter array.
+    /// `true`: There MAY be rows that match the predicate
     ///
-    /// The generated filter function is then used in the returned
-    /// closure to filter row groups. NOTE this is parquet specific at the moment
-    pub fn build_pruning_predicate(
-        &self,
-        row_group_metadata: &[RowGroupMetaData],
-    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+    /// `false`: There are no rows that could match the predicate
+    ///
+    /// Note this function takes a slice of statistics as a parameter
+    /// to amortize the cost of the evaluation of the predicate
+    /// against a single record batch.
+    pub fn prune<S: PruningStatistics>(&self, statistics: &[S]) -> Result<Vec<bool>> {
         // build statistics record batch
-        let predicate_result = build_statistics_record_batch(
-            row_group_metadata,
-            &self.schema,
-            &self.stat_column_req,
-        )
-        .and_then(|statistics_batch| {
-            // execute predicate expression
-            self.predicate_expr.evaluate(&statistics_batch)
-        })
-        .and_then(|v| match v {
-            ColumnarValue::Array(array) => Ok(array),
-            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
-                "predicate expression didn't return an array".to_string(),
-            )),
-        });
-
-        let predicate_array = match predicate_result {
-            Ok(array) => array,
-            // row group filter array could not be built
-            // return a closure which will not filter out any row groups
-            _ => return Box::new(|_r, _i| true),
-        };
+        let predicate_array =
+            build_statistics_record_batch(statistics, &self.stat_column_req)
+                .and_then(|statistics_batch| {
+                    // execute predicate expression
+                    self.predicate_expr.evaluate(&statistics_batch)
+                })
+                .and_then(|v| match v {
+                    ColumnarValue::Array(array) => Ok(array),
+                    ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
+                        "predicate expression didn't return an array".to_string(),
+                    )),
+                })?;
+
+        let predicate_array = predicate_array
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "Expected pruning predicate evaluation to be BooleanArray, \
+                     but was {:?}",
+                    predicate_array
+                ))
+            })?;
+
+        // when the result of the predicate expression for a row group is null / undefined,
+        // e.g. due to missing statistics, this row group can't be filtered out,
+        // so replace with true
+        Ok(predicate_array
+            .into_iter()
+            .map(|x| x.unwrap_or(true))
+            .collect::<Vec<_>>())
+    }
 
-        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
-        match predicate_array {
-            // return row group predicate function
-            Some(array) => {
-                // when the result of the predicate expression for a row group is null / undefined,
-                // e.g. due to missing statistics, this row group can't be filtered out,
-                // so replace with true
-                let predicate_values =
-                    array.iter().map(|x| x.unwrap_or(true)).collect::<Vec<_>>();
-                Box::new(move |_, i| predicate_values[i])
-            }
-            // predicate result is not a BooleanArray
-            // return a closure which will not filter out any row groups
-            _ => Box::new(|_r, _i| true),
-        }
+    /// Return a reference to the input schema
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
     }
 }
 
-/// Build a RecordBatch from a list of statistics (currently parquet
-/// [`RowGroupMetadata`] structs), creating arrays, one for each
-/// statistics column, as requested in the stat_column_req parameter.
-fn build_statistics_record_batch(
-    row_groups: &[RowGroupMetaData],
-    schema: &Schema,
+/// Build a RecordBatch from a list of statistics, creating arrays,
+/// with one row for each PruningStatistics and columns specified in
+/// in the stat_column_req parameter.
+///
+/// For example, if the requested columns are
+/// ```text
+/// ("s1", Min, Field:s1_min)
+/// ("s2", Max, field:s2_maxx)

Review comment:
       ```suggestion
   /// ("s2", Max, field:s2_max)
   ```

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -383,210 +455,218 @@ enum StatisticsType {
     Max,
 }
 
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::logical_plan::{col, lit};
+    use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType};
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[derive(Debug, Default)]
+    struct MinMax {
+        min: Option<ScalarValue>,
+        max: Option<ScalarValue>,
+    }
 
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
+    impl MinMax {
+        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
+            Self { min, max }
         }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
+    }
+
+    #[derive(Debug, Default)]
+    struct TestStatistics {
+        // key: column name
+        stats: HashMap<String, MinMax>,
+    }
+
+    impl TestStatistics {
+        fn new() -> Self {
+            Self::default()
         }
-    };
 
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
+        fn with(mut self, name: impl Into<String>, min_max: MinMax) -> Self {
+            self.stats.insert(name.into(), min_max);
+            self
         }
-        return Arc::new(builder.finish());
     }
 
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
+    impl PruningStatistics for TestStatistics {
+        fn min_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.min.clone())
+                .unwrap_or(None)
         }
-    }
 
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
+        fn max_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.max.clone())
+                .unwrap_or(None)
+        }
     }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_optimizer::pruning::StatisticsType;
-    use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::DataType,
-    };
-    use parquet::file::statistics::Statistics as ParquetStatistics;
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    fn test_build_statistics_record_batch() {
+        // Request a record batch with of s1_min, s2_max, s3_max, s3_min
+        let stat_column_req = vec![
+            // min of original column s1, named s1_min
+            (
+                "s1".to_string(),
+                StatisticsType::Min,
+                Field::new("s1_min", DataType::Int32, true),
+            ),
+            // max of original column s2, named s2_max
+            (
+                "s2".to_string(),
+                StatisticsType::Max,
+                Field::new("s2_min", DataType::Int32, true),
+            ),
+            // max of original column s3, named s3_max
+            (
+                "s3".to_string(),
+                StatisticsType::Max,
+                Field::new("s3_max", DataType::Utf8, true),
+            ),
+            // min of original column s3, named s3_min
+            (
+                "s3".to_string(),
+                StatisticsType::Min,
+                Field::new("s3_min", DataType::Utf8, true),
+            ),
+        ];
+
+        // s1: [None, 10]
+        // s2: [2, 20]
+        // s2: ["a", "q"]
+        let stats1 = TestStatistics::new()
+            .with("s1", MinMax::new(None, Some(10i32.into())))
+            .with("s2", MinMax::new(Some(2i32.into()), Some(20i32.into())))
+            .with("s3", MinMax::new(Some("a".into()), Some("q".into())));
+
+        // s1: [None, None]
+        // s2: [None, None]
+        // s2: [None, None]

Review comment:
       ```suggestion
           // s3: [None, None]
   ```

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -383,210 +455,218 @@ enum StatisticsType {
     Max,
 }
 
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::logical_plan::{col, lit};
+    use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType};
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[derive(Debug, Default)]
+    struct MinMax {
+        min: Option<ScalarValue>,
+        max: Option<ScalarValue>,
+    }
 
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
+    impl MinMax {
+        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
+            Self { min, max }
         }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
+    }
+
+    #[derive(Debug, Default)]
+    struct TestStatistics {
+        // key: column name
+        stats: HashMap<String, MinMax>,
+    }
+
+    impl TestStatistics {
+        fn new() -> Self {
+            Self::default()
         }
-    };
 
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
+        fn with(mut self, name: impl Into<String>, min_max: MinMax) -> Self {
+            self.stats.insert(name.into(), min_max);
+            self
         }
-        return Arc::new(builder.finish());
     }
 
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
+    impl PruningStatistics for TestStatistics {
+        fn min_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.min.clone())
+                .unwrap_or(None)
         }
-    }
 
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
+        fn max_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.max.clone())
+                .unwrap_or(None)
+        }
     }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_optimizer::pruning::StatisticsType;
-    use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::DataType,
-    };
-    use parquet::file::statistics::Statistics as ParquetStatistics;
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    fn test_build_statistics_record_batch() {
+        // Request a record batch with of s1_min, s2_max, s3_max, s3_min
+        let stat_column_req = vec![
+            // min of original column s1, named s1_min
+            (
+                "s1".to_string(),
+                StatisticsType::Min,
+                Field::new("s1_min", DataType::Int32, true),
+            ),
+            // max of original column s2, named s2_max
+            (
+                "s2".to_string(),
+                StatisticsType::Max,
+                Field::new("s2_min", DataType::Int32, true),

Review comment:
       ```suggestion
                   Field::new("s2_max", DataType::Int32, true),
   ```

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -383,210 +455,218 @@ enum StatisticsType {
     Max,
 }
 
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::logical_plan::{col, lit};
+    use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType};
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[derive(Debug, Default)]
+    struct MinMax {
+        min: Option<ScalarValue>,
+        max: Option<ScalarValue>,
+    }
 
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
+    impl MinMax {
+        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
+            Self { min, max }
         }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
+    }
+
+    #[derive(Debug, Default)]
+    struct TestStatistics {
+        // key: column name
+        stats: HashMap<String, MinMax>,
+    }
+
+    impl TestStatistics {
+        fn new() -> Self {
+            Self::default()
         }
-    };
 
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
+        fn with(mut self, name: impl Into<String>, min_max: MinMax) -> Self {
+            self.stats.insert(name.into(), min_max);
+            self
         }
-        return Arc::new(builder.finish());
     }
 
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
+    impl PruningStatistics for TestStatistics {
+        fn min_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.min.clone())
+                .unwrap_or(None)
         }
-    }
 
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
+        fn max_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.max.clone())
+                .unwrap_or(None)
+        }
     }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_optimizer::pruning::StatisticsType;
-    use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::DataType,
-    };
-    use parquet::file::statistics::Statistics as ParquetStatistics;
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    fn test_build_statistics_record_batch() {
+        // Request a record batch with of s1_min, s2_max, s3_max, s3_min
+        let stat_column_req = vec![
+            // min of original column s1, named s1_min
+            (
+                "s1".to_string(),
+                StatisticsType::Min,
+                Field::new("s1_min", DataType::Int32, true),
+            ),
+            // max of original column s2, named s2_max
+            (
+                "s2".to_string(),
+                StatisticsType::Max,
+                Field::new("s2_min", DataType::Int32, true),
+            ),
+            // max of original column s3, named s3_max
+            (
+                "s3".to_string(),
+                StatisticsType::Max,
+                Field::new("s3_max", DataType::Utf8, true),
+            ),
+            // min of original column s3, named s3_min
+            (
+                "s3".to_string(),
+                StatisticsType::Min,
+                Field::new("s3_min", DataType::Utf8, true),
+            ),
+        ];
+
+        // s1: [None, 10]
+        // s2: [2, 20]
+        // s2: ["a", "q"]
+        let stats1 = TestStatistics::new()
+            .with("s1", MinMax::new(None, Some(10i32.into())))
+            .with("s2", MinMax::new(Some(2i32.into()), Some(20i32.into())))
+            .with("s3", MinMax::new(Some("a".into()), Some("q".into())));
+
+        // s1: [None, None]
+        // s2: [None, None]
+        // s2: [None, None]
+        let stats2 = TestStatistics::new()
+            .with("s1", MinMax::new(None, None))
+            .with("s2", MinMax::new(None, None))
+            .with("s3", MinMax::new(None, None));
+
+        // s1: [9, None]
+        // s2: None
+        // s2: [None, "r"]
+        let stats3 = TestStatistics::new()
+            .with("s1", MinMax::new(Some(9i32.into()), None))
+            .with("s3", MinMax::new(None, Some("r".into())));
+
+        // This one returns a statistics value, but the value itself is NULL
+        // s1: [Some(None), None]
+        let stats4 = TestStatistics::new()
+            .with("s1", MinMax::new(Some(ScalarValue::Int32(None)), None));
+
+        let statistics = [stats1, stats2, stats3, stats4];
+        let batch = build_statistics_record_batch(&statistics, &stat_column_req).unwrap();
+        let expected = vec![
+            "+--------+--------+--------+--------+",
+            "| s1_min | s2_min | s3_max | s3_min |",

Review comment:
       With the change above, s2_min will become s2_max to make more send with the 20 value

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -85,94 +115,132 @@ impl PruningPredicateBuilder {
         })
     }
 
-    /// Generate a predicate function used to filter based on
-    /// statistics
+    /// For each set of statistics, evalates the pruning predicate
+    /// and returns a `bool` with the following meaning for a
+    /// all rows whose values match the statistics:
     ///
-    /// This function takes a slice of statistics as parameter, so
-    /// that DataFusion's physical expressions can be executed once
-    /// against a single RecordBatch, containing statistics arrays, on
-    /// which the physical predicate expression is executed to
-    /// generate a row group filter array.
+    /// `true`: There MAY be rows that match the predicate
     ///
-    /// The generated filter function is then used in the returned
-    /// closure to filter row groups. NOTE this is parquet specific at the moment
-    pub fn build_pruning_predicate(
-        &self,
-        row_group_metadata: &[RowGroupMetaData],
-    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+    /// `false`: There are no rows that could match the predicate
+    ///
+    /// Note this function takes a slice of statistics as a parameter
+    /// to amortize the cost of the evaluation of the predicate
+    /// against a single record batch.
+    pub fn prune<S: PruningStatistics>(&self, statistics: &[S]) -> Result<Vec<bool>> {
         // build statistics record batch
-        let predicate_result = build_statistics_record_batch(
-            row_group_metadata,
-            &self.schema,
-            &self.stat_column_req,
-        )
-        .and_then(|statistics_batch| {
-            // execute predicate expression
-            self.predicate_expr.evaluate(&statistics_batch)
-        })
-        .and_then(|v| match v {
-            ColumnarValue::Array(array) => Ok(array),
-            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
-                "predicate expression didn't return an array".to_string(),
-            )),
-        });
-
-        let predicate_array = match predicate_result {
-            Ok(array) => array,
-            // row group filter array could not be built
-            // return a closure which will not filter out any row groups
-            _ => return Box::new(|_r, _i| true),
-        };
+        let predicate_array =
+            build_statistics_record_batch(statistics, &self.stat_column_req)
+                .and_then(|statistics_batch| {
+                    // execute predicate expression
+                    self.predicate_expr.evaluate(&statistics_batch)
+                })
+                .and_then(|v| match v {
+                    ColumnarValue::Array(array) => Ok(array),
+                    ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
+                        "predicate expression didn't return an array".to_string(),
+                    )),
+                })?;
+
+        let predicate_array = predicate_array
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "Expected pruning predicate evaluation to be BooleanArray, \
+                     but was {:?}",
+                    predicate_array
+                ))
+            })?;
+
+        // when the result of the predicate expression for a row group is null / undefined,
+        // e.g. due to missing statistics, this row group can't be filtered out,
+        // so replace with true
+        Ok(predicate_array
+            .into_iter()
+            .map(|x| x.unwrap_or(true))
+            .collect::<Vec<_>>())
+    }
 
-        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
-        match predicate_array {
-            // return row group predicate function
-            Some(array) => {
-                // when the result of the predicate expression for a row group is null / undefined,
-                // e.g. due to missing statistics, this row group can't be filtered out,
-                // so replace with true
-                let predicate_values =
-                    array.iter().map(|x| x.unwrap_or(true)).collect::<Vec<_>>();
-                Box::new(move |_, i| predicate_values[i])
-            }
-            // predicate result is not a BooleanArray
-            // return a closure which will not filter out any row groups
-            _ => Box::new(|_r, _i| true),
-        }
+    /// Return a reference to the input schema
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
     }
 }
 
-/// Build a RecordBatch from a list of statistics (currently parquet
-/// [`RowGroupMetadata`] structs), creating arrays, one for each
-/// statistics column, as requested in the stat_column_req parameter.
-fn build_statistics_record_batch(
-    row_groups: &[RowGroupMetaData],
-    schema: &Schema,
+/// Build a RecordBatch from a list of statistics, creating arrays,
+/// with one row for each PruningStatistics and columns specified in
+/// in the stat_column_req parameter.
+///
+/// For example, if the requested columns are
+/// ```text
+/// ("s1", Min, Field:s1_min)
+/// ("s2", Max, field:s2_maxx)
+///```
+///
+/// And the input statistics had
+/// ```text
+/// S1(Min: 5, Max: 10)
+/// S2(Min: 99, Max: 1000)
+/// S3(Min: 1, Max: 2)
+/// ```
+///
+/// Then this function would build a record batch with 2 columns and
+/// one row s1_min and s2_maxx as follows (s3 is not requested):
+///
+/// ```text
+/// s1_min | s2_maxx

Review comment:
       ```suggestion
   /// s1_min | s2_max
   ```

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -85,94 +115,132 @@ impl PruningPredicateBuilder {
         })
     }
 
-    /// Generate a predicate function used to filter based on
-    /// statistics
+    /// For each set of statistics, evalates the pruning predicate
+    /// and returns a `bool` with the following meaning for a
+    /// all rows whose values match the statistics:
     ///
-    /// This function takes a slice of statistics as parameter, so
-    /// that DataFusion's physical expressions can be executed once
-    /// against a single RecordBatch, containing statistics arrays, on
-    /// which the physical predicate expression is executed to
-    /// generate a row group filter array.
+    /// `true`: There MAY be rows that match the predicate
     ///
-    /// The generated filter function is then used in the returned
-    /// closure to filter row groups. NOTE this is parquet specific at the moment
-    pub fn build_pruning_predicate(
-        &self,
-        row_group_metadata: &[RowGroupMetaData],
-    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+    /// `false`: There are no rows that could match the predicate
+    ///
+    /// Note this function takes a slice of statistics as a parameter
+    /// to amortize the cost of the evaluation of the predicate
+    /// against a single record batch.
+    pub fn prune<S: PruningStatistics>(&self, statistics: &[S]) -> Result<Vec<bool>> {
         // build statistics record batch
-        let predicate_result = build_statistics_record_batch(
-            row_group_metadata,
-            &self.schema,
-            &self.stat_column_req,
-        )
-        .and_then(|statistics_batch| {
-            // execute predicate expression
-            self.predicate_expr.evaluate(&statistics_batch)
-        })
-        .and_then(|v| match v {
-            ColumnarValue::Array(array) => Ok(array),
-            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
-                "predicate expression didn't return an array".to_string(),
-            )),
-        });
-
-        let predicate_array = match predicate_result {
-            Ok(array) => array,
-            // row group filter array could not be built
-            // return a closure which will not filter out any row groups
-            _ => return Box::new(|_r, _i| true),
-        };
+        let predicate_array =
+            build_statistics_record_batch(statistics, &self.stat_column_req)
+                .and_then(|statistics_batch| {
+                    // execute predicate expression
+                    self.predicate_expr.evaluate(&statistics_batch)
+                })
+                .and_then(|v| match v {
+                    ColumnarValue::Array(array) => Ok(array),
+                    ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
+                        "predicate expression didn't return an array".to_string(),
+                    )),
+                })?;
+
+        let predicate_array = predicate_array
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "Expected pruning predicate evaluation to be BooleanArray, \
+                     but was {:?}",
+                    predicate_array
+                ))
+            })?;
+
+        // when the result of the predicate expression for a row group is null / undefined,
+        // e.g. due to missing statistics, this row group can't be filtered out,
+        // so replace with true
+        Ok(predicate_array
+            .into_iter()
+            .map(|x| x.unwrap_or(true))
+            .collect::<Vec<_>>())
+    }
 
-        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
-        match predicate_array {
-            // return row group predicate function
-            Some(array) => {
-                // when the result of the predicate expression for a row group is null / undefined,
-                // e.g. due to missing statistics, this row group can't be filtered out,
-                // so replace with true
-                let predicate_values =
-                    array.iter().map(|x| x.unwrap_or(true)).collect::<Vec<_>>();
-                Box::new(move |_, i| predicate_values[i])
-            }
-            // predicate result is not a BooleanArray
-            // return a closure which will not filter out any row groups
-            _ => Box::new(|_r, _i| true),
-        }
+    /// Return a reference to the input schema
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
     }
 }
 
-/// Build a RecordBatch from a list of statistics (currently parquet
-/// [`RowGroupMetadata`] structs), creating arrays, one for each
-/// statistics column, as requested in the stat_column_req parameter.
-fn build_statistics_record_batch(
-    row_groups: &[RowGroupMetaData],
-    schema: &Schema,
+/// Build a RecordBatch from a list of statistics, creating arrays,
+/// with one row for each PruningStatistics and columns specified in
+/// in the stat_column_req parameter.
+///
+/// For example, if the requested columns are
+/// ```text
+/// ("s1", Min, Field:s1_min)
+/// ("s2", Max, field:s2_maxx)
+///```
+///
+/// And the input statistics had
+/// ```text
+/// S1(Min: 5, Max: 10)
+/// S2(Min: 99, Max: 1000)
+/// S3(Min: 1, Max: 2)
+/// ```
+///
+/// Then this function would build a record batch with 2 columns and
+/// one row s1_min and s2_maxx as follows (s3 is not requested):
+///
+/// ```text
+/// s1_min | s2_maxx
+/// -------+--------
+///   5    | 10

Review comment:
       Do you mean s2_max = 1000?

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -383,210 +455,218 @@ enum StatisticsType {
     Max,
 }
 
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::logical_plan::{col, lit};
+    use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType};
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[derive(Debug, Default)]
+    struct MinMax {
+        min: Option<ScalarValue>,
+        max: Option<ScalarValue>,
+    }
 
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
+    impl MinMax {
+        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
+            Self { min, max }
         }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
+    }
+
+    #[derive(Debug, Default)]
+    struct TestStatistics {
+        // key: column name
+        stats: HashMap<String, MinMax>,
+    }
+
+    impl TestStatistics {
+        fn new() -> Self {
+            Self::default()
         }
-    };
 
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
+        fn with(mut self, name: impl Into<String>, min_max: MinMax) -> Self {
+            self.stats.insert(name.into(), min_max);
+            self
         }
-        return Arc::new(builder.finish());
     }
 
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
+    impl PruningStatistics for TestStatistics {
+        fn min_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.min.clone())
+                .unwrap_or(None)
         }
-    }
 
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
+        fn max_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.max.clone())
+                .unwrap_or(None)
+        }
     }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_optimizer::pruning::StatisticsType;
-    use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::DataType,
-    };
-    use parquet::file::statistics::Statistics as ParquetStatistics;
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    fn test_build_statistics_record_batch() {
+        // Request a record batch with of s1_min, s2_max, s3_max, s3_min
+        let stat_column_req = vec![
+            // min of original column s1, named s1_min
+            (
+                "s1".to_string(),
+                StatisticsType::Min,
+                Field::new("s1_min", DataType::Int32, true),
+            ),
+            // max of original column s2, named s2_max
+            (
+                "s2".to_string(),
+                StatisticsType::Max,
+                Field::new("s2_min", DataType::Int32, true),
+            ),
+            // max of original column s3, named s3_max
+            (
+                "s3".to_string(),
+                StatisticsType::Max,
+                Field::new("s3_max", DataType::Utf8, true),
+            ),
+            // min of original column s3, named s3_min
+            (
+                "s3".to_string(),
+                StatisticsType::Min,
+                Field::new("s3_min", DataType::Utf8, true),
+            ),
+        ];
+
+        // s1: [None, 10]
+        // s2: [2, 20]
+        // s2: ["a", "q"]
+        let stats1 = TestStatistics::new()
+            .with("s1", MinMax::new(None, Some(10i32.into())))
+            .with("s2", MinMax::new(Some(2i32.into()), Some(20i32.into())))
+            .with("s3", MinMax::new(Some("a".into()), Some("q".into())));
+
+        // s1: [None, None]
+        // s2: [None, None]
+        // s2: [None, None]
+        let stats2 = TestStatistics::new()
+            .with("s1", MinMax::new(None, None))
+            .with("s2", MinMax::new(None, None))
+            .with("s3", MinMax::new(None, None));
+
+        // s1: [9, None]
+        // s2: None
+        // s2: [None, "r"]

Review comment:
       ```suggestion
           // s3: [None, "r"]
   ```

##########
File path: datafusion/src/scalar.rs
##########
@@ -283,6 +283,155 @@ impl ScalarValue {
         self.to_array_of_size(1)
     }
 
+    /// Converts an iterator of references [`ScalarValue`] into an [`ArrayRef`]
+    /// corresponding to those values. For example,
+    ///
+    /// Returns an error if the iterator is empty or if the
+    /// [`ScalarValue`]s are not all the same type
+    ///
+    /// Example
+    /// ```
+    /// use datafusion::scalar::ScalarValue;
+    /// use arrow::array::{ArrayRef, BooleanArray};
+    ///
+    /// let scalars = vec![
+    ///   ScalarValue::Boolean(Some(true)),
+    ///   ScalarValue::Boolean(None),
+    ///   ScalarValue::Boolean(Some(false)),
+    /// ];
+    ///
+    /// // Build an Array from the list of ScalarValues
+    /// let array = ScalarValue::iter_to_array(scalars.iter())
+    ///   .unwrap();
+    ///
+    /// let expected: ArrayRef = std::sync::Arc::new(
+    ///   BooleanArray::from(vec![
+    ///     Some(true),
+    ///     None,
+    ///     Some(false)
+    ///   ]
+    /// ));
+    ///
+    /// assert_eq!(&array, &expected);
+    /// ```
+    pub fn iter_to_array<'a>(
+        scalars: impl IntoIterator<Item = &'a ScalarValue>,
+    ) -> Result<ArrayRef> {
+        let mut scalars = scalars.into_iter().peekable();
+
+        // figure out the type based on the first element
+        let data_type = match scalars.peek() {
+            None => {
+                return Err(DataFusionError::Internal(
+                    "empty iterator passed to ScalarValue::iter_to_array".to_string(),
+                ))
+            }
+            Some(sv) => sv.get_datatype(),
+        };
+
+        /// Creates an array of $ARRAY_TY by unpacking values of
+        /// SCALAR_TY for primitive types
+        macro_rules! build_array_primative {

Review comment:
       Do you mean primitive?

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -748,4 +821,40 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn prune_api() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("s1", DataType::Utf8, false),
+            Field::new("s2", DataType::Int32, false),
+        ]));
+
+        // Prune using s2 > 5
+        let expr = col("s2").gt(lit(5));
+
+        // s2 [0, 5] ==> no rows should pass
+        let stats1 = TestStatistics::new()
+            .with("s1", MinMax::new(None, None))
+            .with("s2", MinMax::new(Some(0i32.into()), Some(5i32.into())));
+
+        // s2 [4, 6] ==> some rows could pass
+        let stats2 = TestStatistics::new()
+            .with("s1", MinMax::new(None, None))
+            .with("s2", MinMax::new(Some(4i32.into()), Some(6i32.into())));
+
+        // No stats for s2 ==> some rows could pass
+        let stats3 = TestStatistics::new();
+
+        // s2 [3, None] (null max) ==> some rows could pass
+        let stats4 = TestStatistics::new().with(
+            "s2",
+            MinMax::new(Some(3i32.into()), Some(ScalarValue::Int32(None))),
+        );
+
+        let p = PruningPredicate::try_new(&expr, schema).unwrap();
+        let result = p.prune(&[stats1, stats2, stats3, stats4]).unwrap();
+        let expected = vec![false, true, true, true];

Review comment:
       Nice

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -383,210 +455,218 @@ enum StatisticsType {
     Max,
 }
 
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::logical_plan::{col, lit};
+    use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType};
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[derive(Debug, Default)]
+    struct MinMax {
+        min: Option<ScalarValue>,
+        max: Option<ScalarValue>,
+    }
 
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
+    impl MinMax {
+        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
+            Self { min, max }
         }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
+    }
+
+    #[derive(Debug, Default)]
+    struct TestStatistics {
+        // key: column name
+        stats: HashMap<String, MinMax>,
+    }
+
+    impl TestStatistics {
+        fn new() -> Self {
+            Self::default()
         }
-    };
 
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
+        fn with(mut self, name: impl Into<String>, min_max: MinMax) -> Self {
+            self.stats.insert(name.into(), min_max);
+            self
         }
-        return Arc::new(builder.finish());
     }
 
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
+    impl PruningStatistics for TestStatistics {
+        fn min_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.min.clone())
+                .unwrap_or(None)
         }
-    }
 
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
+        fn max_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.max.clone())
+                .unwrap_or(None)
+        }
     }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_optimizer::pruning::StatisticsType;
-    use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::DataType,
-    };
-    use parquet::file::statistics::Statistics as ParquetStatistics;
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    fn test_build_statistics_record_batch() {
+        // Request a record batch with of s1_min, s2_max, s3_max, s3_min
+        let stat_column_req = vec![
+            // min of original column s1, named s1_min
+            (
+                "s1".to_string(),
+                StatisticsType::Min,
+                Field::new("s1_min", DataType::Int32, true),
+            ),
+            // max of original column s2, named s2_max
+            (
+                "s2".to_string(),
+                StatisticsType::Max,
+                Field::new("s2_min", DataType::Int32, true),
+            ),
+            // max of original column s3, named s3_max
+            (
+                "s3".to_string(),
+                StatisticsType::Max,
+                Field::new("s3_max", DataType::Utf8, true),
+            ),
+            // min of original column s3, named s3_min
+            (
+                "s3".to_string(),
+                StatisticsType::Min,
+                Field::new("s3_min", DataType::Utf8, true),
+            ),
+        ];
+
+        // s1: [None, 10]
+        // s2: [2, 20]
+        // s2: ["a", "q"]

Review comment:
       ```suggestion
           // s3: ["a", "q"]
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org