You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/10/12 18:30:07 UTC
[arrow-datafusion] branch master updated: Consolidate expression manipulation functions into `datafusion_optimizer` (#3809)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b0f58dda5 Consolidate expression manipulation functions into `datafusion_optimizer` (#3809)
b0f58dda5 is described below
commit b0f58dda5cd073c54897f58a1f71f289b6942f3c
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Oct 12 14:30:00 2022 -0400
Consolidate expression manipulation functions into `datafusion_optimizer` (#3809)
* Consolidate expression manipulation functions into optimizer
* cleanup includes
* Fixup some tests
---
benchmarks/src/bin/parquet_filter_pushdown.rs | 5 +-
.../core/src/datasource/file_format/parquet.rs | 2 +-
datafusion/core/src/logical_plan/mod.rs | 9 +-
.../src/physical_plan/file_format/row_filter.rs | 3 +-
datafusion/core/src/physical_plan/planner.rs | 3 +-
datafusion/core/tests/simplification.rs | 4 +-
datafusion/expr/src/expr_fn.rs | 130 ---------------------
.../optimizer/src/decorrelate_where_exists.rs | 5 +-
datafusion/optimizer/src/decorrelate_where_in.rs | 6 +-
.../optimizer/src/scalar_subquery_to_join.rs | 4 +-
datafusion/optimizer/src/utils.rs | 121 ++++++++++++++++++-
11 files changed, 141 insertions(+), 151 deletions(-)
diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs
index e3b365d4f..86cd79d2a 100644
--- a/benchmarks/src/bin/parquet_filter_pushdown.rs
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -28,13 +28,14 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::{lit, or, Expr};
use datafusion::logical_plan::ToDFSchema;
+use datafusion::optimizer::utils::combine_filters_disjunctive;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::file_format::{
FileScanConfig, ParquetExec, ParquetScanOptions,
};
use datafusion::physical_plan::filter::FilterExec;
-use datafusion::prelude::{col, combine_filters, SessionConfig, SessionContext};
+use datafusion::prelude::{col, SessionConfig, SessionContext};
use object_store::path::Path;
use object_store::ObjectMeta;
use parquet::arrow::ArrowWriter;
@@ -143,7 +144,7 @@ async fn run_benchmarks(
col("response_status").eq(lit(403_u16)),
)),
// Many filters
- combine_filters(&[
+ combine_filters_disjunctive(&[
col("request_method").not_eq(lit("GET")),
col("response_status").eq(lit(400_u16)),
// TODO this fails in the FilterExec with Error: Internal("The type of Dictionary(Int32, Utf8) = Utf8 of binary physical should be same")
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index be022a293..bf0488932 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -25,6 +25,7 @@ use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use datafusion_common::DataFusionError;
+use datafusion_optimizer::utils::combine_filters;
use hashbrown::HashMap;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::parquet_to_arrow_schema;
@@ -40,7 +41,6 @@ use crate::arrow::array::{
use crate::arrow::datatypes::{DataType, Field};
use crate::datasource::{create_max_min_accs, get_col_stats};
use crate::error::Result;
-use crate::logical_plan::combine_filters;
use crate::logical_plan::Expr;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index e09f39ac1..57da79a74 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -28,9 +28,9 @@ pub use datafusion_common::{
pub use datafusion_expr::{
abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
atan2, avg, bit_length, btrim, call_fn, case, cast, ceil, character_length, chr,
- coalesce, col, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos,
- count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest,
- exists, exp, expr_rewriter,
+ coalesce, col, concat, concat_expr, concat_ws, concat_ws_expr, cos, count,
+ count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exists, exp,
+ expr_rewriter,
expr_rewriter::{
normalize_col, normalize_col_with_schemas, normalize_cols, replace_col,
rewrite_sort_cols_by_aggs, unnormalize_col, unnormalize_cols, ExprRewritable,
@@ -53,6 +53,5 @@ pub use datafusion_expr::{
reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384, sha512,
signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim,
- trunc, unalias, upper, when, Expr, ExprSchemable, Literal, Operator,
+ trunc, upper, when, Expr, ExprSchemable, Literal, Operator,
};
-pub use datafusion_optimizer::expr_simplifier::SimplifyInfo;
diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs
index 473856b7b..574e2ad44 100644
--- a/datafusion/core/src/physical_plan/file_format/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs
@@ -22,7 +22,8 @@ use arrow::record_batch::RecordBatch;
use datafusion_common::{Column, DataFusionError, Result, ScalarValue, ToDFSchema};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion};
-use datafusion_expr::{uncombine_filter, Expr};
+use datafusion_expr::Expr;
+use datafusion_optimizer::utils::uncombine_filter;
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 88be00cd9..2a34dd9fb 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -32,7 +32,7 @@ use crate::logical_expr::{
Window,
};
use crate::logical_plan::{
- unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan,
+ unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan,
Partitioning as LogicalPartitioning, PlanType, Repartition, ToStringifiedPlan, Union,
UserDefinedLogicalNode,
};
@@ -63,6 +63,7 @@ use datafusion_common::ScalarValue;
use datafusion_expr::expr::GroupingSet;
use datafusion_expr::utils::{expand_wildcard, expr_to_columns};
use datafusion_expr::WindowFrameUnits;
+use datafusion_optimizer::utils::unalias;
use datafusion_physical_expr::expressions::Literal;
use datafusion_sql::utils::window_expr_common_partition_keys;
use futures::future::BoxFuture;
diff --git a/datafusion/core/tests/simplification.rs b/datafusion/core/tests/simplification.rs
index 9cccd1aab..8c98b8045 100644
--- a/datafusion/core/tests/simplification.rs
+++ b/datafusion/core/tests/simplification.rs
@@ -22,10 +22,10 @@ use datafusion::logical_plan::ExprSchemable;
use datafusion::{
error::Result,
execution::context::ExecutionProps,
- logical_plan::{DFSchema, Expr, SimplifyInfo},
+ logical_plan::{DFSchema, Expr},
prelude::*,
};
-use datafusion_optimizer::expr_simplifier::ExprSimplifier;
+use datafusion_optimizer::expr_simplifier::{ExprSimplifier, SimplifyInfo};
/// In order to simplify expressions, DataFusion must have information
/// about the expressions.
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 9c9ce0445..5ab519dce 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -468,60 +468,6 @@ pub fn when(when: Expr, then: Expr) -> CaseBuilder {
CaseBuilder::new(None, vec![when], vec![then], None)
}
-/// Combines an array of filter expressions into a single filter expression
-/// consisting of the input filter expressions joined with logical AND.
-/// Returns None if the filters array is empty.
-pub fn combine_filters(filters: &[Expr]) -> Option<Expr> {
- if filters.is_empty() {
- return None;
- }
- let combined_filter = filters
- .iter()
- .skip(1)
- .fold(filters[0].clone(), |acc, filter| and(acc, filter.clone()));
- Some(combined_filter)
-}
-
-/// Take combined filter (multiple boolean expressions ANDed together)
-/// and break down into distinct filters. This should be the inverse of
-/// `combine_filters`
-pub fn uncombine_filter(filter: Expr) -> Vec<Expr> {
- match filter {
- Expr::BinaryExpr {
- left,
- op: Operator::And,
- right,
- } => {
- let mut exprs = uncombine_filter(*left);
- exprs.extend(uncombine_filter(*right));
- exprs
- }
- expr => {
- vec![expr]
- }
- }
-}
-
-/// Combines an array of filter expressions into a single filter expression
-/// consisting of the input filter expressions joined with logical OR.
-/// Returns None if the filters array is empty.
-pub fn combine_filters_disjunctive(filters: &[Expr]) -> Option<Expr> {
- if filters.is_empty() {
- return None;
- }
-
- filters.iter().cloned().reduce(or)
-}
-
-/// Recursively un-alias an expressions
-#[inline]
-pub fn unalias(expr: Expr) -> Expr {
- match expr {
- Expr::Alias(sub_expr, _) => unalias(*sub_expr),
- _ => expr,
- }
-}
-
/// Creates a new UDF with a specific signature and specific return type.
/// This is a helper function to create a new UDF.
/// The function `create_udf` returns a subset of all possible `ScalarFunction`:
@@ -582,7 +528,6 @@ pub fn call_fn(name: impl AsRef<str>, args: Vec<Expr>) -> Result<Expr> {
#[cfg(test)]
mod test {
use super::*;
- use arrow::datatypes::{Field, Schema};
#[test]
fn filter_is_null_and_is_not_null() {
@@ -737,79 +682,4 @@ mod test {
unreachable!();
}
}
-
- #[test]
- fn combine_zero_filters() {
- let result = combine_filters(&[]);
- assert_eq!(result, None);
- }
-
- #[test]
- fn combine_one_filter() {
- let filter = binary_expr(col("c1"), Operator::Lt, lit(1));
- let result = combine_filters(&[filter.clone()]);
- assert_eq!(result, Some(filter));
- }
-
- #[test]
- fn combine_multiple_filters() {
- let filter1 = binary_expr(col("c1"), Operator::Lt, lit(1));
- let filter2 = binary_expr(col("c2"), Operator::Lt, lit(2));
- let filter3 = binary_expr(col("c3"), Operator::Lt, lit(3));
- let result =
- combine_filters(&[filter1.clone(), filter2.clone(), filter3.clone()]);
- assert_eq!(result, Some(and(and(filter1, filter2), filter3)));
- }
-
- fn assert_predicates(actual: Vec<Expr>, expected: Vec<Expr>) {
- assert_eq!(
- actual.len(),
- expected.len(),
- "Predicates are not equal, found {} predicates but expected {}",
- actual.len(),
- expected.len()
- );
-
- for expr in expected.into_iter() {
- assert!(
- actual.contains(&expr),
- "Predicates are not equal, predicate {:?} not found in {:?}",
- expr,
- actual
- );
- }
- }
-
- #[test]
- fn test_uncombine_filter() {
- let _schema = Schema::new(vec![
- Field::new("a", DataType::Utf8, true),
- Field::new("b", DataType::Utf8, true),
- Field::new("c", DataType::Utf8, true),
- ]);
-
- let expr = col("a").eq(lit("s"));
- let actual = uncombine_filter(expr);
-
- assert_predicates(actual, vec![col("a").eq(lit("s"))]);
- }
-
- #[test]
- fn test_uncombine_filter_recursively() {
- let _schema = Schema::new(vec![
- Field::new("a", DataType::Utf8, true),
- Field::new("b", DataType::Utf8, true),
- Field::new("c", DataType::Utf8, true),
- ]);
-
- let expr = and(col("a"), col("b"));
- let actual = uncombine_filter(expr);
-
- assert_predicates(actual, vec![col("a"), col("b")]);
-
- let expr = col("a").and(col("b")).or(col("c"));
- let actual = uncombine_filter(expr.clone());
-
- assert_predicates(actual, vec![expr]);
- }
}
diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs
index d6727ad0f..434c57bf1 100644
--- a/datafusion/optimizer/src/decorrelate_where_exists.rs
+++ b/datafusion/optimizer/src/decorrelate_where_exists.rs
@@ -16,12 +16,13 @@
// under the License.
use crate::utils::{
- exprs_to_join_cols, find_join_exprs, split_conjunction, verify_not_disjunction,
+ combine_filters, exprs_to_join_cols, find_join_exprs, split_conjunction,
+ verify_not_disjunction,
};
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::{context, plan_err, DataFusionError};
use datafusion_expr::logical_plan::{Filter, JoinType, Subquery};
-use datafusion_expr::{combine_filters, Expr, LogicalPlan, LogicalPlanBuilder};
+use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
use std::sync::Arc;
/// Optimizer rule for rewriting subquery filters to joins
diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs
index a3443eaee..c7a072dac 100644
--- a/datafusion/optimizer/src/decorrelate_where_in.rs
+++ b/datafusion/optimizer/src/decorrelate_where_in.rs
@@ -16,13 +16,13 @@
// under the License.
use crate::utils::{
- alias_cols, exprs_to_join_cols, find_join_exprs, merge_cols, only_or_err,
- split_conjunction, swap_table, verify_not_disjunction,
+ alias_cols, combine_filters, exprs_to_join_cols, find_join_exprs, merge_cols,
+ only_or_err, split_conjunction, swap_table, verify_not_disjunction,
};
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::context;
use datafusion_expr::logical_plan::{Filter, JoinType, Projection, Subquery};
-use datafusion_expr::{combine_filters, Expr, LogicalPlan, LogicalPlanBuilder};
+use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
use log::debug;
use std::sync::Arc;
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index d14888110..3399e5576 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -16,13 +16,13 @@
// under the License.
use crate::utils::{
- exprs_to_join_cols, find_join_exprs, only_or_err, split_conjunction,
+ combine_filters, exprs_to_join_cols, find_join_exprs, only_or_err, split_conjunction,
verify_not_disjunction,
};
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::{context, plan_err, Column, Result};
use datafusion_expr::logical_plan::{Filter, JoinType, Limit, Subquery};
-use datafusion_expr::{combine_filters, Expr, LogicalPlan, LogicalPlanBuilder, Operator};
+use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Operator};
use log::debug;
use std::sync::Arc;
diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs
index d9b5cd9a8..23aec3354 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -23,7 +23,7 @@ use datafusion_common::{plan_err, Column, DFSchemaRef};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion_expr::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
use datafusion_expr::{
- and, col, combine_filters,
+ and, col,
logical_plan::{Filter, LogicalPlan},
utils::from_plan,
Expr, Operator,
@@ -69,6 +69,60 @@ pub fn split_conjunction<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>
}
}
+/// Combines an array of filter expressions into a single filter expression
+/// consisting of the input filter expressions joined with logical AND.
+/// Returns None if the filters array is empty.
+pub fn combine_filters(filters: &[Expr]) -> Option<Expr> {
+ if filters.is_empty() {
+ return None;
+ }
+ let combined_filter = filters
+ .iter()
+ .skip(1)
+ .fold(filters[0].clone(), |acc, filter| and(acc, filter.clone()));
+ Some(combined_filter)
+}
+
+/// Take combined filter (multiple boolean expressions ANDed together)
+/// and break down into distinct filters. This should be the inverse of
+/// `combine_filters`
+pub fn uncombine_filter(filter: Expr) -> Vec<Expr> {
+ match filter {
+ Expr::BinaryExpr {
+ left,
+ op: Operator::And,
+ right,
+ } => {
+ let mut exprs = uncombine_filter(*left);
+ exprs.extend(uncombine_filter(*right));
+ exprs
+ }
+ expr => {
+ vec![expr]
+ }
+ }
+}
+
+/// Combines an array of filter expressions into a single filter expression
+/// consisting of the input filter expressions joined with logical OR.
+/// Returns None if the filters array is empty.
+pub fn combine_filters_disjunctive(filters: &[Expr]) -> Option<Expr> {
+ if filters.is_empty() {
+ return None;
+ }
+
+ filters.iter().cloned().reduce(datafusion_expr::or)
+}
+
+/// Recursively un-alias an expression
+#[inline]
+pub fn unalias(expr: Expr) -> Expr {
+ match expr {
+ Expr::Alias(sub_expr, _) => unalias(*sub_expr),
+ _ => expr,
+ }
+}
+
/// Recursively scans a slice of expressions for any `Or` operators
///
/// # Arguments
@@ -370,10 +424,33 @@ mod tests {
use super::*;
use arrow::datatypes::DataType;
use datafusion_common::Column;
- use datafusion_expr::{col, lit, utils::expr_to_columns};
+ use datafusion_expr::{binary_expr, col, lit, utils::expr_to_columns};
use std::collections::HashSet;
use std::ops::Add;
+ #[test]
+ fn combine_zero_filters() {
+ let result = combine_filters(&[]);
+ assert_eq!(result, None);
+ }
+
+ #[test]
+ fn combine_one_filter() {
+ let filter = binary_expr(col("c1"), Operator::Lt, lit(1));
+ let result = combine_filters(&[filter.clone()]);
+ assert_eq!(result, Some(filter));
+ }
+
+ #[test]
+ fn combine_multiple_filters() {
+ let filter1 = binary_expr(col("c1"), Operator::Lt, lit(1));
+ let filter2 = binary_expr(col("c2"), Operator::Lt, lit(2));
+ let filter3 = binary_expr(col("c3"), Operator::Lt, lit(3));
+ let result =
+ combine_filters(&[filter1.clone(), filter2.clone(), filter3.clone()]);
+ assert_eq!(result, Some(and(and(filter1, filter2), filter3)));
+ }
+
#[test]
fn test_collect_expr() -> Result<()> {
let mut accum: HashSet<Column> = HashSet::new();
@@ -396,6 +473,46 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_uncombine_filter() {
+ let expr = col("a").eq(lit("s"));
+ let actual = uncombine_filter(expr);
+
+ assert_predicates(actual, vec![col("a").eq(lit("s"))]);
+ }
+
+ #[test]
+ fn test_uncombine_filter_recursively() {
+ let expr = and(col("a"), col("b"));
+ let actual = uncombine_filter(expr);
+
+ assert_predicates(actual, vec![col("a"), col("b")]);
+
+ let expr = col("a").and(col("b")).or(col("c"));
+ let actual = uncombine_filter(expr.clone());
+
+ assert_predicates(actual, vec![expr]);
+ }
+
+ fn assert_predicates(actual: Vec<Expr>, expected: Vec<Expr>) {
+ assert_eq!(
+ actual.len(),
+ expected.len(),
+ "Predicates are not equal, found {} predicates but expected {}",
+ actual.len(),
+ expected.len()
+ );
+
+ for expr in expected.into_iter() {
+ assert!(
+ actual.contains(&expr),
+ "Predicates are not equal, predicate {:?} not found in {:?}",
+ expr,
+ actual
+ );
+ }
+ }
+
#[test]
fn test_rewrite_preserving_name() {
test_rewrite(col("a"), col("a"));