Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/21 17:08:56 UTC

[GitHub] [arrow-datafusion] alamb opened a new pull request #380: Support statistics pruning for formats other than parquet

alamb opened a new pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380


   # Rationale
   As explained in #363, the high-level goal is to make the parquet row group pruning logic generic over any source of min/max statistics (not just parquet metadata).
   
   # Changes:
   1. Introduce a new `PruningStatistics` trait
   2. Refactor `PruningPredicateBuilder` to be generic in terms of `PruningStatistics`
   3. Add documentation and tests
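
A minimal sketch of what the changes above amount to, assuming a simplified stand-in for `ScalarValue`. The `Scalar` enum, `MapStatistics`, `less_than`, and `might_contain` are hypothetical names used only for illustration; only the `min_value`/`max_value` method shape is taken from the diff in this PR.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for DataFusion's `ScalarValue`, kept tiny so the
/// sketch compiles without any dependencies.
#[derive(Debug, Clone, PartialEq)]
pub enum Scalar {
    Int32(i32),
    Utf8(String),
}

/// Shape of the trait from this PR: one container (row group, file,
/// in-memory partition, ...) reports min/max per column, if known.
pub trait PruningStatistics {
    fn min_value(&self, column: &str) -> Option<Scalar>;
    fn max_value(&self, column: &str) -> Option<Scalar>;
}

/// Hypothetical non-parquet source: statistics kept in a plain map,
/// keyed by column name, holding (min, max).
#[derive(Default)]
pub struct MapStatistics {
    stats: HashMap<String, (Option<Scalar>, Option<Scalar>)>,
}

impl MapStatistics {
    pub fn with(mut self, column: &str, min: Option<Scalar>, max: Option<Scalar>) -> Self {
        self.stats.insert(column.to_string(), (min, max));
        self
    }
}

impl PruningStatistics for MapStatistics {
    fn min_value(&self, column: &str) -> Option<Scalar> {
        self.stats.get(column).and_then(|(min, _)| min.clone())
    }

    fn max_value(&self, column: &str) -> Option<Scalar> {
        self.stats.get(column).and_then(|(_, max)| max.clone())
    }
}

/// `Some(a < b)` when both values have the same type, `None` otherwise.
fn less_than(a: &Scalar, b: &Scalar) -> Option<bool> {
    match (a, b) {
        (Scalar::Int32(x), Scalar::Int32(y)) => Some(x < y),
        (Scalar::Utf8(x), Scalar::Utf8(y)) => Some(x < y),
        _ => None, // mismatched types: cannot decide
    }
}

/// Returns false only when `column = value` provably matches no rows.
/// Missing or incomparable statistics always mean "keep the container".
pub fn might_contain(stats: &dyn PruningStatistics, column: &str, value: &Scalar) -> bool {
    let below_min = stats
        .min_value(column)
        .and_then(|min| less_than(value, &min))
        .unwrap_or(false);
    let above_max = stats
        .max_value(column)
        .and_then(|max| less_than(&max, value))
        .unwrap_or(false);
    !(below_min || above_max)
}
```

With statistics `s1: [2, 20]`, `might_contain(&stats, "s1", &Scalar::Int32(30))` rules the container out, while a column with no statistics is always kept. The pruning logic only sees the trait, so it does not need to know where the numbers came from.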
   
   
   # Notes:
   I am leaving this PR in draft state, in case anyone is interested, while I:
   1. merge in its dependencies (see below)
   2. work on a POC demonstrating its use in IOx
   
   # Sequence:
   
   I am trying to do this in a few small PRs to reduce the review burden. Here is how they connect together:
   
   Planned changes:
   - [x] Refactor code into a new module (https://github.com/apache/arrow-datafusion/pull/365)
   - [x] Return bool rather than parquet specific output (https://github.com/apache/arrow-datafusion/pull/370)
   - [ ] Add `ScalarValue::iter_to_array` (TBD PR)
   - [ ] Add `PruningStatistics` Trait (this PR)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] codecov-commenter commented on pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#issuecomment-846124509


   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/380?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#380](https://codecov.io/gh/apache/arrow-datafusion/pull/380?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (159f1da) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/db4f098d38993b96ce1134c4bc7bf5c6579509cf?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (db4f098) will **increase** coverage by `0.04%`.
   > The diff coverage is `83.62%`.
   
   > :exclamation: Current head 159f1da differs from pull request most recent head 896d261. Consider uploading reports for the commit 896d261 to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-datafusion/pull/380/graphs/tree.svg?width=650&height=150&src=pr&token=JXwWBKD3D9&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-datafusion/pull/380?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #380      +/-   ##
   ==========================================
   + Coverage   74.94%   74.98%   +0.04%     
   ==========================================
     Files         146      146              
     Lines       24314    24448     +134     
   ==========================================
   + Hits        18221    18332     +111     
   - Misses       6093     6116      +23     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/380?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [datafusion/src/scalar.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/380/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvc2NhbGFyLnJz) | `60.61% <74.33%> (+5.13%)` | :arrow_up: |
   | [datafusion/src/physical\_plan/parquet.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/380/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9wYXJxdWV0LnJz) | `81.96% <86.66%> (+0.69%)` | :arrow_up: |
   | [datafusion/src/physical\_optimizer/pruning.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/380/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfb3B0aW1pemVyL3BydW5pbmcucnM=) | `90.61% <90.29%> (ø)` | |
   | [datafusion/src/physical\_optimizer/repartition.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/380/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfb3B0aW1pemVyL3JlcGFydGl0aW9uLnJz) | `96.92% <100.00%> (+0.09%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/380?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/380?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [db4f098...896d261](https://codecov.io/gh/apache/arrow-datafusion/pull/380?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r638143765



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -28,50 +28,75 @@
 //! https://github.com/apache/arrow-datafusion/issues/363 it will
 //! be genericized.
 
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, convert::TryInto, sync::Arc};
 
 use arrow::{
-    array::{
-        make_array, new_null_array, ArrayData, ArrayRef, BooleanArray,
-        BooleanBufferBuilder,
-    },
-    buffer::MutableBuffer,
-    datatypes::{DataType, Field, Schema},
+    array::{ArrayRef, BooleanArray},
+    datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
 
-use parquet::file::{
-    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
-};
-
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
     logical_plan::{Expr, Operator},
     optimizer::utils,
     physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
+    scalar::ScalarValue,
 };
 
+/// Interface to pass statistics information to [`PruningPredicates`]
+pub trait PruningStatistics {
+    /// return the minimum value for the named column, if known
+    fn min_value(&self, column: &str) -> Option<ScalarValue>;

Review comment:
       In the longer run I would say we should start pushing more towards typed contiguous arrays (`Array`s or `Vec`), indeed generic. For example, here the min and max values per group could be stored in two arrays of corresponding types, which would be faster and use less memory.
   
   Vectorized processing is what we try to use Arrow for already, so I would say it is good to try to use it in more places where it makes sense.
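
To make the suggestion concrete, here is a rough sketch of the columnar layout, assuming a single Int32 column and plain `Vec`s instead of Arrow arrays. `Int32ColumnStats` and `keep_for_eq` are hypothetical names, not part of this PR or of DataFusion.

```rust
/// Min/max statistics for one Int32 column across many containers,
/// stored column-wise: entry `i` belongs to container `i`.
/// `None` means the statistic is unknown for that container.
pub struct Int32ColumnStats {
    pub mins: Vec<Option<i32>>,
    pub maxes: Vec<Option<i32>>,
}

/// Evaluates `column = value` against all containers in one pass.
/// A container is kept (true) unless its min or max proves that
/// no row in it can match.
pub fn keep_for_eq(stats: &Int32ColumnStats, value: i32) -> Vec<bool> {
    stats
        .mins
        .iter()
        .zip(stats.maxes.iter())
        .map(|(min, max)| {
            let below = min.map_or(false, |m| value < m);
            let above = max.map_or(false, |m| value > m);
            !(below || above)
        })
        .collect()
}
```

Two typed vectors replace one boxed scalar per container per call, so the whole predicate runs as a tight loop over contiguous memory.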







[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r638136080



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -28,50 +28,75 @@
 //! https://github.com/apache/arrow-datafusion/issues/363 it will
 //! be genericized.
 
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, convert::TryInto, sync::Arc};
 
 use arrow::{
-    array::{
-        make_array, new_null_array, ArrayData, ArrayRef, BooleanArray,
-        BooleanBufferBuilder,
-    },
-    buffer::MutableBuffer,
-    datatypes::{DataType, Field, Schema},
+    array::{ArrayRef, BooleanArray},
+    datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
 
-use parquet::file::{
-    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
-};
-
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
     logical_plan::{Expr, Operator},
     optimizer::utils,
     physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
+    scalar::ScalarValue,
 };
 
+/// Interface to pass statistics information to [`PruningPredicates`]
+pub trait PruningStatistics {
+    /// return the minimum value for the named column, if known
+    fn min_value(&self, column: &str) -> Option<ScalarValue>;

Review comment:
       One concern I have (in general with the current state of DataFusion) is that we use `ScalarValue` a lot in code, which can be detrimental to performance in some cases (compared to a typed array).
   Also in this case, if we would have a large dataset with 1000s of statistics values, calculating statistics might be slower than it could be when it is stored in contiguous memory.
   Just a thought - not something we should block this PR for.
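
One way to bridge the two representations is to collect the per-container scalars into a typed column once, up front. The sketch below assumes a simplified `Scalar` enum in place of `ScalarValue`; `int32_column` is a hypothetical helper and is not the signature of the planned `ScalarValue::iter_to_array`.

```rust
/// Hypothetical stand-in for `ScalarValue`: each variant carries an
/// optional value, so a statistic can be present but NULL.
#[derive(Debug, Clone, PartialEq)]
pub enum Scalar {
    Int32(Option<i32>),
    Utf8(Option<String>),
}

/// Collects one statistic per container into a contiguous Int32 column.
/// A missing statistic (`None`) and a NULL statistic (`Int32(None)`) both
/// become a null entry; any other type is reported as an error.
pub fn int32_column(
    values: impl IntoIterator<Item = Option<Scalar>>,
) -> Result<Vec<Option<i32>>, String> {
    values
        .into_iter()
        .map(|v| match v {
            None => Ok(None),
            Some(Scalar::Int32(x)) => Ok(x),
            Some(other) => Err(format!("expected Int32, got {:?}", other)),
        })
        .collect()
}
```

After this one conversion, the rest of the pruning work can run on the typed column rather than on individual scalars.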







[GitHub] [arrow-datafusion] alamb edited a comment on pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
alamb edited a comment on pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#issuecomment-847165154


   @Dandandan  I think this PR is now ready for review. 





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r637954100



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -383,210 +455,218 @@ enum StatisticsType {
     Max,
 }
 
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::logical_plan::{col, lit};
+    use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType};
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[derive(Debug, Default)]
+    struct MinMax {
+        min: Option<ScalarValue>,
+        max: Option<ScalarValue>,
+    }
 
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
+    impl MinMax {
+        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
+            Self { min, max }
         }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
+    }
+
+    #[derive(Debug, Default)]
+    struct TestStatistics {
+        // key: column name
+        stats: HashMap<String, MinMax>,
+    }
+
+    impl TestStatistics {
+        fn new() -> Self {
+            Self::default()
         }
-    };
 
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
+        fn with(mut self, name: impl Into<String>, min_max: MinMax) -> Self {
+            self.stats.insert(name.into(), min_max);
+            self
         }
-        return Arc::new(builder.finish());
     }
 
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
+    impl PruningStatistics for TestStatistics {
+        fn min_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.min.clone())
+                .unwrap_or(None)
         }
-    }
 
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
+        fn max_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.max.clone())
+                .unwrap_or(None)
+        }
     }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_optimizer::pruning::StatisticsType;
-    use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::DataType,
-    };
-    use parquet::file::statistics::Statistics as ParquetStatistics;
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    fn test_build_statistics_record_batch() {
+        // Request a record batch with of s1_min, s2_max, s3_max, s3_min
+        let stat_column_req = vec![
+            // min of original column s1, named s1_min
+            (
+                "s1".to_string(),
+                StatisticsType::Min,
+                Field::new("s1_min", DataType::Int32, true),
+            ),
+            // max of original column s2, named s2_max
+            (
+                "s2".to_string(),
+                StatisticsType::Max,
+                Field::new("s2_min", DataType::Int32, true),
+            ),
+            // max of original column s3, named s3_max
+            (
+                "s3".to_string(),
+                StatisticsType::Max,
+                Field::new("s3_max", DataType::Utf8, true),
+            ),
+            // min of original column s3, named s3_min
+            (
+                "s3".to_string(),
+                StatisticsType::Min,
+                Field::new("s3_min", DataType::Utf8, true),
+            ),
+        ];
+
+        // s1: [None, 10]
+        // s2: [2, 20]
+        // s2: ["a", "q"]
+        let stats1 = TestStatistics::new()
+            .with("s1", MinMax::new(None, Some(10i32.into())))
+            .with("s2", MinMax::new(Some(2i32.into()), Some(20i32.into())))
+            .with("s3", MinMax::new(Some("a".into()), Some("q".into())));
+
+        // s1: [None, None]
+        // s2: [None, None]
+        // s2: [None, None]
+        let stats2 = TestStatistics::new()
+            .with("s1", MinMax::new(None, None))
+            .with("s2", MinMax::new(None, None))
+            .with("s3", MinMax::new(None, None));
+
+        // s1: [9, None]
+        // s2: None
+        // s2: [None, "r"]
+        let stats3 = TestStatistics::new()
+            .with("s1", MinMax::new(Some(9i32.into()), None))
+            .with("s3", MinMax::new(None, Some("r".into())));
+
+        // This one returns a statistics value, but the value itself is NULL
+        // s1: [Some(None), None]
+        let stats4 = TestStatistics::new()
+            .with("s1", MinMax::new(Some(ScalarValue::Int32(None)), None));
+
+        let statistics = [stats1, stats2, stats3, stats4];
+        let batch = build_statistics_record_batch(&statistics, &stat_column_req).unwrap();
+        let expected = vec![
+            "+--------+--------+--------+--------+",
+            "| s1_min | s2_min | s3_max | s3_min |",

Review comment:
       I'll update it 👍 







[GitHub] [arrow-datafusion] jorgecarleitao commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r638145200



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -28,50 +28,75 @@
 //! https://github.com/apache/arrow-datafusion/issues/363 it will
 //! be genericized.
 
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, convert::TryInto, sync::Arc};
 
 use arrow::{
-    array::{
-        make_array, new_null_array, ArrayData, ArrayRef, BooleanArray,
-        BooleanBufferBuilder,
-    },
-    buffer::MutableBuffer,
-    datatypes::{DataType, Field, Schema},
+    array::{ArrayRef, BooleanArray},
+    datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
 
-use parquet::file::{
-    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
-};
-
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
     logical_plan::{Expr, Operator},
     optimizer::utils,
     physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
+    scalar::ScalarValue,
 };
 
+/// Interface to pass statistics information to [`PruningPredicates`]
+pub trait PruningStatistics {
+    /// return the minimum value for the named column, if known
+    fn min_value(&self, column: &str) -> Option<ScalarValue>;

Review comment:
       I very much agree with you, @Dandandan. I have a PR on arrow2 on exactly this: https://github.com/jorgecarleitao/arrow2/pull/56
   
   I am waiting for the experimental repos to be available so that we can discuss it further in apache/*







[GitHub] [arrow-datafusion] alamb commented on pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#issuecomment-846274766


   > I think this is really cool, I think it would be also great to have this for in-memory tables.
   
   I agree -- I think all that is needed is to calculate the min/max statistics for each partition (or maybe even record batch) though we might have to be careful not to slow down queries where it wouldn't help. Maybe it could be opt in. Or perhaps we could compute the statistics "on demand" (after we have created a PruningPredicate)
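
A small sketch of the "on demand" idea, assuming a hypothetical in-memory `Partition` that holds a single Int32 column. Nothing is computed until a caller asks for the statistics, and the result is cached afterwards. None of these names come from DataFusion.

```rust
/// Hypothetical in-memory partition holding one nullable Int32 column.
pub struct Partition {
    values: Vec<Option<i32>>,
    /// (min, max), filled in on the first request only.
    cached: Option<(Option<i32>, Option<i32>)>,
}

impl Partition {
    pub fn new(values: Vec<Option<i32>>) -> Self {
        Self { values, cached: None }
    }

    /// Computes (min, max) over the non-null values on the first call
    /// and reuses the cached result on later calls.
    pub fn min_max(&mut self) -> (Option<i32>, Option<i32>) {
        if self.cached.is_none() {
            let min = self.values.iter().flatten().copied().min();
            let max = self.values.iter().flatten().copied().max();
            self.cached = Some((min, max));
        }
        self.cached.unwrap()
    }
}
```

Queries that never build a pruning predicate would never call `min_max`, so they pay nothing for the statistics.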





[GitHub] [arrow-datafusion] alamb closed pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
alamb closed pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380


   





[GitHub] [arrow-datafusion] alamb commented on pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#issuecomment-849096443


   https://github.com/apache/arrow-datafusion/pull/426 appears to be the more popular option; closing in favor of that one.





[GitHub] [arrow-datafusion] Dandandan edited a comment on pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#issuecomment-846263403


   I think this is really cool, I think it would be also great to have this for in-memory tables.





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r637953898



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -383,210 +455,218 @@ enum StatisticsType {
     Max,
 }
 
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::logical_plan::{col, lit};
+    use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType};
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[derive(Debug, Default)]
+    struct MinMax {
+        min: Option<ScalarValue>,
+        max: Option<ScalarValue>,
+    }
 
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
+    impl MinMax {
+        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
+            Self { min, max }
         }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
+    }
+
+    #[derive(Debug, Default)]
+    struct TestStatistics {
+        // key: column name
+        stats: HashMap<String, MinMax>,
+    }
+
+    impl TestStatistics {
+        fn new() -> Self {
+            Self::default()
         }
-    };
 
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
+        fn with(mut self, name: impl Into<String>, min_max: MinMax) -> Self {
+            self.stats.insert(name.into(), min_max);
+            self
         }
-        return Arc::new(builder.finish());
     }
 
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
+    impl PruningStatistics for TestStatistics {
+        fn min_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.min.clone())
+                .unwrap_or(None)
         }
-    }
 
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
+        fn max_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.max.clone())
+                .unwrap_or(None)
+        }
     }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_optimizer::pruning::StatisticsType;
-    use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::DataType,
-    };
-    use parquet::file::statistics::Statistics as ParquetStatistics;
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    fn test_build_statistics_record_batch() {
+        // Request a record batch with of s1_min, s2_max, s3_max, s3_min
+        let stat_column_req = vec![
+            // min of original column s1, named s1_min
+            (
+                "s1".to_string(),
+                StatisticsType::Min,
+                Field::new("s1_min", DataType::Int32, true),
+            ),
+            // max of original column s2, named s2_max
+            (
+                "s2".to_string(),
+                StatisticsType::Max,
+                Field::new("s2_min", DataType::Int32, true),
+            ),
+            // max of original column s3, named s3_max
+            (
+                "s3".to_string(),
+                StatisticsType::Max,
+                Field::new("s3_max", DataType::Utf8, true),
+            ),
+            // min of original column s3, named s3_min
+            (
+                "s3".to_string(),
+                StatisticsType::Min,
+                Field::new("s3_min", DataType::Utf8, true),
+            ),
+        ];
+
+        // s1: [None, 10]
+        // s2: [2, 20]
+        // s2: ["a", "q"]
+        let stats1 = TestStatistics::new()
+            .with("s1", MinMax::new(None, Some(10i32.into())))
+            .with("s2", MinMax::new(Some(2i32.into()), Some(20i32.into())))
+            .with("s3", MinMax::new(Some("a".into()), Some("q".into())));
+
+        // s1: [None, None]
+        // s2: [None, None]
+        // s2: [None, None]
+        let stats2 = TestStatistics::new()
+            .with("s1", MinMax::new(None, None))
+            .with("s2", MinMax::new(None, None))
+            .with("s3", MinMax::new(None, None));
+
+        // s1: [9, None]
+        // s2: None
+        // s2: [None, "r"]

Review comment:
       wow -- thank you @NGA-TRAN  for the careful review. 👍 







[GitHub] [arrow-datafusion] alamb commented on pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#issuecomment-847165154


   @Dandandan  I think this PR is now ready for review. 





[GitHub] [arrow-datafusion] Dandandan commented on pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#issuecomment-846263403


   I think this is really cool, IMO it would also be great to have this for the in-memory table.





[GitHub] [arrow-datafusion] Dandandan edited a comment on pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#issuecomment-846263403


   I think this is really cool, IMO it would also be great to have this for the in-memory tables.
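
To make the in-memory idea concrete, here is a minimal sketch of how a partition held in memory might expose min/max values through a trait shaped like the `PruningStatistics` trait in this PR. Everything except the two trait method signatures is an assumption for illustration: `ScalarValue` is a one-variant stand-in for DataFusion's type, and `MemPartition` is a hypothetical struct, not something that exists in DataFusion.

```rust
/// Simplified stand-in for DataFusion's `ScalarValue` (assumption:
/// reduced to a single variant to keep the sketch self-contained).
#[derive(Debug, Clone, PartialEq)]
enum ScalarValue {
    Int32(i32),
}

/// Same method signatures as the `PruningStatistics` trait in this PR.
trait PruningStatistics {
    /// Minimum value for the named column, if known.
    fn min_value(&self, column: &str) -> Option<ScalarValue>;
    /// Maximum value for the named column, if known.
    fn max_value(&self, column: &str) -> Option<ScalarValue>;
}

/// Hypothetical in-memory partition holding one nullable Int32 column.
struct MemPartition {
    column: String,
    values: Vec<Option<i32>>,
}

impl MemPartition {
    fn new(column: &str, values: Vec<Option<i32>>) -> Self {
        Self {
            column: column.to_string(),
            values,
        }
    }
}

impl PruningStatistics for MemPartition {
    fn min_value(&self, column: &str) -> Option<ScalarValue> {
        if column != self.column {
            // Unknown column: report "no statistics" rather than guessing.
            return None;
        }
        // `flatten` skips nulls; `min` is `None` when no non-null value exists.
        self.values
            .iter()
            .flatten()
            .min()
            .map(|v| ScalarValue::Int32(*v))
    }

    fn max_value(&self, column: &str) -> Option<ScalarValue> {
        if column != self.column {
            return None;
        }
        self.values
            .iter()
            .flatten()
            .max()
            .map(|v| ScalarValue::Int32(*v))
    }
}
```

This sketch rescans the values on every call; a real table would more likely compute the statistics once when the data is loaded and cache them.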





[GitHub] [arrow-datafusion] NGA-TRAN commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
NGA-TRAN commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r637235898



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -15,58 +15,88 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! This module contains code to rule out row groups / partitions /
-//! etc based on statistics prior in order to skip evaluating entire
-//! swaths of rows.
+//! This module contains code to prune "containers" of row groups
+//! based on statistics prior to execution. This can lead to
+//! significant performance improvements by avoiding the need
+//! to evaluate a plan on entire containers (e.g. an entire file)
+//!
+//! For example, it is used to prune (skip) row groups while reading
+//! parquet files if it can be determined from the predicate that
+//! nothing in the row group can match.
 //!
 //! This code is currently specific to Parquet, but soon (TM), via
 //! https://github.com/apache/arrow-datafusion/issues/363 it will
 //! be genericized.
 
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, convert::TryInto, sync::Arc};
 
 use arrow::{
-    array::{
-        make_array, new_null_array, ArrayData, ArrayRef, BooleanArray,
-        BooleanBufferBuilder,
-    },
-    buffer::MutableBuffer,
-    datatypes::{DataType, Field, Schema},
+    array::{ArrayRef, BooleanArray},
+    datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
 
-use parquet::file::{
-    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
-};
-
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
     logical_plan::{Expr, Operator},
     optimizer::utils,
     physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
+    scalar::ScalarValue,
 };
 
+/// Interface to pass statistics information to [`PruningPredicates`]
+pub trait PruningStatistics {
+    /// return the minimum value for the named column, if known
+    fn min_value(&self, column: &str) -> Option<ScalarValue>;
+
+    /// return the maximum value for the named column, if known
+    fn max_value(&self, column: &str) -> Option<ScalarValue>;
+}
+
+/// Evaluates filter expressions on statistics in order to
+/// prune data containers (e.g. parquet row group)
+///
+/// See [`try_new`] for more information.
 #[derive(Debug, Clone)]
-/// Builder used for generating predicate functions that can be used
-/// to prune data based on statistics (e.g. parquet row group metadata)
-pub struct PruningPredicateBuilder {
-    schema: Schema,
+pub struct PruningPredicate {
+    /// The input schema against which the predicate will be evaluated
+    schema: SchemaRef,
+    /// Actual pruning predicate (rewritten in terms of column min/max statistics)
     predicate_expr: Arc<dyn PhysicalExpr>,
+    /// The statistics required to evaluate this predicate:
+    /// * The column name in the input schema
+    /// * Statstics type (e.g. Min or Ma)

Review comment:
       ```suggestion
       /// * Statstics type (e.g. Min or Max)
   ```

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -85,94 +115,132 @@ impl PruningPredicateBuilder {
         })
     }
 
-    /// Generate a predicate function used to filter based on
-    /// statistics
+    /// For each set of statistics, evalates the pruning predicate
+    /// and returns a `bool` with the following meaning for a
+    /// all rows whose values match the statistics:
     ///
-    /// This function takes a slice of statistics as parameter, so
-    /// that DataFusion's physical expressions can be executed once
-    /// against a single RecordBatch, containing statistics arrays, on
-    /// which the physical predicate expression is executed to
-    /// generate a row group filter array.
+    /// `true`: There MAY be rows that match the predicate
     ///
-    /// The generated filter function is then used in the returned
-    /// closure to filter row groups. NOTE this is parquet specific at the moment
-    pub fn build_pruning_predicate(
-        &self,
-        row_group_metadata: &[RowGroupMetaData],
-    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+    /// `false`: There are no rows that could match the predicate
+    ///
+    /// Note this function takes a slice of statistics as a parameter
+    /// to amortize the cost of the evaluation of the predicate
+    /// against a single record batch.
+    pub fn prune<S: PruningStatistics>(&self, statistics: &[S]) -> Result<Vec<bool>> {
         // build statistics record batch
-        let predicate_result = build_statistics_record_batch(
-            row_group_metadata,
-            &self.schema,
-            &self.stat_column_req,
-        )
-        .and_then(|statistics_batch| {
-            // execute predicate expression
-            self.predicate_expr.evaluate(&statistics_batch)
-        })
-        .and_then(|v| match v {
-            ColumnarValue::Array(array) => Ok(array),
-            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
-                "predicate expression didn't return an array".to_string(),
-            )),
-        });
-
-        let predicate_array = match predicate_result {
-            Ok(array) => array,
-            // row group filter array could not be built
-            // return a closure which will not filter out any row groups
-            _ => return Box::new(|_r, _i| true),
-        };
+        let predicate_array =
+            build_statistics_record_batch(statistics, &self.stat_column_req)
+                .and_then(|statistics_batch| {
+                    // execute predicate expression
+                    self.predicate_expr.evaluate(&statistics_batch)
+                })
+                .and_then(|v| match v {
+                    ColumnarValue::Array(array) => Ok(array),
+                    ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
+                        "predicate expression didn't return an array".to_string(),
+                    )),
+                })?;
+
+        let predicate_array = predicate_array
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "Expected pruning predicate evaluation to be BooleanArray, \
+                     but was {:?}",
+                    predicate_array
+                ))
+            })?;
+
+        // when the result of the predicate expression for a row group is null / undefined,
+        // e.g. due to missing statistics, this row group can't be filtered out,
+        // so replace with true
+        Ok(predicate_array
+            .into_iter()
+            .map(|x| x.unwrap_or(true))
+            .collect::<Vec<_>>())
+    }
 
-        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
-        match predicate_array {
-            // return row group predicate function
-            Some(array) => {
-                // when the result of the predicate expression for a row group is null / undefined,
-                // e.g. due to missing statistics, this row group can't be filtered out,
-                // so replace with true
-                let predicate_values =
-                    array.iter().map(|x| x.unwrap_or(true)).collect::<Vec<_>>();
-                Box::new(move |_, i| predicate_values[i])
-            }
-            // predicate result is not a BooleanArray
-            // return a closure which will not filter out any row groups
-            _ => Box::new(|_r, _i| true),
-        }
+    /// Return a reference to the input schema
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
     }
 }
 
-/// Build a RecordBatch from a list of statistics (currently parquet
-/// [`RowGroupMetadata`] structs), creating arrays, one for each
-/// statistics column, as requested in the stat_column_req parameter.
-fn build_statistics_record_batch(
-    row_groups: &[RowGroupMetaData],
-    schema: &Schema,
+/// Build a RecordBatch from a list of statistics, creating arrays,
+/// with one row for each PruningStatistics and columns specified in
+/// in the stat_column_req parameter.
+///
+/// For example, if the requested columns are
+/// ```text
+/// ("s1", Min, Field:s1_min)
+/// ("s2", Max, field:s2_maxx)

Review comment:
       ```suggestion
   /// ("s2", Max, field:s2_max)
   ```
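
For readers following the `prune` doc comment in this hunk, the `true` / `false` / missing-statistics behaviour can be sketched without Arrow at all. The code below is a simplified illustration, not the PR's implementation: `ColumnStats`, `eq_predicate`, and the free function `prune` are hypothetical names, and the rewrite of `col = value` into `min <= value AND value <= max` is an assumption about what the rewritten predicate looks like for an equality filter.

```rust
/// Min/max statistics for one container (e.g. one row group);
/// `None` means the value is not known.
#[derive(Debug, Clone, Copy)]
struct ColumnStats {
    min: Option<i32>,
    max: Option<i32>,
}

/// Evaluates `min <= value AND value <= max` with SQL-style null
/// handling: a definite `false` on either side decides the result,
/// otherwise any unknown input makes the result unknown (`None`).
fn eq_predicate(stats: &ColumnStats, value: i32) -> Option<bool> {
    let lower = stats.min.map(|min| min <= value);
    let upper = stats.max.map(|max| value <= max);
    match (lower, upper) {
        (Some(false), _) | (_, Some(false)) => Some(false),
        (Some(true), Some(true)) => Some(true),
        _ => None,
    }
}

/// Returns one `bool` per container:
/// `true`  = rows in the container MAY match, so it must be read;
/// `false` = no row can match, so the container can be skipped.
/// An unknown result is mapped to `true`, mirroring the
/// `unwrap_or(true)` in the diff above.
fn prune(containers: &[ColumnStats], value: i32) -> Vec<bool> {
    containers
        .iter()
        .map(|stats| eq_predicate(stats, value).unwrap_or(true))
        .collect()
}
```

With containers `[1, 10]`, `[20, 30]`, `[None, None]`, and `[None, 3]` and the filter `col = 5`, only the second and fourth can be skipped; the third has no statistics and therefore has to be kept.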

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -383,210 +455,218 @@ enum StatisticsType {
     Max,
 }
 
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::logical_plan::{col, lit};
+    use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType};
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[derive(Debug, Default)]
+    struct MinMax {
+        min: Option<ScalarValue>,
+        max: Option<ScalarValue>,
+    }
 
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
+    impl MinMax {
+        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
+            Self { min, max }
         }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
+    }
+
+    #[derive(Debug, Default)]
+    struct TestStatistics {
+        // key: column name
+        stats: HashMap<String, MinMax>,
+    }
+
+    impl TestStatistics {
+        fn new() -> Self {
+            Self::default()
         }
-    };
 
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
+        fn with(mut self, name: impl Into<String>, min_max: MinMax) -> Self {
+            self.stats.insert(name.into(), min_max);
+            self
         }
-        return Arc::new(builder.finish());
     }
 
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
+    impl PruningStatistics for TestStatistics {
+        fn min_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.min.clone())
+                .unwrap_or(None)
         }
-    }
 
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
+        fn max_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.max.clone())
+                .unwrap_or(None)
+        }
     }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_optimizer::pruning::StatisticsType;
-    use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::DataType,
-    };
-    use parquet::file::statistics::Statistics as ParquetStatistics;
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    fn test_build_statistics_record_batch() {
+        // Request a record batch with of s1_min, s2_max, s3_max, s3_min
+        let stat_column_req = vec![
+            // min of original column s1, named s1_min
+            (
+                "s1".to_string(),
+                StatisticsType::Min,
+                Field::new("s1_min", DataType::Int32, true),
+            ),
+            // max of original column s2, named s2_max
+            (
+                "s2".to_string(),
+                StatisticsType::Max,
+                Field::new("s2_min", DataType::Int32, true),
+            ),
+            // max of original column s3, named s3_max
+            (
+                "s3".to_string(),
+                StatisticsType::Max,
+                Field::new("s3_max", DataType::Utf8, true),
+            ),
+            // min of original column s3, named s3_min
+            (
+                "s3".to_string(),
+                StatisticsType::Min,
+                Field::new("s3_min", DataType::Utf8, true),
+            ),
+        ];
+
+        // s1: [None, 10]
+        // s2: [2, 20]
+        // s2: ["a", "q"]
+        let stats1 = TestStatistics::new()
+            .with("s1", MinMax::new(None, Some(10i32.into())))
+            .with("s2", MinMax::new(Some(2i32.into()), Some(20i32.into())))
+            .with("s3", MinMax::new(Some("a".into()), Some("q".into())));
+
+        // s1: [None, None]
+        // s2: [None, None]
+        // s2: [None, None]

Review comment:
       ```suggestion
           // s3: [None, None]
   ```

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -383,210 +455,218 @@ enum StatisticsType {
     Max,
 }
 
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::logical_plan::{col, lit};
+    use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType};
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[derive(Debug, Default)]
+    struct MinMax {
+        min: Option<ScalarValue>,
+        max: Option<ScalarValue>,
+    }
 
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
+    impl MinMax {
+        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
+            Self { min, max }
         }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
+    }
+
+    #[derive(Debug, Default)]
+    struct TestStatistics {
+        // key: column name
+        stats: HashMap<String, MinMax>,
+    }
+
+    impl TestStatistics {
+        fn new() -> Self {
+            Self::default()
         }
-    };
 
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
+        fn with(mut self, name: impl Into<String>, min_max: MinMax) -> Self {
+            self.stats.insert(name.into(), min_max);
+            self
         }
-        return Arc::new(builder.finish());
     }
 
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
+    impl PruningStatistics for TestStatistics {
+        fn min_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.min.clone())
+                .unwrap_or(None)
         }
-    }
 
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
+        fn max_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.max.clone())
+                .unwrap_or(None)
+        }
     }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_optimizer::pruning::StatisticsType;
-    use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::DataType,
-    };
-    use parquet::file::statistics::Statistics as ParquetStatistics;
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    fn test_build_statistics_record_batch() {
+        // Request a record batch with of s1_min, s2_max, s3_max, s3_min
+        let stat_column_req = vec![
+            // min of original column s1, named s1_min
+            (
+                "s1".to_string(),
+                StatisticsType::Min,
+                Field::new("s1_min", DataType::Int32, true),
+            ),
+            // max of original column s2, named s2_max
+            (
+                "s2".to_string(),
+                StatisticsType::Max,
+                Field::new("s2_min", DataType::Int32, true),

Review comment:
       ```suggestion
                   Field::new("s2_max", DataType::Int32, true),
   ```

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -383,210 +455,218 @@ enum StatisticsType {
     Max,
 }
 
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::logical_plan::{col, lit};
+    use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType};
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[derive(Debug, Default)]
+    struct MinMax {
+        min: Option<ScalarValue>,
+        max: Option<ScalarValue>,
+    }
 
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
+    impl MinMax {
+        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
+            Self { min, max }
         }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
+    }
+
+    #[derive(Debug, Default)]
+    struct TestStatistics {
+        // key: column name
+        stats: HashMap<String, MinMax>,
+    }
+
+    impl TestStatistics {
+        fn new() -> Self {
+            Self::default()
         }
-    };
 
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
+        fn with(mut self, name: impl Into<String>, min_max: MinMax) -> Self {
+            self.stats.insert(name.into(), min_max);
+            self
         }
-        return Arc::new(builder.finish());
     }
 
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
+    impl PruningStatistics for TestStatistics {
+        fn min_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.min.clone())
+                .unwrap_or(None)
         }
-    }
 
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
+        fn max_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.max.clone())
+                .unwrap_or(None)
+        }
     }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_optimizer::pruning::StatisticsType;
-    use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::DataType,
-    };
-    use parquet::file::statistics::Statistics as ParquetStatistics;
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    fn test_build_statistics_record_batch() {
+        // Request a record batch with s1_min, s2_max, s3_max, s3_min
+        let stat_column_req = vec![
+            // min of original column s1, named s1_min
+            (
+                "s1".to_string(),
+                StatisticsType::Min,
+                Field::new("s1_min", DataType::Int32, true),
+            ),
+            // max of original column s2, named s2_max
+            (
+                "s2".to_string(),
+                StatisticsType::Max,
+                Field::new("s2_min", DataType::Int32, true),
+            ),
+            // max of original column s3, named s3_max
+            (
+                "s3".to_string(),
+                StatisticsType::Max,
+                Field::new("s3_max", DataType::Utf8, true),
+            ),
+            // min of original column s3, named s3_min
+            (
+                "s3".to_string(),
+                StatisticsType::Min,
+                Field::new("s3_min", DataType::Utf8, true),
+            ),
+        ];
+
+        // s1: [None, 10]
+        // s2: [2, 20]
+        // s2: ["a", "q"]
+        let stats1 = TestStatistics::new()
+            .with("s1", MinMax::new(None, Some(10i32.into())))
+            .with("s2", MinMax::new(Some(2i32.into()), Some(20i32.into())))
+            .with("s3", MinMax::new(Some("a".into()), Some("q".into())));
+
+        // s1: [None, None]
+        // s2: [None, None]
+        // s2: [None, None]
+        let stats2 = TestStatistics::new()
+            .with("s1", MinMax::new(None, None))
+            .with("s2", MinMax::new(None, None))
+            .with("s3", MinMax::new(None, None));
+
+        // s1: [9, None]
+        // s2: None
+        // s2: [None, "r"]
+        let stats3 = TestStatistics::new()
+            .with("s1", MinMax::new(Some(9i32.into()), None))
+            .with("s3", MinMax::new(None, Some("r".into())));
+
+        // This one returns a statistics value, but the value itself is NULL
+        // s1: [Some(None), None]
+        let stats4 = TestStatistics::new()
+            .with("s1", MinMax::new(Some(ScalarValue::Int32(None)), None));
+
+        let statistics = [stats1, stats2, stats3, stats4];
+        let batch = build_statistics_record_batch(&statistics, &stat_column_req).unwrap();
+        let expected = vec![
+            "+--------+--------+--------+--------+",
+            "| s1_min | s2_min | s3_max | s3_min |",

Review comment:
       With the change above, s2_min will become s2_max to make more sense with the 20 value

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -85,94 +115,132 @@ impl PruningPredicateBuilder {
         })
     }
 
-    /// Generate a predicate function used to filter based on
-    /// statistics
+    /// For each set of statistics, evaluates the pruning predicate
+    /// and returns a `bool` with the following meaning for
+    /// all rows whose values match the statistics:
     ///
-    /// This function takes a slice of statistics as parameter, so
-    /// that DataFusion's physical expressions can be executed once
-    /// against a single RecordBatch, containing statistics arrays, on
-    /// which the physical predicate expression is executed to
-    /// generate a row group filter array.
+    /// `true`: There MAY be rows that match the predicate
     ///
-    /// The generated filter function is then used in the returned
-    /// closure to filter row groups. NOTE this is parquet specific at the moment
-    pub fn build_pruning_predicate(
-        &self,
-        row_group_metadata: &[RowGroupMetaData],
-    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+    /// `false`: There are no rows that could match the predicate
+    ///
+    /// Note this function takes a slice of statistics as a parameter
+    /// to amortize the cost of the evaluation of the predicate
+    /// against a single record batch.
+    pub fn prune<S: PruningStatistics>(&self, statistics: &[S]) -> Result<Vec<bool>> {
         // build statistics record batch
-        let predicate_result = build_statistics_record_batch(
-            row_group_metadata,
-            &self.schema,
-            &self.stat_column_req,
-        )
-        .and_then(|statistics_batch| {
-            // execute predicate expression
-            self.predicate_expr.evaluate(&statistics_batch)
-        })
-        .and_then(|v| match v {
-            ColumnarValue::Array(array) => Ok(array),
-            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
-                "predicate expression didn't return an array".to_string(),
-            )),
-        });
-
-        let predicate_array = match predicate_result {
-            Ok(array) => array,
-            // row group filter array could not be built
-            // return a closure which will not filter out any row groups
-            _ => return Box::new(|_r, _i| true),
-        };
+        let predicate_array =
+            build_statistics_record_batch(statistics, &self.stat_column_req)
+                .and_then(|statistics_batch| {
+                    // execute predicate expression
+                    self.predicate_expr.evaluate(&statistics_batch)
+                })
+                .and_then(|v| match v {
+                    ColumnarValue::Array(array) => Ok(array),
+                    ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
+                        "predicate expression didn't return an array".to_string(),
+                    )),
+                })?;
+
+        let predicate_array = predicate_array
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "Expected pruning predicate evaluation to be BooleanArray, \
+                     but was {:?}",
+                    predicate_array
+                ))
+            })?;
+
+        // when the result of the predicate expression for a row group is null / undefined,
+        // e.g. due to missing statistics, this row group can't be filtered out,
+        // so replace with true
+        Ok(predicate_array
+            .into_iter()
+            .map(|x| x.unwrap_or(true))
+            .collect::<Vec<_>>())
+    }
 
-        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
-        match predicate_array {
-            // return row group predicate function
-            Some(array) => {
-                // when the result of the predicate expression for a row group is null / undefined,
-                // e.g. due to missing statistics, this row group can't be filtered out,
-                // so replace with true
-                let predicate_values =
-                    array.iter().map(|x| x.unwrap_or(true)).collect::<Vec<_>>();
-                Box::new(move |_, i| predicate_values[i])
-            }
-            // predicate result is not a BooleanArray
-            // return a closure which will not filter out any row groups
-            _ => Box::new(|_r, _i| true),
-        }
+    /// Return a reference to the input schema
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
     }
 }
 
-/// Build a RecordBatch from a list of statistics (currently parquet
-/// [`RowGroupMetadata`] structs), creating arrays, one for each
-/// statistics column, as requested in the stat_column_req parameter.
-fn build_statistics_record_batch(
-    row_groups: &[RowGroupMetaData],
-    schema: &Schema,
+/// Build a RecordBatch from a list of statistics, creating arrays,
+/// with one row for each PruningStatistics and columns specified in
+/// the stat_column_req parameter.
+///
+/// For example, if the requested columns are
+/// ```text
+/// ("s1", Min, Field:s1_min)
+/// ("s2", Max, field:s2_maxx)
+///```
+///
+/// And the input statistics had
+/// ```text
+/// S1(Min: 5, Max: 10)
+/// S2(Min: 99, Max: 1000)
+/// S3(Min: 1, Max: 2)
+/// ```
+///
+/// Then this function would build a record batch with 2 columns and
+/// one row s1_min and s2_maxx as follows (s3 is not requested):
+///
+/// ```text
+/// s1_min | s2_maxx

Review comment:
       ```suggestion
   /// s1_min | s2_max
   ```

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -85,94 +115,132 @@ impl PruningPredicateBuilder {
         })
     }
 
-    /// Generate a predicate function used to filter based on
-    /// statistics
+    /// For each set of statistics, evaluates the pruning predicate
+    /// and returns a `bool` with the following meaning for
+    /// all rows whose values match the statistics:
     ///
-    /// This function takes a slice of statistics as parameter, so
-    /// that DataFusion's physical expressions can be executed once
-    /// against a single RecordBatch, containing statistics arrays, on
-    /// which the physical predicate expression is executed to
-    /// generate a row group filter array.
+    /// `true`: There MAY be rows that match the predicate
     ///
-    /// The generated filter function is then used in the returned
-    /// closure to filter row groups. NOTE this is parquet specific at the moment
-    pub fn build_pruning_predicate(
-        &self,
-        row_group_metadata: &[RowGroupMetaData],
-    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+    /// `false`: There are no rows that could match the predicate
+    ///
+    /// Note this function takes a slice of statistics as a parameter
+    /// to amortize the cost of the evaluation of the predicate
+    /// against a single record batch.
+    pub fn prune<S: PruningStatistics>(&self, statistics: &[S]) -> Result<Vec<bool>> {
         // build statistics record batch
-        let predicate_result = build_statistics_record_batch(
-            row_group_metadata,
-            &self.schema,
-            &self.stat_column_req,
-        )
-        .and_then(|statistics_batch| {
-            // execute predicate expression
-            self.predicate_expr.evaluate(&statistics_batch)
-        })
-        .and_then(|v| match v {
-            ColumnarValue::Array(array) => Ok(array),
-            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
-                "predicate expression didn't return an array".to_string(),
-            )),
-        });
-
-        let predicate_array = match predicate_result {
-            Ok(array) => array,
-            // row group filter array could not be built
-            // return a closure which will not filter out any row groups
-            _ => return Box::new(|_r, _i| true),
-        };
+        let predicate_array =
+            build_statistics_record_batch(statistics, &self.stat_column_req)
+                .and_then(|statistics_batch| {
+                    // execute predicate expression
+                    self.predicate_expr.evaluate(&statistics_batch)
+                })
+                .and_then(|v| match v {
+                    ColumnarValue::Array(array) => Ok(array),
+                    ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
+                        "predicate expression didn't return an array".to_string(),
+                    )),
+                })?;
+
+        let predicate_array = predicate_array
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "Expected pruning predicate evaluation to be BooleanArray, \
+                     but was {:?}",
+                    predicate_array
+                ))
+            })?;
+
+        // when the result of the predicate expression for a row group is null / undefined,
+        // e.g. due to missing statistics, this row group can't be filtered out,
+        // so replace with true
+        Ok(predicate_array
+            .into_iter()
+            .map(|x| x.unwrap_or(true))
+            .collect::<Vec<_>>())
+    }
 
-        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
-        match predicate_array {
-            // return row group predicate function
-            Some(array) => {
-                // when the result of the predicate expression for a row group is null / undefined,
-                // e.g. due to missing statistics, this row group can't be filtered out,
-                // so replace with true
-                let predicate_values =
-                    array.iter().map(|x| x.unwrap_or(true)).collect::<Vec<_>>();
-                Box::new(move |_, i| predicate_values[i])
-            }
-            // predicate result is not a BooleanArray
-            // return a closure which will not filter out any row groups
-            _ => Box::new(|_r, _i| true),
-        }
+    /// Return a reference to the input schema
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
     }
 }
 
-/// Build a RecordBatch from a list of statistics (currently parquet
-/// [`RowGroupMetadata`] structs), creating arrays, one for each
-/// statistics column, as requested in the stat_column_req parameter.
-fn build_statistics_record_batch(
-    row_groups: &[RowGroupMetaData],
-    schema: &Schema,
+/// Build a RecordBatch from a list of statistics, creating arrays,
+/// with one row for each PruningStatistics and columns specified in
+/// the stat_column_req parameter.
+///
+/// For example, if the requested columns are
+/// ```text
+/// ("s1", Min, Field:s1_min)
+/// ("s2", Max, field:s2_maxx)
+///```
+///
+/// And the input statistics had
+/// ```text
+/// S1(Min: 5, Max: 10)
+/// S2(Min: 99, Max: 1000)
+/// S3(Min: 1, Max: 2)
+/// ```
+///
+/// Then this function would build a record batch with 2 columns and
+/// one row s1_min and s2_maxx as follows (s3 is not requested):
+///
+/// ```text
+/// s1_min | s2_maxx
+/// -------+--------
+///   5    | 10

Review comment:
       Do you mean s2_max = 1000?
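
A standalone sketch of the row described by the doc-comment example may help settle the question. It does not use DataFusion or Arrow: `StatType`, `ColumnStats`, `build_stat_row`, and `doc_comment_example` are hypothetical names, and plain `i64` values stand in for `ScalarValue`. Under those assumptions, requesting the min of `s1` and the max of `s2` selects 5 and 1000:

```rust
use std::collections::HashMap;

/// Which statistic is requested for a column (mirrors the Min/Max idea in the PR).
#[derive(Clone, Copy)]
enum StatType {
    Min,
    Max,
}

/// Hypothetical per-column min/max; `None` means the statistic is unknown.
struct ColumnStats {
    min: Option<i64>,
    max: Option<i64>,
}

/// Builds one row of statistics values, one entry per requested
/// (column, statistic) pair. Columns without statistics yield `None`,
/// standing in for a NULL in the statistics batch.
fn build_stat_row(
    stats: &HashMap<&str, ColumnStats>,
    requests: &[(&str, StatType)],
) -> Vec<Option<i64>> {
    requests
        .iter()
        .map(|(column, stat_type)| {
            stats.get(*column).and_then(|s| match stat_type {
                StatType::Min => s.min,
                StatType::Max => s.max,
            })
        })
        .collect()
}

/// The doc-comment example: s1 in [5, 10], s2 in [99, 1000], s3 in [1, 2].
fn doc_comment_example() -> Vec<Option<i64>> {
    let mut stats = HashMap::new();
    stats.insert("s1", ColumnStats { min: Some(5), max: Some(10) });
    stats.insert("s2", ColumnStats { min: Some(99), max: Some(1000) });
    stats.insert("s3", ColumnStats { min: Some(1), max: Some(2) });
    // s3 is not requested, so it does not appear in the row.
    build_stat_row(&stats, &[("s1", StatType::Min), ("s2", StatType::Max)])
}
```

In this model `doc_comment_example()` returns `[Some(5), Some(1000)]`, so the second cell of the documented table would be 1000 rather than 10.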

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -383,210 +455,218 @@ enum StatisticsType {
     Max,
 }
 
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::logical_plan::{col, lit};
+    use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType};
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[derive(Debug, Default)]
+    struct MinMax {
+        min: Option<ScalarValue>,
+        max: Option<ScalarValue>,
+    }
 
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
+    impl MinMax {
+        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
+            Self { min, max }
         }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
+    }
+
+    #[derive(Debug, Default)]
+    struct TestStatistics {
+        // key: column name
+        stats: HashMap<String, MinMax>,
+    }
+
+    impl TestStatistics {
+        fn new() -> Self {
+            Self::default()
         }
-    };
 
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
+        fn with(mut self, name: impl Into<String>, min_max: MinMax) -> Self {
+            self.stats.insert(name.into(), min_max);
+            self
         }
-        return Arc::new(builder.finish());
     }
 
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
+    impl PruningStatistics for TestStatistics {
+        fn min_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.min.clone())
+                .unwrap_or(None)
         }
-    }
 
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
+        fn max_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.max.clone())
+                .unwrap_or(None)
+        }
     }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_optimizer::pruning::StatisticsType;
-    use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::DataType,
-    };
-    use parquet::file::statistics::Statistics as ParquetStatistics;
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    fn test_build_statistics_record_batch() {
+        // Request a record batch with s1_min, s2_max, s3_max, s3_min
+        let stat_column_req = vec![
+            // min of original column s1, named s1_min
+            (
+                "s1".to_string(),
+                StatisticsType::Min,
+                Field::new("s1_min", DataType::Int32, true),
+            ),
+            // max of original column s2, named s2_max
+            (
+                "s2".to_string(),
+                StatisticsType::Max,
+                Field::new("s2_min", DataType::Int32, true),
+            ),
+            // max of original column s3, named s3_max
+            (
+                "s3".to_string(),
+                StatisticsType::Max,
+                Field::new("s3_max", DataType::Utf8, true),
+            ),
+            // min of original column s3, named s3_min
+            (
+                "s3".to_string(),
+                StatisticsType::Min,
+                Field::new("s3_min", DataType::Utf8, true),
+            ),
+        ];
+
+        // s1: [None, 10]
+        // s2: [2, 20]
+        // s2: ["a", "q"]
+        let stats1 = TestStatistics::new()
+            .with("s1", MinMax::new(None, Some(10i32.into())))
+            .with("s2", MinMax::new(Some(2i32.into()), Some(20i32.into())))
+            .with("s3", MinMax::new(Some("a".into()), Some("q".into())));
+
+        // s1: [None, None]
+        // s2: [None, None]
+        // s2: [None, None]
+        let stats2 = TestStatistics::new()
+            .with("s1", MinMax::new(None, None))
+            .with("s2", MinMax::new(None, None))
+            .with("s3", MinMax::new(None, None));
+
+        // s1: [9, None]
+        // s2: None
+        // s2: [None, "r"]

Review comment:
       ```suggestion
           // s3: [None, "r"]
   ```

##########
File path: datafusion/src/scalar.rs
##########
@@ -283,6 +283,155 @@ impl ScalarValue {
         self.to_array_of_size(1)
     }
 
+    /// Converts an iterator of [`ScalarValue`] references into an [`ArrayRef`]
+    /// corresponding to those values.
+    ///
+    /// Returns an error if the iterator is empty or if the
+    /// [`ScalarValue`]s are not all the same type
+    ///
+    /// Example
+    /// ```
+    /// use datafusion::scalar::ScalarValue;
+    /// use arrow::array::{ArrayRef, BooleanArray};
+    ///
+    /// let scalars = vec![
+    ///   ScalarValue::Boolean(Some(true)),
+    ///   ScalarValue::Boolean(None),
+    ///   ScalarValue::Boolean(Some(false)),
+    /// ];
+    ///
+    /// // Build an Array from the list of ScalarValues
+    /// let array = ScalarValue::iter_to_array(scalars.iter())
+    ///   .unwrap();
+    ///
+    /// let expected: ArrayRef = std::sync::Arc::new(
+    ///   BooleanArray::from(vec![
+    ///     Some(true),
+    ///     None,
+    ///     Some(false)
+    ///   ]
+    /// ));
+    ///
+    /// assert_eq!(&array, &expected);
+    /// ```
+    pub fn iter_to_array<'a>(
+        scalars: impl IntoIterator<Item = &'a ScalarValue>,
+    ) -> Result<ArrayRef> {
+        let mut scalars = scalars.into_iter().peekable();
+
+        // figure out the type based on the first element
+        let data_type = match scalars.peek() {
+            None => {
+                return Err(DataFusionError::Internal(
+                    "empty iterator passed to ScalarValue::iter_to_array".to_string(),
+                ))
+            }
+            Some(sv) => sv.get_datatype(),
+        };
+
+        /// Creates an array of $ARRAY_TY by unpacking values of
+        /// SCALAR_TY for primitive types
+        macro_rules! build_array_primative {

Review comment:
       Do you mean primitive?
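
For readers following the `iter_to_array` hunk, here is a dependency-free sketch of the behaviour its doc comment describes: peek at the first element to choose the output type, then fail on an empty iterator or on mixed types. `Scalar`, `Column`, `build_column_primitive!`, and `iter_to_column` are hypothetical stand-ins for illustration, not the DataFusion or Arrow types.

```rust
/// Hypothetical stand-in for a scalar value: a typed, nullable value.
#[derive(Debug, Clone, PartialEq)]
enum Scalar {
    Boolean(Option<bool>),
    Int32(Option<i32>),
}

/// Hypothetical stand-in for a typed array with nullable slots.
#[derive(Debug, PartialEq)]
enum Column {
    Boolean(Vec<Option<bool>>),
    Int32(Vec<Option<i32>>),
}

/// Unpacks every scalar as the given variant, failing on a type mismatch.
/// This plays the role of the per-type macro in the patch.
macro_rules! build_column_primitive {
    ($variant:ident, $scalars:expr) => {{
        let values = $scalars
            .map(|sv| match sv {
                Scalar::$variant(v) => Ok(*v),
                other => Err(format!("inconsistent scalar type: {:?}", other)),
            })
            .collect::<Result<Vec<_>, String>>()?;
        Ok(Column::$variant(values))
    }};
}

fn iter_to_column<'a>(
    scalars: impl IntoIterator<Item = &'a Scalar>,
) -> Result<Column, String> {
    let mut scalars = scalars.into_iter().peekable();

    // Peek at the first element to pick the output type without consuming it.
    let first = match scalars.peek() {
        None => return Err("empty iterator passed to iter_to_column".to_string()),
        Some(sv) => (**sv).clone(),
    };

    match first {
        Scalar::Boolean(_) => build_column_primitive!(Boolean, scalars),
        Scalar::Int32(_) => build_column_primitive!(Int32, scalars),
    }
}
```

Collecting into a `Result` stops at the first mismatched value, so a mixed input returns an error instead of a partially filled column.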

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -748,4 +821,40 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn prune_api() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("s1", DataType::Utf8, false),
+            Field::new("s2", DataType::Int32, false),
+        ]));
+
+        // Prune using s2 > 5
+        let expr = col("s2").gt(lit(5));
+
+        // s2 [0, 5] ==> no rows should pass
+        let stats1 = TestStatistics::new()
+            .with("s1", MinMax::new(None, None))
+            .with("s2", MinMax::new(Some(0i32.into()), Some(5i32.into())));
+
+        // s2 [4, 6] ==> some rows could pass
+        let stats2 = TestStatistics::new()
+            .with("s1", MinMax::new(None, None))
+            .with("s2", MinMax::new(Some(4i32.into()), Some(6i32.into())));
+
+        // No stats for s2 ==> some rows could pass
+        let stats3 = TestStatistics::new();
+
+        // s2 [3, None] (null max) ==> some rows could pass
+        let stats4 = TestStatistics::new().with(
+            "s2",
+            MinMax::new(Some(3i32.into()), Some(ScalarValue::Int32(None))),
+        );
+
+        let p = PruningPredicate::try_new(&expr, schema).unwrap();
+        let result = p.prune(&[stats1, stats2, stats3, stats4]).unwrap();
+        let expected = vec![false, true, true, true];

Review comment:
       Nice
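
The four cases in `prune_api` can be traced with a small model that does not depend on DataFusion. It assumes `s2 > 5` is checked against the column maximum and that an unknown result keeps the container, as the comment in `prune` describes. `IntStats`, `max_gt`, `prune_gt`, and `prune_api_example` are hypothetical names used only for this sketch.

```rust
/// Hypothetical summary of one container (for example a row group) of an
/// integer column. Only the maximum matters for a `>` predicate;
/// `None` means the statistic is missing or NULL.
#[derive(Debug, Clone, Copy, Default)]
struct IntStats {
    max: Option<i32>,
}

/// Evaluates `column > literal` against statistics with three-valued logic:
/// the check becomes `max > literal`, and a missing max gives `None`
/// (unknown) rather than `false`.
fn max_gt(stats: &IntStats, literal: i32) -> Option<bool> {
    stats.max.map(|max| max > literal)
}

/// Returns one `bool` per container: `false` only when no row can match.
/// Unknown results are mapped to `true` so the container is kept.
fn prune_gt(statistics: &[IntStats], literal: i32) -> Vec<bool> {
    statistics
        .iter()
        .map(|s| max_gt(s, literal).unwrap_or(true))
        .collect()
}

/// Mirrors the four cases in the `prune_api` test for `s2 > 5`.
fn prune_api_example() -> Vec<bool> {
    let stats = [
        IntStats { max: Some(5) }, // s2 in [0, 5]: nothing above 5
        IntStats { max: Some(6) }, // s2 in [4, 6]: 6 could match
        IntStats::default(),       // no statistics for s2
        IntStats { max: None },    // s2 min 3, null max
    ];
    prune_gt(&stats, 5)
}
```

The model yields `[false, true, true, true]`, matching the `expected` vector in the test: only the first container can be skipped safely.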

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -383,210 +455,218 @@ enum StatisticsType {
     Max,
 }
 
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::logical_plan::{col, lit};
+    use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType};
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[derive(Debug, Default)]
+    struct MinMax {
+        min: Option<ScalarValue>,
+        max: Option<ScalarValue>,
+    }
 
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
+    impl MinMax {
+        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
+            Self { min, max }
         }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
+    }
+
+    #[derive(Debug, Default)]
+    struct TestStatistics {
+        // key: column name
+        stats: HashMap<String, MinMax>,
+    }
+
+    impl TestStatistics {
+        fn new() -> Self {
+            Self::default()
         }
-    };
 
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
+        fn with(mut self, name: impl Into<String>, min_max: MinMax) -> Self {
+            self.stats.insert(name.into(), min_max);
+            self
         }
-        return Arc::new(builder.finish());
     }
 
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
+    impl PruningStatistics for TestStatistics {
+        fn min_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.min.clone())
+                .unwrap_or(None)
         }
-    }
 
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
+        fn max_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.max.clone())
+                .unwrap_or(None)
+        }
     }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_optimizer::pruning::StatisticsType;
-    use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::DataType,
-    };
-    use parquet::file::statistics::Statistics as ParquetStatistics;
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    fn test_build_statistics_record_batch() {
+        // Request a record batch with of s1_min, s2_max, s3_max, s3_min
+        let stat_column_req = vec![
+            // min of original column s1, named s1_min
+            (
+                "s1".to_string(),
+                StatisticsType::Min,
+                Field::new("s1_min", DataType::Int32, true),
+            ),
+            // max of original column s2, named s2_max
+            (
+                "s2".to_string(),
+                StatisticsType::Max,
+                Field::new("s2_min", DataType::Int32, true),
+            ),
+            // max of original column s3, named s3_max
+            (
+                "s3".to_string(),
+                StatisticsType::Max,
+                Field::new("s3_max", DataType::Utf8, true),
+            ),
+            // min of original column s3, named s3_min
+            (
+                "s3".to_string(),
+                StatisticsType::Min,
+                Field::new("s3_min", DataType::Utf8, true),
+            ),
+        ];
+
+        // s1: [None, 10]
+        // s2: [2, 20]
+        // s2: ["a", "q"]

Review comment:
       ```suggestion
           // s3: ["a", "q"]
   ```







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r638163739



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -28,50 +28,75 @@
 //! https://github.com/apache/arrow-datafusion/issues/363 it will
 //! be genericized.
 
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, convert::TryInto, sync::Arc};
 
 use arrow::{
-    array::{
-        make_array, new_null_array, ArrayData, ArrayRef, BooleanArray,
-        BooleanBufferBuilder,
-    },
-    buffer::MutableBuffer,
-    datatypes::{DataType, Field, Schema},
+    array::{ArrayRef, BooleanArray},
+    datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
 
-use parquet::file::{
-    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
-};
-
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
     logical_plan::{Expr, Operator},
     optimizer::utils,
     physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
+    scalar::ScalarValue,
 };
 
+/// Interface to pass statistics information to [`PruningPredicates`]
+pub trait PruningStatistics {
+    /// return the minimum value for the named column, if known
+    fn min_value(&self, column: &str) -> Option<ScalarValue>;

Review comment:
       Ok, I can take a shot at trying to do this.
   
   @Dandandan  are you thinking something like:
   
   ```rust
   pub trait PruningStatistics {
       fn min_values(&self, column: &str) -> Option<ArrayRef>;
       fn max_values(&self, column: &str) -> Option<ArrayRef>;
   } 
   ```
   
   Which would be constrained to return `Array`s with the same number of rows?
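
For readers following along, here is a rough sketch of how an array-returning trait of this shape might be implemented and how the "same number of rows" constraint could be checked. It uses only the standard library: `Vec<Option<i64>>` stands in for Arrow's `ArrayRef`, and `ContainerStats`, `num_containers`, `lengths_consistent` and `sample_stats` are hypothetical names that do not appear in the PR.

```rust
use std::collections::HashMap;

/// Stand-in for Arrow's `ArrayRef`: one optional value per container.
type StatArray = Vec<Option<i64>>;

/// Array-returning shape of the trait (simplified to i64 values).
trait PruningStatistics {
    fn min_values(&self, column: &str) -> Option<StatArray>;
    fn max_values(&self, column: &str) -> Option<StatArray>;
}

/// Hypothetical statistics source: for each column, one (min, max)
/// pair per container (e.g. per parquet row group).
struct ContainerStats {
    num_containers: usize,
    columns: HashMap<String, Vec<(Option<i64>, Option<i64>)>>,
}

impl PruningStatistics for ContainerStats {
    fn min_values(&self, column: &str) -> Option<StatArray> {
        let pairs = self.columns.get(column)?;
        Some(pairs.iter().map(|(min, _)| *min).collect())
    }

    fn max_values(&self, column: &str) -> Option<StatArray> {
        let pairs = self.columns.get(column)?;
        Some(pairs.iter().map(|(_, max)| *max).collect())
    }
}

/// Checks the constraint raised above: every array that is returned
/// must have exactly one row per container.
fn lengths_consistent(stats: &ContainerStats, columns: &[&str]) -> bool {
    columns.iter().all(|c| {
        let n = stats.num_containers;
        stats.min_values(c).map_or(true, |a| a.len() == n)
            && stats.max_values(c).map_or(true, |a| a.len() == n)
    })
}

/// Two containers; the first has no known minimum for column "a".
fn sample_stats() -> ContainerStats {
    let mut columns = HashMap::new();
    columns.insert("a".to_string(), vec![(None, Some(10)), (Some(2), Some(20))]);
    ContainerStats { num_containers: 2, columns }
}

fn main() {
    let stats = sample_stats();
    println!("min(a) = {:?}", stats.min_values("a"));
    println!("max(a) = {:?}", stats.max_values("a"));
    println!("consistent = {}", lengths_consistent(&stats, &["a", "b"]));
}
```

In this sketch a column with no statistics at all returns `None`, while a container with a missing value contributes a `None` row, so the array length still matches the number of containers.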







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r637952920



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -85,94 +115,132 @@ impl PruningPredicateBuilder {
         })
     }
 
-    /// Generate a predicate function used to filter based on
-    /// statistics
+    /// For each set of statistics, evalates the pruning predicate
+    /// and returns a `bool` with the following meaning for a
+    /// all rows whose values match the statistics:
     ///
-    /// This function takes a slice of statistics as parameter, so
-    /// that DataFusion's physical expressions can be executed once
-    /// against a single RecordBatch, containing statistics arrays, on
-    /// which the physical predicate expression is executed to
-    /// generate a row group filter array.
+    /// `true`: There MAY be rows that match the predicate
     ///
-    /// The generated filter function is then used in the returned
-    /// closure to filter row groups. NOTE this is parquet specific at the moment
-    pub fn build_pruning_predicate(
-        &self,
-        row_group_metadata: &[RowGroupMetaData],
-    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+    /// `false`: There are no rows that could match the predicate
+    ///
+    /// Note this function takes a slice of statistics as a parameter
+    /// to amortize the cost of the evaluation of the predicate
+    /// against a single record batch.
+    pub fn prune<S: PruningStatistics>(&self, statistics: &[S]) -> Result<Vec<bool>> {
         // build statistics record batch
-        let predicate_result = build_statistics_record_batch(
-            row_group_metadata,
-            &self.schema,
-            &self.stat_column_req,
-        )
-        .and_then(|statistics_batch| {
-            // execute predicate expression
-            self.predicate_expr.evaluate(&statistics_batch)
-        })
-        .and_then(|v| match v {
-            ColumnarValue::Array(array) => Ok(array),
-            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
-                "predicate expression didn't return an array".to_string(),
-            )),
-        });
-
-        let predicate_array = match predicate_result {
-            Ok(array) => array,
-            // row group filter array could not be built
-            // return a closure which will not filter out any row groups
-            _ => return Box::new(|_r, _i| true),
-        };
+        let predicate_array =
+            build_statistics_record_batch(statistics, &self.stat_column_req)
+                .and_then(|statistics_batch| {
+                    // execute predicate expression
+                    self.predicate_expr.evaluate(&statistics_batch)
+                })
+                .and_then(|v| match v {
+                    ColumnarValue::Array(array) => Ok(array),
+                    ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
+                        "predicate expression didn't return an array".to_string(),
+                    )),
+                })?;
+
+        let predicate_array = predicate_array
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "Expected pruning predicate evaluation to be BooleanArray, \
+                     but was {:?}",
+                    predicate_array
+                ))
+            })?;
+
+        // when the result of the predicate expression for a row group is null / undefined,
+        // e.g. due to missing statistics, this row group can't be filtered out,
+        // so replace with true
+        Ok(predicate_array
+            .into_iter()
+            .map(|x| x.unwrap_or(true))
+            .collect::<Vec<_>>())
+    }
 
-        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
-        match predicate_array {
-            // return row group predicate function
-            Some(array) => {
-                // when the result of the predicate expression for a row group is null / undefined,
-                // e.g. due to missing statistics, this row group can't be filtered out,
-                // so replace with true
-                let predicate_values =
-                    array.iter().map(|x| x.unwrap_or(true)).collect::<Vec<_>>();
-                Box::new(move |_, i| predicate_values[i])
-            }
-            // predicate result is not a BooleanArray
-            // return a closure which will not filter out any row groups
-            _ => Box::new(|_r, _i| true),
-        }
+    /// Return a reference to the input schema
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
     }
 }
 
-/// Build a RecordBatch from a list of statistics (currently parquet
-/// [`RowGroupMetadata`] structs), creating arrays, one for each
-/// statistics column, as requested in the stat_column_req parameter.
-fn build_statistics_record_batch(
-    row_groups: &[RowGroupMetaData],
-    schema: &Schema,
+/// Build a RecordBatch from a list of statistics, creating arrays,
+/// with one row for each PruningStatistics and columns specified in
+/// in the stat_column_req parameter.
+///
+/// For example, if the requested columns are
+/// ```text
+/// ("s1", Min, Field:s1_min)
+/// ("s2", Max, field:s2_maxx)
+///```
+///
+/// And the input statistics had
+/// ```text
+/// S1(Min: 5, Max: 10)
+/// S2(Min: 99, Max: 1000)
+/// S3(Min: 1, Max: 2)
+/// ```
+///
+/// Then this function would build a record batch with 2 columns and
+/// one row s1_min and s2_maxx as follows (s3 is not requested):
+///
+/// ```text
+/// s1_min | s2_maxx
+/// -------+--------
+///   5    | 10

Review comment:
       oh, yes -- that is a great catch







[GitHub] [arrow-datafusion] houqp commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r638139562



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -28,50 +28,75 @@
 //! https://github.com/apache/arrow-datafusion/issues/363 it will
 //! be genericized.
 
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, convert::TryInto, sync::Arc};
 
 use arrow::{
-    array::{
-        make_array, new_null_array, ArrayData, ArrayRef, BooleanArray,
-        BooleanBufferBuilder,
-    },
-    buffer::MutableBuffer,
-    datatypes::{DataType, Field, Schema},
+    array::{ArrayRef, BooleanArray},
+    datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
 
-use parquet::file::{
-    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
-};
-
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
     logical_plan::{Expr, Operator},
     optimizer::utils,
     physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
+    scalar::ScalarValue,
 };
 
+/// Interface to pass statistics information to [`PruningPredicates`]
+pub trait PruningStatistics {
+    /// return the minimum value for the named column, if known
+    fn min_value(&self, column: &str) -> Option<ScalarValue>;

Review comment:
       What would be a better alternative in the long run? Something generic?







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r637952663



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -85,94 +115,132 @@ impl PruningPredicateBuilder {
         })
     }
 
-    /// Generate a predicate function used to filter based on
-    /// statistics
+    /// For each set of statistics, evalates the pruning predicate
+    /// and returns a `bool` with the following meaning for a
+    /// all rows whose values match the statistics:
     ///
-    /// This function takes a slice of statistics as parameter, so
-    /// that DataFusion's physical expressions can be executed once
-    /// against a single RecordBatch, containing statistics arrays, on
-    /// which the physical predicate expression is executed to
-    /// generate a row group filter array.
+    /// `true`: There MAY be rows that match the predicate
     ///
-    /// The generated filter function is then used in the returned
-    /// closure to filter row groups. NOTE this is parquet specific at the moment
-    pub fn build_pruning_predicate(
-        &self,
-        row_group_metadata: &[RowGroupMetaData],
-    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+    /// `false`: There are no rows that could match the predicate
+    ///
+    /// Note this function takes a slice of statistics as a parameter
+    /// to amortize the cost of the evaluation of the predicate
+    /// against a single record batch.
+    pub fn prune<S: PruningStatistics>(&self, statistics: &[S]) -> Result<Vec<bool>> {
         // build statistics record batch
-        let predicate_result = build_statistics_record_batch(
-            row_group_metadata,
-            &self.schema,
-            &self.stat_column_req,
-        )
-        .and_then(|statistics_batch| {
-            // execute predicate expression
-            self.predicate_expr.evaluate(&statistics_batch)
-        })
-        .and_then(|v| match v {
-            ColumnarValue::Array(array) => Ok(array),
-            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
-                "predicate expression didn't return an array".to_string(),
-            )),
-        });
-
-        let predicate_array = match predicate_result {
-            Ok(array) => array,
-            // row group filter array could not be built
-            // return a closure which will not filter out any row groups
-            _ => return Box::new(|_r, _i| true),
-        };
+        let predicate_array =
+            build_statistics_record_batch(statistics, &self.stat_column_req)
+                .and_then(|statistics_batch| {
+                    // execute predicate expression
+                    self.predicate_expr.evaluate(&statistics_batch)
+                })
+                .and_then(|v| match v {
+                    ColumnarValue::Array(array) => Ok(array),
+                    ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
+                        "predicate expression didn't return an array".to_string(),
+                    )),
+                })?;
+
+        let predicate_array = predicate_array
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "Expected pruning predicate evaluation to be BooleanArray, \
+                     but was {:?}",
+                    predicate_array
+                ))
+            })?;
+
+        // when the result of the predicate expression for a row group is null / undefined,
+        // e.g. due to missing statistics, this row group can't be filtered out,
+        // so replace with true
+        Ok(predicate_array
+            .into_iter()
+            .map(|x| x.unwrap_or(true))
+            .collect::<Vec<_>>())
+    }
 
-        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
-        match predicate_array {
-            // return row group predicate function
-            Some(array) => {
-                // when the result of the predicate expression for a row group is null / undefined,
-                // e.g. due to missing statistics, this row group can't be filtered out,
-                // so replace with true
-                let predicate_values =
-                    array.iter().map(|x| x.unwrap_or(true)).collect::<Vec<_>>();
-                Box::new(move |_, i| predicate_values[i])
-            }
-            // predicate result is not a BooleanArray
-            // return a closure which will not filter out any row groups
-            _ => Box::new(|_r, _i| true),
-        }
+    /// Return a reference to the input schema
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
     }
 }
 
-/// Build a RecordBatch from a list of statistics (currently parquet
-/// [`RowGroupMetadata`] structs), creating arrays, one for each
-/// statistics column, as requested in the stat_column_req parameter.
-fn build_statistics_record_batch(
-    row_groups: &[RowGroupMetaData],
-    schema: &Schema,
+/// Build a RecordBatch from a list of statistics, creating arrays,
+/// with one row for each PruningStatistics and columns specified in
+/// in the stat_column_req parameter.
+///
+/// For example, if the requested columns are
+/// ```text
+/// ("s1", Min, Field:s1_min)
+/// ("s2", Max, field:s2_maxx)

Review comment:
       Hmm, I was trying to show that the fields could be named anything (not just `_min` or `_max`), but upon reflection I think this example just confused things. I will change the example to use `_min` and `_max`.
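
To make the documentation example under discussion concrete, here is a rough standard-library sketch of building one output column per requested statistic and one row per statistics container. It assumes `s1`, `s2` and `s3` are columns described by a single container, uses `Option<i64>` in place of `ScalarValue`, and `Stats`, `build_statistics_columns` and `example_batch` are illustrative names rather than the function in the PR.

```rust
use std::collections::HashMap;

/// Which statistic a requested output column should hold.
#[derive(Clone, Copy)]
enum StatisticsType {
    Min,
    Max,
}

/// Stand-in for one statistics container: column name -> (min, max).
type Stats = HashMap<String, (Option<i64>, Option<i64>)>;

/// Builds one output column per request and one row per container.
/// Each request is (source column, statistic, output field name).
fn build_statistics_columns(
    statistics: &[Stats],
    requests: &[(&str, StatisticsType, &str)],
) -> Vec<(String, Vec<Option<i64>>)> {
    requests
        .iter()
        .map(|&(column, stat_type, field_name)| {
            let values: Vec<Option<i64>> = statistics
                .iter()
                .map(|s| {
                    // A column without statistics yields a null (None) row.
                    s.get(column).and_then(|&(min, max)| match stat_type {
                        StatisticsType::Min => min,
                        StatisticsType::Max => max,
                    })
                })
                .collect();
            (field_name.to_string(), values)
        })
        .collect()
}

/// The values from the doc example, with conventional `_min`/`_max` names.
fn example_batch() -> Vec<(String, Vec<Option<i64>>)> {
    let mut container = Stats::new();
    container.insert("s1".to_string(), (Some(5), Some(10)));
    container.insert("s2".to_string(), (Some(99), Some(1000)));
    container.insert("s3".to_string(), (Some(1), Some(2)));
    build_statistics_columns(
        &[container],
        &[
            ("s1", StatisticsType::Min, "s1_min"),
            ("s2", StatisticsType::Max, "s2_max"),
        ],
    )
}

fn main() {
    for (name, values) in example_batch() {
        println!("{name}: {values:?}");
    }
}
```

With these inputs the sketch produces a single row holding 5 for `s1_min` and 1000 for `s2_max`; `s3` is not requested and so does not appear.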







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r638788450



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -28,50 +28,75 @@
 //! https://github.com/apache/arrow-datafusion/issues/363 it will
 //! be genericized.
 
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, convert::TryInto, sync::Arc};
 
 use arrow::{
-    array::{
-        make_array, new_null_array, ArrayData, ArrayRef, BooleanArray,
-        BooleanBufferBuilder,
-    },
-    buffer::MutableBuffer,
-    datatypes::{DataType, Field, Schema},
+    array::{ArrayRef, BooleanArray},
+    datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
 
-use parquet::file::{
-    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
-};
-
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
     logical_plan::{Expr, Operator},
     optimizer::utils,
     physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
+    scalar::ScalarValue,
 };
 
+/// Interface to pass statistics information to [`PruningPredicates`]
+pub trait PruningStatistics {
+    /// return the minimum value for the named column, if known
+    fn min_value(&self, column: &str) -> Option<ScalarValue>;

Review comment:
       Here is what the alternate API looks like: https://github.com/apache/arrow-datafusion/pull/426
   
   I would say it is now harder to implement the `PruningStatistics` API in the parquet reader, but the implementation in `pruning.rs` is simpler.







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r637949984



##########
File path: datafusion/src/scalar.rs
##########
@@ -283,6 +283,155 @@ impl ScalarValue {
         self.to_array_of_size(1)
     }
 
+    /// Converts an iterator of references [`ScalarValue`] into an [`ArrayRef`]
+    /// corresponding to those values. For example,
+    ///
+    /// Returns an error if the iterator is empty or if the
+    /// [`ScalarValue`]s are not all the same type
+    ///
+    /// Example
+    /// ```
+    /// use datafusion::scalar::ScalarValue;
+    /// use arrow::array::{ArrayRef, BooleanArray};
+    ///
+    /// let scalars = vec![
+    ///   ScalarValue::Boolean(Some(true)),
+    ///   ScalarValue::Boolean(None),
+    ///   ScalarValue::Boolean(Some(false)),
+    /// ];
+    ///
+    /// // Build an Array from the list of ScalarValues
+    /// let array = ScalarValue::iter_to_array(scalars.iter())
+    ///   .unwrap();
+    ///
+    /// let expected: ArrayRef = std::sync::Arc::new(
+    ///   BooleanArray::from(vec![
+    ///     Some(true),
+    ///     None,
+    ///     Some(false)
+    ///   ]
+    /// ));
+    ///
+    /// assert_eq!(&array, &expected);
+    /// ```
+    pub fn iter_to_array<'a>(
+        scalars: impl IntoIterator<Item = &'a ScalarValue>,
+    ) -> Result<ArrayRef> {
+        let mut scalars = scalars.into_iter().peekable();
+
+        // figure out the type based on the first element
+        let data_type = match scalars.peek() {
+            None => {
+                return Err(DataFusionError::Internal(
+                    "empty iterator passed to ScalarValue::iter_to_array".to_string(),
+                ))
+            }
+            Some(sv) => sv.get_datatype(),
+        };
+
+        /// Creates an array of $ARRAY_TY by unpacking values of
+        /// SCALAR_TY for primitive types
+        macro_rules! build_array_primative {

Review comment:
       yes -- thank you -- I fixed this in https://github.com/apache/arrow-datafusion/pull/381







[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r638136407



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -28,50 +28,75 @@
 //! https://github.com/apache/arrow-datafusion/issues/363 it will
 //! be genericized.
 
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, convert::TryInto, sync::Arc};
 
 use arrow::{
-    array::{
-        make_array, new_null_array, ArrayData, ArrayRef, BooleanArray,
-        BooleanBufferBuilder,
-    },
-    buffer::MutableBuffer,
-    datatypes::{DataType, Field, Schema},
+    array::{ArrayRef, BooleanArray},
+    datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
 
-use parquet::file::{
-    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
-};
-
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
     logical_plan::{Expr, Operator},
     optimizer::utils,
     physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
+    scalar::ScalarValue,
 };
 
+/// Interface to pass statistics information to [`PruningPredicates`]
+pub trait PruningStatistics {
+    /// return the minimum value for the named column, if known
+    fn min_value(&self, column: &str) -> Option<ScalarValue>;
+
+    /// return the maximum value for the named column, if known
+    fn max_value(&self, column: &str) -> Option<ScalarValue>;
+}
+
+/// Evaluates filter expressions on statistics in order to
+/// prune data containers (e.g. parquet row group)
+///
+/// See [`try_new`] for more information.
 #[derive(Debug, Clone)]
-/// Builder used for generating predicate functions that can be used
-/// to prune data based on statistics (e.g. parquet row group metadata)
-pub struct PruningPredicateBuilder {
-    schema: Schema,
+pub struct PruningPredicate {
+    /// The input schema against which the predicate will be evaluated
+    schema: SchemaRef,
+    /// Actual pruning predicate (rewritten in terms of column min/max statistics)
     predicate_expr: Arc<dyn PhysicalExpr>,
+    /// The statistics required to evaluate this predicate:
+    /// * The column name in the input schema
+    /// * Statstics type (e.g. Min or Max)
+    /// * The field the statistics value should be placed in for
+    ///   pruning predicate evaluation
     stat_column_req: Vec<(String, StatisticsType, Field)>,
 }
 
-impl PruningPredicateBuilder {
-    /// Try to create a new instance of [`PruningPredicateBuilder`]
+impl PruningPredicate {
+    /// Try to create a new instance of [`PruningPredicate`]
+    ///
+    /// This will translate the provided `expr` filter expression into
+    /// a *pruning predicate*.
     ///
-    /// This will translate the filter expression into a statistics predicate expression
+    /// A pruning predicate is one that has been rewritten in terms of
+    /// the min and max values of column references and that evaluates
+    /// to FALSE if the filter predicate would evaluate FALSE *for
+    /// every row* whose values fell within the min / max ranges (aka
+    /// could be pruned).
     ///
-    /// For example,  `(column / 2) = 4` becomes `(column_min / 2) <= 4 && 4 <= (column_max / 2))`
-    pub fn try_new(expr: &Expr, schema: Schema) -> Result<Self> {
+    /// The pruning predicate evaluates to TRUE or NULL
+    /// if the filter predicate *might* evaluate to TRUE for at least
+    /// one row whose vaules fell within the min/max ranges (in other

Review comment:
       ```suggestion
       /// one row whose values fell within the min/max ranges (in other
   ```
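
   To make the TRUE / FALSE / NULL semantics described in that doc comment concrete, here is a minimal sketch. It is not code from this PR: `ColumnRange`, `eq_pruning_predicate` and `must_keep` are hypothetical names, and plain `i64` values stand in for `ScalarValue`. It shows the rewrite of `column = value` into `column_min <= value AND value <= column_max`, where an unknown statistic yields NULL and therefore does not allow pruning.

```rust
/// Min/max statistics for one column in one container (e.g. a row group).
/// `None` means the statistic is unknown. (Hypothetical type for illustration.)
#[derive(Debug, Clone, Copy)]
pub struct ColumnRange {
    pub min: Option<i64>,
    pub max: Option<i64>,
}

/// SQL-style three-valued AND: FALSE wins over NULL, NULL wins over TRUE.
fn and3(a: Option<bool>, b: Option<bool>) -> Option<bool> {
    match (a, b) {
        (Some(false), _) | (_, Some(false)) => Some(false),
        (Some(true), Some(true)) => Some(true),
        _ => None,
    }
}

/// Pruning form of `column = value`:
/// `column_min <= value AND value <= column_max`.
/// Returns `None` (NULL) when a needed statistic is unknown.
pub fn eq_pruning_predicate(range: ColumnRange, value: i64) -> Option<bool> {
    let min_ok = range.min.map(|min| min <= value);
    let max_ok = range.max.map(|max| value <= max);
    and3(min_ok, max_ok)
}

/// A container may be skipped only when the pruning predicate is
/// definitely FALSE; TRUE and NULL both mean "rows might match".
pub fn must_keep(pruning_result: Option<bool>) -> bool {
    pruning_result != Some(false)
}
```

   With a range of 5..=10 and the filter `column = 4`, the predicate is FALSE and the container can be pruned; with an unknown minimum and a maximum of 10 it is NULL, so the container has to be kept.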







[GitHub] [arrow-datafusion] Dandandan commented on pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#issuecomment-846277480


   > > I think this is really cool, I think it would be also great to have this for in-memory tables.
   > 
   > I agree -- I think all that is needed is to calculate the min/max statistics for each partition (or maybe even record batch) though we might have to be careful not to slow down queries where it wouldn't help. Maybe it could be opt in. Or perhaps we could compute the statistics "on demand" (after we have created a PruningPredicate)
   
   Yes, I think we might also want to look into supporting something like `analyze ...` besides having an option when loading. Together with sorting of data, I think that could become very interesting for in-memory analytics.
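
   As a rough illustration of the idea discussed here (computing min/max per in-memory partition and using it to skip partitions), a minimal sketch follows. It assumes partitions are plain `Vec<i64>` columns; `min_max`, `analyze` and `partitions_for_gt` are hypothetical names, not DataFusion APIs.

```rust
/// Min/max of one in-memory partition; `None` for an empty partition.
pub fn min_max(values: &[i64]) -> Option<(i64, i64)> {
    let mut iter = values.iter().copied();
    let first = iter.next()?;
    Some(iter.fold((first, first), |(lo, hi), v| (lo.min(v), hi.max(v))))
}

/// Computes statistics for every partition up front (the "analyze" style).
/// This is a full pass over the data, which is why it may be better as an
/// opt-in or on-demand step.
pub fn analyze(partitions: &[Vec<i64>]) -> Vec<Option<(i64, i64)>> {
    partitions.iter().map(|p| min_max(p)).collect()
}

/// Indexes of partitions that might contain a row with `value > threshold`.
/// Empty partitions (no statistics) contain no rows and are skipped.
pub fn partitions_for_gt(stats: &[Option<(i64, i64)>], threshold: i64) -> Vec<usize> {
    stats
        .iter()
        .enumerate()
        .filter_map(|(i, s)| match s {
            Some((_, max)) if *max > threshold => Some(i),
            _ => None,
        })
        .collect()
}
```

   If the data were also sorted on the filtered column, the per-partition ranges would not overlap and most partitions could be skipped for selective predicates.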





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r637077408



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -85,94 +115,132 @@ impl PruningPredicateBuilder {
         })
     }
 
-    /// Generate a predicate function used to filter based on
-    /// statistics
+    /// For each set of statistics, evaluates the pruning predicate
+    /// and returns a `bool` with the following meaning for
+    /// all rows whose values match the statistics:
     ///
-    /// This function takes a slice of statistics as parameter, so
-    /// that DataFusion's physical expressions can be executed once
-    /// against a single RecordBatch, containing statistics arrays, on
-    /// which the physical predicate expression is executed to
-    /// generate a row group filter array.
+    /// `true`: There MAY be rows that match the predicate
     ///
-    /// The generated filter function is then used in the returned
-    /// closure to filter row groups. NOTE this is parquet specific at the moment
-    pub fn build_pruning_predicate(
-        &self,
-        row_group_metadata: &[RowGroupMetaData],
-    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+    /// `false`: There are no rows that could match the predicate
+    ///
+    /// Note this function takes a slice of statistics as a parameter
+    /// to amortize the cost of the evaluation of the predicate
+    /// against a single record batch.
+    pub fn prune<S: PruningStatistics>(&self, statistics: &[S]) -> Result<Vec<bool>> {

Review comment:
       Here is the heart of the change -- rather than building up `Array`s directly from ParquetMetadata, this PR now builds the arrays up from `ScalarValue`s provided by the `PruningStatistics` trait.
   
   I also tried to improve the comments to make it easier to follow what is going on.
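
   For readers who want to see the shape of the API in isolation, here is a simplified, self-contained sketch. It is an assumption-laden stand-in rather than the PR's code: the trait uses `i64` instead of `ScalarValue`, `MapStatistics` and `prune_gt` are hypothetical, and the predicate is hard-coded to `column > value` instead of being derived from an `Expr`. The real `prune` also returns a `Result`.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the PR's `PruningStatistics` trait, using `i64`
/// instead of `ScalarValue` (an assumption made to keep the sketch small).
pub trait PruningStatistics {
    fn min_value(&self, column: &str) -> Option<i64>;
    fn max_value(&self, column: &str) -> Option<i64>;
}

/// Hypothetical statistics container: column name -> (min, max).
#[derive(Default)]
pub struct MapStatistics {
    stats: HashMap<String, (Option<i64>, Option<i64>)>,
}

impl MapStatistics {
    pub fn with(mut self, name: &str, min: Option<i64>, max: Option<i64>) -> Self {
        self.stats.insert(name.to_string(), (min, max));
        self
    }
}

impl PruningStatistics for MapStatistics {
    fn min_value(&self, column: &str) -> Option<i64> {
        self.stats.get(column).and_then(|s| s.0)
    }

    fn max_value(&self, column: &str) -> Option<i64> {
        self.stats.get(column).and_then(|s| s.1)
    }
}

/// Evaluates the pruning form of `column > value` (that is,
/// `column_max > value`) once per container.
/// `true`: rows MAY match. `false`: no row can match.
pub fn prune_gt<S: PruningStatistics>(statistics: &[S], column: &str, value: i64) -> Vec<bool> {
    statistics
        .iter()
        // Unknown max: keep the container, since it cannot be ruled out.
        .map(|s| s.max_value(column).map_or(true, |max| max > value))
        .collect()
}
```

   Because the function is generic over `S`, a caller with its own metadata format only needs to implement the two trait methods to reuse the pruning logic.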

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -383,210 +455,218 @@ enum StatisticsType {
     Max,
 }
 
-fn build_statistics_array(
-    statistics: &[Option<&ParquetStatistics>],
-    statistics_type: StatisticsType,
-    data_type: &DataType,
-) -> ArrayRef {
-    let statistics_count = statistics.len();
-    let first_group_stats = statistics.iter().find(|s| s.is_some());
-    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
-        // found first row group with statistics defined
-        statistics
-    } else {
-        // no row group has statistics defined
-        return new_null_array(data_type, statistics_count);
-    };
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::logical_plan::{col, lit};
+    use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType};
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[derive(Debug, Default)]
+    struct MinMax {
+        min: Option<ScalarValue>,
+        max: Option<ScalarValue>,
+    }
 
-    let (data_size, arrow_type) = match first_group_stats {
-        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
-        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
-        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
-        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
-        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
-            (0, DataType::Utf8)
+    impl MinMax {
+        fn new(min: Option<ScalarValue>, max: Option<ScalarValue>) -> Self {
+            Self { min, max }
         }
-        _ => {
-            // type of statistics not supported
-            return new_null_array(data_type, statistics_count);
+    }
+
+    #[derive(Debug, Default)]
+    struct TestStatistics {
+        // key: column name
+        stats: HashMap<String, MinMax>,
+    }
+
+    impl TestStatistics {
+        fn new() -> Self {
+            Self::default()
         }
-    };
 
-    let statistics = statistics.iter().map(|s| {
-        s.filter(|s| s.has_min_max_set())
-            .map(|s| match statistics_type {
-                StatisticsType::Min => s.min_bytes(),
-                StatisticsType::Max => s.max_bytes(),
-            })
-    });
-
-    if arrow_type == DataType::Utf8 {
-        let data_size = statistics
-            .clone()
-            .map(|x| x.map(|b| b.len()).unwrap_or(0))
-            .sum();
-        let mut builder =
-            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
-        let string_statistics =
-            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
-        for maybe_string in string_statistics {
-            match maybe_string {
-                Some(string_value) => builder.append_value(string_value).unwrap(),
-                None => builder.append_null().unwrap(),
-            };
+        fn with(mut self, name: impl Into<String>, min_max: MinMax) -> Self {
+            self.stats.insert(name.into(), min_max);
+            self
         }
-        return Arc::new(builder.finish());
     }
 
-    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
-    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
-    let mut null_count = 0;
-    for s in statistics {
-        if let Some(stat_data) = s {
-            bitmap_builder.append(true);
-            data_buffer.extend_from_slice(stat_data);
-        } else {
-            bitmap_builder.append(false);
-            data_buffer.resize(data_buffer.len() + data_size, 0);
-            null_count += 1;
+    impl PruningStatistics for TestStatistics {
+        fn min_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.min.clone())
+                .unwrap_or(None)
         }
-    }
 
-    let mut builder = ArrayData::builder(arrow_type)
-        .len(statistics_count)
-        .add_buffer(data_buffer.into());
-    if null_count > 0 {
-        builder = builder.null_bit_buffer(bitmap_builder.finish());
-    }
-    let array_data = builder.build();
-    let statistics_array = make_array(array_data);
-    if statistics_array.data_type() == data_type {
-        return statistics_array;
+        fn max_value(&self, column: &str) -> Option<ScalarValue> {
+            self.stats
+                .get(column)
+                .map(|s| s.max.clone())
+                .unwrap_or(None)
+        }
     }
-    // cast statistics array to required data type
-    arrow::compute::cast(&statistics_array, data_type)
-        .unwrap_or_else(|_| new_null_array(data_type, statistics_count))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_optimizer::pruning::StatisticsType;
-    use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::DataType,
-    };
-    use parquet::file::statistics::Statistics as ParquetStatistics;
 
     #[test]
-    fn build_statistics_array_int32() {
-        // build row group metadata array
-        let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false);
-        let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false);
-        let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false);
-        let statistics = vec![Some(&s1), Some(&s2), Some(&s3)];
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        assert_eq!(int32_vec, vec![None, Some(2), Some(3)]);
-
-        let statistics_array =
-            build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32);
-        let int32_array = statistics_array
-            .as_any()
-            .downcast_ref::<Int32Array>()
-            .unwrap();
-        let int32_vec = int32_array.into_iter().collect::<Vec<_>>();
-        // here the first max value is None and not the Some(10) value which was actually set
-        // because the min value is None
-        assert_eq!(int32_vec, vec![None, Some(20), Some(30)]);
+    fn test_build_statistics_record_batch() {

Review comment:
       This shows how the statistics record batch is created.
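
   A minimal sketch of the idea behind that test, under simplifying assumptions: each requirement `(column, statistics type, output field name)` becomes one output column with one row per statistics container. `ContainerStats` and `build_statistics_columns` are hypothetical, `i64` stands in for `ScalarValue`, and plain vectors stand in for the Arrow `RecordBatch`.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum StatisticsType {
    Min,
    Max,
}

/// Hypothetical per-container statistics: column name -> (min, max).
pub type ContainerStats = HashMap<String, (Option<i64>, Option<i64>)>;

/// Builds one statistics column per requirement, with one row per container.
/// Each requirement is (input column name, statistic type, output field name).
/// Missing statistics become nulls (`None`).
pub fn build_statistics_columns(
    statistics: &[ContainerStats],
    requirements: &[(String, StatisticsType, String)],
) -> Vec<(String, Vec<Option<i64>>)> {
    requirements
        .iter()
        .map(|(column, stat_type, field_name)| {
            let values = statistics
                .iter()
                .map(|container| {
                    let (min, max) = container.get(column).copied().unwrap_or((None, None));
                    match stat_type {
                        StatisticsType::Min => min,
                        StatisticsType::Max => max,
                    }
                })
                .collect();
            (field_name.clone(), values)
        })
        .collect()
}
```

   The rewritten predicate (for example `c1_min <= 4 AND 4 <= c1_max`) can then be evaluated once against these columns instead of once per container.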

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -748,4 +821,40 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn prune_api() {

Review comment:
       This shows end-to-end how to use the prune API (what I want to be able to do in IOx)

##########
File path: datafusion/src/scalar.rs
##########
@@ -283,6 +283,155 @@ impl ScalarValue {
         self.to_array_of_size(1)
     }
 
+    /// Converts an iterator of references to [`ScalarValue`] into an [`ArrayRef`]

Review comment:
       I couldn't find any other way to take a bunch of `ScalarValue`s and turn them back into an Array. Perhaps I missed something.
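
   To illustrate what such a conversion has to do, here is a minimal sketch with stand-in types. `Scalar`, `Column` and `iter_to_column` are hypothetical and much smaller than DataFusion's `ScalarValue` and Arrow arrays; the sketch also takes owned values rather than references. The element type is inferred from the first value, and any later value of a different type is an error.

```rust
/// Hypothetical two-variant scalar, standing in for DataFusion's `ScalarValue`.
#[derive(Debug, Clone, PartialEq)]
pub enum Scalar {
    Int64(Option<i64>),
    Utf8(Option<String>),
}

/// Hypothetical typed column, standing in for an Arrow array.
#[derive(Debug, PartialEq)]
pub enum Column {
    Int64(Vec<Option<i64>>),
    Utf8(Vec<Option<String>>),
}

/// Collects scalars into a single typed column.
/// The type is taken from the first scalar; mixed types are rejected.
pub fn iter_to_column(scalars: impl IntoIterator<Item = Scalar>) -> Result<Column, String> {
    let mut iter = scalars.into_iter();
    let first = iter
        .next()
        .ok_or_else(|| "empty iterator: cannot infer the column type".to_string())?;
    match first {
        Scalar::Int64(v) => {
            let mut out = vec![v];
            for s in iter {
                match s {
                    Scalar::Int64(v) => out.push(v),
                    other => return Err(format!("expected Int64, got {:?}", other)),
                }
            }
            Ok(Column::Int64(out))
        }
        Scalar::Utf8(v) => {
            let mut out = vec![v];
            for s in iter {
                match s {
                    Scalar::Utf8(v) => out.push(v),
                    other => return Err(format!("expected Utf8, got {:?}", other)),
                }
            }
            Ok(Column::Utf8(out))
        }
    }
}
```

   The per-type repetition is the part a real implementation would likely generate with a macro, since `ScalarValue` has many more variants.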







[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r638143765



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -28,50 +28,75 @@
 //! https://github.com/apache/arrow-datafusion/issues/363 it will
 //! be genericized.
 
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, convert::TryInto, sync::Arc};
 
 use arrow::{
-    array::{
-        make_array, new_null_array, ArrayData, ArrayRef, BooleanArray,
-        BooleanBufferBuilder,
-    },
-    buffer::MutableBuffer,
-    datatypes::{DataType, Field, Schema},
+    array::{ArrayRef, BooleanArray},
+    datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
 
-use parquet::file::{
-    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
-};
-
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
     logical_plan::{Expr, Operator},
     optimizer::utils,
     physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
+    scalar::ScalarValue,
 };
 
+/// Interface to pass statistics information to [`PruningPredicate`]
+pub trait PruningStatistics {
+    /// return the minimum value for the named column, if known
+    fn min_value(&self, column: &str) -> Option<ScalarValue>;

Review comment:
       In the longer run I would say we should start pushing more towards typed contiguous arrays (`Array`s or `Vec`), indeed using generics. For example, here the min and max values per group could be stored in two arrays of corresponding types, which would be faster and use less memory.
   
   Vectorized processing is what we try to use Arrow for already, so I would say it is good to try to use it in more places where it makes sense.
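
   One possible shape for that suggestion, as a hedged sketch only: the trait returns the minimum (or maximum) of a column for all containers in a single contiguous slice instead of one scalar per call. `ColumnarPruningStatistics`, `ColumnarStats` and `prune_eq` are hypothetical names, and `i64` stands in for a generic element type.

```rust
use std::collections::HashMap;

/// Hypothetical columnar variant of the statistics interface: one call
/// returns the statistic of a column for *all* containers at once.
pub trait ColumnarPruningStatistics {
    fn num_containers(&self) -> usize;
    fn min_values(&self, column: &str) -> Option<&[Option<i64>]>;
    fn max_values(&self, column: &str) -> Option<&[Option<i64>]>;
}

/// Stores mins and maxes in two contiguous vectors per column.
pub struct ColumnarStats {
    num_containers: usize,
    columns: HashMap<String, (Vec<Option<i64>>, Vec<Option<i64>>)>,
}

impl ColumnarStats {
    pub fn new(num_containers: usize) -> Self {
        Self { num_containers, columns: HashMap::new() }
    }

    pub fn with_column(mut self, name: &str, mins: Vec<Option<i64>>, maxes: Vec<Option<i64>>) -> Self {
        assert_eq!(mins.len(), self.num_containers);
        assert_eq!(maxes.len(), self.num_containers);
        self.columns.insert(name.to_string(), (mins, maxes));
        self
    }
}

impl ColumnarPruningStatistics for ColumnarStats {
    fn num_containers(&self) -> usize {
        self.num_containers
    }

    fn min_values(&self, column: &str) -> Option<&[Option<i64>]> {
        self.columns.get(column).map(|c| c.0.as_slice())
    }

    fn max_values(&self, column: &str) -> Option<&[Option<i64>]> {
        self.columns.get(column).map(|c| c.1.as_slice())
    }
}

/// Pruning form of `column = value`, evaluated over whole slices.
/// Unknown statistics never allow pruning.
pub fn prune_eq<S: ColumnarPruningStatistics>(stats: &S, column: &str, value: i64) -> Vec<bool> {
    match (stats.min_values(column), stats.max_values(column)) {
        (Some(mins), Some(maxes)) => mins
            .iter()
            .zip(maxes.iter())
            .map(|(min, max)| {
                min.map_or(true, |m| m <= value) && max.map_or(true, |m| value <= m)
            })
            .collect(),
        // No statistics for this column: keep every container.
        _ => vec![true; stats.num_containers()],
    }
}
```

   Compared with a per-container `min_value(&self, column)` call, this layout avoids one lookup and one scalar allocation per container and maps directly onto Arrow-style arrays.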







[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r638166690



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -28,50 +28,75 @@
 //! https://github.com/apache/arrow-datafusion/issues/363 it will
 //! be genericized.
 
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, convert::TryInto, sync::Arc};
 
 use arrow::{
-    array::{
-        make_array, new_null_array, ArrayData, ArrayRef, BooleanArray,
-        BooleanBufferBuilder,
-    },
-    buffer::MutableBuffer,
-    datatypes::{DataType, Field, Schema},
+    array::{ArrayRef, BooleanArray},
+    datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
 
-use parquet::file::{
-    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
-};
-
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
     logical_plan::{Expr, Operator},
     optimizer::utils,
     physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
+    scalar::ScalarValue,
 };
 
+/// Interface to pass statistics information to [`PruningPredicate`]
+pub trait PruningStatistics {
+    /// return the minimum value for the named column, if known
+    fn min_value(&self, column: &str) -> Option<ScalarValue>;

Review comment:
       @alamb yes - something along those lines.







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #380: Support statistics pruning for formats other than parquet

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r638170719



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -28,50 +28,75 @@
 //! https://github.com/apache/arrow-datafusion/issues/363 it will
 //! be genericized.
 
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, convert::TryInto, sync::Arc};
 
 use arrow::{
-    array::{
-        make_array, new_null_array, ArrayData, ArrayRef, BooleanArray,
-        BooleanBufferBuilder,
-    },
-    buffer::MutableBuffer,
-    datatypes::{DataType, Field, Schema},
+    array::{ArrayRef, BooleanArray},
+    datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
 
-use parquet::file::{
-    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
-};
-
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
     logical_plan::{Expr, Operator},
     optimizer::utils,
     physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
+    scalar::ScalarValue,
 };
 
+/// Interface to pass statistics information to [`PruningPredicate`]
+pub trait PruningStatistics {
+    /// return the minimum value for the named column, if known
+    fn min_value(&self, column: &str) -> Option<ScalarValue>;

Review comment:
       I'll give it a go and see what it looks like



