You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2024/02/14 14:21:09 UTC

(arrow-datafusion) branch main updated: chore(pruning): Support `IS NOT NULL` predicates in `PruningPredicate` (#9208)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new cc139c9790 chore(pruning): Support `IS NOT NULL` predicates in `PruningPredicate` (#9208)
cc139c9790 is described below

commit cc139c9790023463d2240213f2e4f335d9a880dd
Author: Chunchun Ye <14...@users.noreply.github.com>
AuthorDate: Wed Feb 14 09:21:03 2024 -0500

    chore(pruning): Support `IS NOT NULL` predicates in `PruningPredicate` (#9208)
    
    * chore: add test cases for predicate is_null and is_not_null
    
    * feat(pruning): support predicate build for is_not_null expression
    
    * doc: add example in doc for `IS NOT NULL`
    
    * chore: remove edit on cargo file
    
    * chore: add `IS NOT NULL` test for row group pruning
    
    chore: remove Debug derive
    
    * chore: update comment null --> NULL
    
    Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    
    * chore: update comment
    
    Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    
    ---------
    
    Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
---
 .../datasource/physical_plan/parquet/row_groups.rs | 49 +++++++++++++++--
 datafusion/core/src/physical_optimizer/pruning.rs  | 63 ++++++++++++++++++++++
 2 files changed, 107 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index fa9523a763..c876694db1 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -620,13 +620,20 @@ mod tests {
                 ParquetStatistics::boolean(Some(false), Some(true), None, 1, false),
             ],
         );
-        vec![rgm1, rgm2]
+        let rgm3 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(17), Some(30), None, 1, false),
+                ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
+            ],
+        );
+        vec![rgm1, rgm2, rgm3]
     }
 
     #[test]
     fn row_group_pruning_predicate_null_expr() {
         use datafusion_expr::{col, lit};
-        // int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0
+        // c1 > 15 and IsNull(c2) => c1_max > 15 and c2_null_count > 0
         let schema = Arc::new(Schema::new(vec![
             Field::new("c1", DataType::Int32, false),
             Field::new("c2", DataType::Boolean, false),
@@ -657,7 +664,7 @@ mod tests {
         use datafusion_expr::{col, lit};
         // test row group predicate with an unknown (Null) expr
         //
-        // int > 1 and bool = NULL => c1_max > 1 and null
+        // c1 > 15 and c2 = NULL => c1_max > 15 and NULL
         let schema = Arc::new(Schema::new(vec![
             Field::new("c1", DataType::Int32, false),
             Field::new("c2", DataType::Boolean, false),
@@ -672,7 +679,7 @@ mod tests {
 
         let metrics = parquet_file_metrics();
         // bool = NULL always evaluates to NULL (and thus will not
-        // pass predicates. Ideally these should both be false
+        // pass predicates. Ideally these should all be false
         assert_eq!(
             prune_row_groups_by_statistics(
                 &schema,
@@ -682,7 +689,39 @@ mod tests {
                 Some(&pruning_predicate),
                 &metrics
             ),
-            vec![1]
+            vec![1, 2]
+        );
+    }
+
+    #[test]
+    fn row_group_pruning_predicate_not_null_expr() {
+        use datafusion_expr::{col, lit};
+        // c1 > 15 and IsNotNull(c2) => c1_max > 15 and c2_null_count = 0
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Boolean, false),
+        ]));
+        let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
+        let expr = col("c1").gt(lit(15)).and(col("c2").is_not_null());
+        let expr = logical2physical(&expr, &schema);
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+        let groups = gen_row_group_meta_data_for_pruning_predicate();
+
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups_by_statistics(
+                &schema,
+                &schema_descr,
+                &groups,
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
+            // The first row group was filtered out because c1_max is 10, which is smaller than 15.
+            // The second row group was filtered out because it contains null value on "c2".
+            // The third row group is kept because c1_max is 30, which is greater than 15 AND
+            // it does NOT contain any null value on "c2".
+            vec![2]
         );
     }
 
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index 648b1f70c5..e1b52c3837 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -315,6 +315,7 @@ pub trait PruningStatistics {
 /// `x < 5` | `x_max < 5`
 /// `x = 5 AND y = 10` | `x_min <= 5 AND 5 <= x_max AND y_min <= 10 AND 10 <= y_max`
 /// `x IS NULL`  | `x_null_count > 0`
+/// `x IS NOT NULL`  | `x_null_count = 0`
 ///
 /// ## Predicate Evaluation
 /// The PruningPredicate works in two passes
@@ -1120,6 +1121,34 @@ fn build_is_null_column_expr(
     }
 }
 
+/// Given an expression reference to `expr`, if `expr` is a column expression,
+/// returns a pruning expression in terms of IsNotNull that will evaluate to true
+/// if the column does NOT contain null, and false if it may contain null
+fn build_is_not_null_column_expr(
+    expr: &Arc<dyn PhysicalExpr>,
+    schema: &Schema,
+    required_columns: &mut RequiredColumns,
+) -> Option<Arc<dyn PhysicalExpr>> {
+    if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
+        let field = schema.field_with_name(col.name()).ok()?;
+
+        let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
+        required_columns
+            .null_count_column_expr(col, expr, null_count_field)
+            .map(|null_count_column_expr| {
+                // IsNotNull(column) => null_count = 0
+                Arc::new(phys_expr::BinaryExpr::new(
+                    null_count_column_expr,
+                    Operator::Eq,
+                    Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
+                )) as _
+            })
+            .ok()
+    } else {
+        None
+    }
+}
+
 /// The maximum number of entries in an `InList` that might be rewritten into
 /// an OR chain
 const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
@@ -1146,6 +1175,14 @@ fn build_predicate_expression(
         return build_is_null_column_expr(is_null.arg(), schema, required_columns)
             .unwrap_or(unhandled);
     }
+    if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
+        return build_is_not_null_column_expr(
+            is_not_null.arg(),
+            schema,
+            required_columns,
+        )
+        .unwrap_or(unhandled);
+    }
     if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
         return build_single_column_expr(col, schema, required_columns, false)
             .unwrap_or(unhandled);
@@ -2052,6 +2089,32 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn row_group_predicate_is_null() -> Result<()> {
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let expected_expr = "c1_null_count@0 > 0";
+
+        let expr = col("c1").is_null();
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
+        assert_eq!(predicate_expr.to_string(), expected_expr);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_predicate_is_not_null() -> Result<()> {
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let expected_expr = "c1_null_count@0 = 0";
+
+        let expr = col("c1").is_not_null();
+        let predicate_expr =
+            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
+        assert_eq!(predicate_expr.to_string(), expected_expr);
+
+        Ok(())
+    }
+
     #[test]
     fn row_group_predicate_required_columns() -> Result<()> {
         let schema = Schema::new(vec![