You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/10/12 18:30:07 UTC

[arrow-datafusion] branch master updated: Consolidate expression manipulation functions into `datafusion_optimizer` (#3809)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b0f58dda5 Consolidate expression manipulation functions into `datafusion_optimizer` (#3809)
b0f58dda5 is described below

commit b0f58dda5cd073c54897f58a1f71f289b6942f3c
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Oct 12 14:30:00 2022 -0400

    Consolidate expression manipulation functions into `datafusion_optimizer` (#3809)
    
    * Consolidate expression manipulation functions into optimizer
    
    * cleanup includes
    
    * Fixup some tests
---
 benchmarks/src/bin/parquet_filter_pushdown.rs      |   5 +-
 .../core/src/datasource/file_format/parquet.rs     |   2 +-
 datafusion/core/src/logical_plan/mod.rs            |   9 +-
 .../src/physical_plan/file_format/row_filter.rs    |   3 +-
 datafusion/core/src/physical_plan/planner.rs       |   3 +-
 datafusion/core/tests/simplification.rs            |   4 +-
 datafusion/expr/src/expr_fn.rs                     | 130 ---------------------
 .../optimizer/src/decorrelate_where_exists.rs      |   5 +-
 datafusion/optimizer/src/decorrelate_where_in.rs   |   6 +-
 .../optimizer/src/scalar_subquery_to_join.rs       |   4 +-
 datafusion/optimizer/src/utils.rs                  | 121 ++++++++++++++++++-
 11 files changed, 141 insertions(+), 151 deletions(-)

diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs
index e3b365d4f..86cd79d2a 100644
--- a/benchmarks/src/bin/parquet_filter_pushdown.rs
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -28,13 +28,14 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::execution::context::ExecutionProps;
 use datafusion::logical_expr::{lit, or, Expr};
 use datafusion::logical_plan::ToDFSchema;
+use datafusion::optimizer::utils::combine_filters_disjunctive;
 use datafusion::physical_expr::create_physical_expr;
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::file_format::{
     FileScanConfig, ParquetExec, ParquetScanOptions,
 };
 use datafusion::physical_plan::filter::FilterExec;
-use datafusion::prelude::{col, combine_filters, SessionConfig, SessionContext};
+use datafusion::prelude::{col, SessionConfig, SessionContext};
 use object_store::path::Path;
 use object_store::ObjectMeta;
 use parquet::arrow::ArrowWriter;
@@ -143,7 +144,7 @@ async fn run_benchmarks(
             col("response_status").eq(lit(403_u16)),
         )),
         // Many filters
-        combine_filters(&[
+        combine_filters_disjunctive(&[
             col("request_method").not_eq(lit("GET")),
             col("response_status").eq(lit(400_u16)),
             // TODO this fails in the FilterExec with Error: Internal("The type of Dictionary(Int32, Utf8) = Utf8 of binary physical should be same")
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index be022a293..bf0488932 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -25,6 +25,7 @@ use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
 use bytes::{BufMut, BytesMut};
 use datafusion_common::DataFusionError;
+use datafusion_optimizer::utils::combine_filters;
 use hashbrown::HashMap;
 use object_store::{ObjectMeta, ObjectStore};
 use parquet::arrow::parquet_to_arrow_schema;
@@ -40,7 +41,6 @@ use crate::arrow::array::{
 use crate::arrow::datatypes::{DataType, Field};
 use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
-use crate::logical_plan::combine_filters;
 use crate::logical_plan::Expr;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index e09f39ac1..57da79a74 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -28,9 +28,9 @@ pub use datafusion_common::{
 pub use datafusion_expr::{
     abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
     atan2, avg, bit_length, btrim, call_fn, case, cast, ceil, character_length, chr,
-    coalesce, col, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos,
-    count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest,
-    exists, exp, expr_rewriter,
+    coalesce, col, concat, concat_expr, concat_ws, concat_ws_expr, cos, count,
+    count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exists, exp,
+    expr_rewriter,
     expr_rewriter::{
         normalize_col, normalize_col_with_schemas, normalize_cols, replace_col,
         rewrite_sort_cols_by_aggs, unnormalize_col, unnormalize_cols, ExprRewritable,
@@ -53,6 +53,5 @@ pub use datafusion_expr::{
     reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384, sha512,
     signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
     to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim,
-    trunc, unalias, upper, when, Expr, ExprSchemable, Literal, Operator,
+    trunc, upper, when, Expr, ExprSchemable, Literal, Operator,
 };
-pub use datafusion_optimizer::expr_simplifier::SimplifyInfo;
diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs
index 473856b7b..574e2ad44 100644
--- a/datafusion/core/src/physical_plan/file_format/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs
@@ -22,7 +22,8 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::{Column, DataFusionError, Result, ScalarValue, ToDFSchema};
 use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion};
 
-use datafusion_expr::{uncombine_filter, Expr};
+use datafusion_expr::Expr;
+use datafusion_optimizer::utils::uncombine_filter;
 use datafusion_physical_expr::execution_props::ExecutionProps;
 use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
 use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 88be00cd9..2a34dd9fb 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -32,7 +32,7 @@ use crate::logical_expr::{
     Window,
 };
 use crate::logical_plan::{
-    unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan,
+    unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan,
     Partitioning as LogicalPartitioning, PlanType, Repartition, ToStringifiedPlan, Union,
     UserDefinedLogicalNode,
 };
@@ -63,6 +63,7 @@ use datafusion_common::ScalarValue;
 use datafusion_expr::expr::GroupingSet;
 use datafusion_expr::utils::{expand_wildcard, expr_to_columns};
 use datafusion_expr::WindowFrameUnits;
+use datafusion_optimizer::utils::unalias;
 use datafusion_physical_expr::expressions::Literal;
 use datafusion_sql::utils::window_expr_common_partition_keys;
 use futures::future::BoxFuture;
diff --git a/datafusion/core/tests/simplification.rs b/datafusion/core/tests/simplification.rs
index 9cccd1aab..8c98b8045 100644
--- a/datafusion/core/tests/simplification.rs
+++ b/datafusion/core/tests/simplification.rs
@@ -22,10 +22,10 @@ use datafusion::logical_plan::ExprSchemable;
 use datafusion::{
     error::Result,
     execution::context::ExecutionProps,
-    logical_plan::{DFSchema, Expr, SimplifyInfo},
+    logical_plan::{DFSchema, Expr},
     prelude::*,
 };
-use datafusion_optimizer::expr_simplifier::ExprSimplifier;
+use datafusion_optimizer::expr_simplifier::{ExprSimplifier, SimplifyInfo};
 
 /// In order to simplify expressions, DataFusion must have information
 /// about the expressions.
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 9c9ce0445..5ab519dce 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -468,60 +468,6 @@ pub fn when(when: Expr, then: Expr) -> CaseBuilder {
     CaseBuilder::new(None, vec![when], vec![then], None)
 }
 
-/// Combines an array of filter expressions into a single filter expression
-/// consisting of the input filter expressions joined with logical AND.
-/// Returns None if the filters array is empty.
-pub fn combine_filters(filters: &[Expr]) -> Option<Expr> {
-    if filters.is_empty() {
-        return None;
-    }
-    let combined_filter = filters
-        .iter()
-        .skip(1)
-        .fold(filters[0].clone(), |acc, filter| and(acc, filter.clone()));
-    Some(combined_filter)
-}
-
-/// Take combined filter (multiple boolean expressions ANDed together)
-/// and break down into distinct filters. This should be the inverse of
-/// `combine_filters`
-pub fn uncombine_filter(filter: Expr) -> Vec<Expr> {
-    match filter {
-        Expr::BinaryExpr {
-            left,
-            op: Operator::And,
-            right,
-        } => {
-            let mut exprs = uncombine_filter(*left);
-            exprs.extend(uncombine_filter(*right));
-            exprs
-        }
-        expr => {
-            vec![expr]
-        }
-    }
-}
-
-/// Combines an array of filter expressions into a single filter expression
-/// consisting of the input filter expressions joined with logical OR.
-/// Returns None if the filters array is empty.
-pub fn combine_filters_disjunctive(filters: &[Expr]) -> Option<Expr> {
-    if filters.is_empty() {
-        return None;
-    }
-
-    filters.iter().cloned().reduce(or)
-}
-
-/// Recursively un-alias an expressions
-#[inline]
-pub fn unalias(expr: Expr) -> Expr {
-    match expr {
-        Expr::Alias(sub_expr, _) => unalias(*sub_expr),
-        _ => expr,
-    }
-}
-
 /// Creates a new UDF with a specific signature and specific return type.
 /// This is a helper function to create a new UDF.
 /// The function `create_udf` returns a subset of all possible `ScalarFunction`:
@@ -582,7 +528,6 @@ pub fn call_fn(name: impl AsRef<str>, args: Vec<Expr>) -> Result<Expr> {
 #[cfg(test)]
 mod test {
     use super::*;
-    use arrow::datatypes::{Field, Schema};
 
     #[test]
     fn filter_is_null_and_is_not_null() {
@@ -737,79 +682,4 @@ mod test {
             unreachable!();
         }
     }
-
-    #[test]
-    fn combine_zero_filters() {
-        let result = combine_filters(&[]);
-        assert_eq!(result, None);
-    }
-
-    #[test]
-    fn combine_one_filter() {
-        let filter = binary_expr(col("c1"), Operator::Lt, lit(1));
-        let result = combine_filters(&[filter.clone()]);
-        assert_eq!(result, Some(filter));
-    }
-
-    #[test]
-    fn combine_multiple_filters() {
-        let filter1 = binary_expr(col("c1"), Operator::Lt, lit(1));
-        let filter2 = binary_expr(col("c2"), Operator::Lt, lit(2));
-        let filter3 = binary_expr(col("c3"), Operator::Lt, lit(3));
-        let result =
-            combine_filters(&[filter1.clone(), filter2.clone(), filter3.clone()]);
-        assert_eq!(result, Some(and(and(filter1, filter2), filter3)));
-    }
-
-    fn assert_predicates(actual: Vec<Expr>, expected: Vec<Expr>) {
-        assert_eq!(
-            actual.len(),
-            expected.len(),
-            "Predicates are not equal, found {} predicates but expected {}",
-            actual.len(),
-            expected.len()
-        );
-
-        for expr in expected.into_iter() {
-            assert!(
-                actual.contains(&expr),
-                "Predicates are not equal, predicate {:?} not found in {:?}",
-                expr,
-                actual
-            );
-        }
-    }
-
-    #[test]
-    fn test_uncombine_filter() {
-        let _schema = Schema::new(vec![
-            Field::new("a", DataType::Utf8, true),
-            Field::new("b", DataType::Utf8, true),
-            Field::new("c", DataType::Utf8, true),
-        ]);
-
-        let expr = col("a").eq(lit("s"));
-        let actual = uncombine_filter(expr);
-
-        assert_predicates(actual, vec![col("a").eq(lit("s"))]);
-    }
-
-    #[test]
-    fn test_uncombine_filter_recursively() {
-        let _schema = Schema::new(vec![
-            Field::new("a", DataType::Utf8, true),
-            Field::new("b", DataType::Utf8, true),
-            Field::new("c", DataType::Utf8, true),
-        ]);
-
-        let expr = and(col("a"), col("b"));
-        let actual = uncombine_filter(expr);
-
-        assert_predicates(actual, vec![col("a"), col("b")]);
-
-        let expr = col("a").and(col("b")).or(col("c"));
-        let actual = uncombine_filter(expr.clone());
-
-        assert_predicates(actual, vec![expr]);
-    }
 }
diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs
index d6727ad0f..434c57bf1 100644
--- a/datafusion/optimizer/src/decorrelate_where_exists.rs
+++ b/datafusion/optimizer/src/decorrelate_where_exists.rs
@@ -16,12 +16,13 @@
 // under the License.
 
 use crate::utils::{
-    exprs_to_join_cols, find_join_exprs, split_conjunction, verify_not_disjunction,
+    combine_filters, exprs_to_join_cols, find_join_exprs, split_conjunction,
+    verify_not_disjunction,
 };
 use crate::{utils, OptimizerConfig, OptimizerRule};
 use datafusion_common::{context, plan_err, DataFusionError};
 use datafusion_expr::logical_plan::{Filter, JoinType, Subquery};
-use datafusion_expr::{combine_filters, Expr, LogicalPlan, LogicalPlanBuilder};
+use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
 use std::sync::Arc;
 
 /// Optimizer rule for rewriting subquery filters to joins
diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs
index a3443eaee..c7a072dac 100644
--- a/datafusion/optimizer/src/decorrelate_where_in.rs
+++ b/datafusion/optimizer/src/decorrelate_where_in.rs
@@ -16,13 +16,13 @@
 // under the License.
 
 use crate::utils::{
-    alias_cols, exprs_to_join_cols, find_join_exprs, merge_cols, only_or_err,
-    split_conjunction, swap_table, verify_not_disjunction,
+    alias_cols, combine_filters, exprs_to_join_cols, find_join_exprs, merge_cols,
+    only_or_err, split_conjunction, swap_table, verify_not_disjunction,
 };
 use crate::{utils, OptimizerConfig, OptimizerRule};
 use datafusion_common::context;
 use datafusion_expr::logical_plan::{Filter, JoinType, Projection, Subquery};
-use datafusion_expr::{combine_filters, Expr, LogicalPlan, LogicalPlanBuilder};
+use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
 use log::debug;
 use std::sync::Arc;
 
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index d14888110..3399e5576 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -16,13 +16,13 @@
 // under the License.
 
 use crate::utils::{
-    exprs_to_join_cols, find_join_exprs, only_or_err, split_conjunction,
+    combine_filters, exprs_to_join_cols, find_join_exprs, only_or_err, split_conjunction,
     verify_not_disjunction,
 };
 use crate::{utils, OptimizerConfig, OptimizerRule};
 use datafusion_common::{context, plan_err, Column, Result};
 use datafusion_expr::logical_plan::{Filter, JoinType, Limit, Subquery};
-use datafusion_expr::{combine_filters, Expr, LogicalPlan, LogicalPlanBuilder, Operator};
+use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Operator};
 use log::debug;
 use std::sync::Arc;
 
diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs
index d9b5cd9a8..23aec3354 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -23,7 +23,7 @@ use datafusion_common::{plan_err, Column, DFSchemaRef};
 use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
 use datafusion_expr::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
 use datafusion_expr::{
-    and, col, combine_filters,
+    and, col,
     logical_plan::{Filter, LogicalPlan},
     utils::from_plan,
     Expr, Operator,
@@ -69,6 +69,60 @@ pub fn split_conjunction<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>
     }
 }
 
+/// Combines an array of filter expressions into a single filter expression
+/// consisting of the input filter expressions joined with logical AND.
+/// Returns None if the filters array is empty.
+pub fn combine_filters(filters: &[Expr]) -> Option<Expr> {
+    if filters.is_empty() {
+        return None;
+    }
+    let combined_filter = filters
+        .iter()
+        .skip(1)
+        .fold(filters[0].clone(), |acc, filter| and(acc, filter.clone()));
+    Some(combined_filter)
+}
+
+/// Take combined filter (multiple boolean expressions ANDed together)
+/// and break down into distinct filters. This should be the inverse of
+/// `combine_filters`
+pub fn uncombine_filter(filter: Expr) -> Vec<Expr> {
+    match filter {
+        Expr::BinaryExpr {
+            left,
+            op: Operator::And,
+            right,
+        } => {
+            let mut exprs = uncombine_filter(*left);
+            exprs.extend(uncombine_filter(*right));
+            exprs
+        }
+        expr => {
+            vec![expr]
+        }
+    }
+}
+
+/// Combines an array of filter expressions into a single filter expression
+/// consisting of the input filter expressions joined with logical OR.
+/// Returns None if the filters array is empty.
+pub fn combine_filters_disjunctive(filters: &[Expr]) -> Option<Expr> {
+    if filters.is_empty() {
+        return None;
+    }
+
+    filters.iter().cloned().reduce(datafusion_expr::or)
+}
+
+/// Recursively un-alias an expressions
+#[inline]
+pub fn unalias(expr: Expr) -> Expr {
+    match expr {
+        Expr::Alias(sub_expr, _) => unalias(*sub_expr),
+        _ => expr,
+    }
+}
+
 /// Recursively scans a slice of expressions for any `Or` operators
 ///
 /// # Arguments
@@ -370,10 +424,33 @@ mod tests {
     use super::*;
     use arrow::datatypes::DataType;
     use datafusion_common::Column;
-    use datafusion_expr::{col, lit, utils::expr_to_columns};
+    use datafusion_expr::{binary_expr, col, lit, utils::expr_to_columns};
     use std::collections::HashSet;
     use std::ops::Add;
 
+    #[test]
+    fn combine_zero_filters() {
+        let result = combine_filters(&[]);
+        assert_eq!(result, None);
+    }
+
+    #[test]
+    fn combine_one_filter() {
+        let filter = binary_expr(col("c1"), Operator::Lt, lit(1));
+        let result = combine_filters(&[filter.clone()]);
+        assert_eq!(result, Some(filter));
+    }
+
+    #[test]
+    fn combine_multiple_filters() {
+        let filter1 = binary_expr(col("c1"), Operator::Lt, lit(1));
+        let filter2 = binary_expr(col("c2"), Operator::Lt, lit(2));
+        let filter3 = binary_expr(col("c3"), Operator::Lt, lit(3));
+        let result =
+            combine_filters(&[filter1.clone(), filter2.clone(), filter3.clone()]);
+        assert_eq!(result, Some(and(and(filter1, filter2), filter3)));
+    }
+
     #[test]
     fn test_collect_expr() -> Result<()> {
         let mut accum: HashSet<Column> = HashSet::new();
@@ -396,6 +473,46 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_uncombine_filter() {
+        let expr = col("a").eq(lit("s"));
+        let actual = uncombine_filter(expr);
+
+        assert_predicates(actual, vec![col("a").eq(lit("s"))]);
+    }
+
+    #[test]
+    fn test_uncombine_filter_recursively() {
+        let expr = and(col("a"), col("b"));
+        let actual = uncombine_filter(expr);
+
+        assert_predicates(actual, vec![col("a"), col("b")]);
+
+        let expr = col("a").and(col("b")).or(col("c"));
+        let actual = uncombine_filter(expr.clone());
+
+        assert_predicates(actual, vec![expr]);
+    }
+
+    fn assert_predicates(actual: Vec<Expr>, expected: Vec<Expr>) {
+        assert_eq!(
+            actual.len(),
+            expected.len(),
+            "Predicates are not equal, found {} predicates but expected {}",
+            actual.len(),
+            expected.len()
+        );
+
+        for expr in expected.into_iter() {
+            assert!(
+                actual.contains(&expr),
+                "Predicates are not equal, predicate {:?} not found in {:?}",
+                expr,
+                actual
+            );
+        }
+    }
+
     #[test]
     fn test_rewrite_preserving_name() {
         test_rewrite(col("a"), col("a"));