You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/05 13:21:10 UTC

[GitHub] [arrow] drusso commented on a change in pull request #8836: ARROW-10808: [Rust][DataFusion] Support nested expressions in aggregations.

drusso commented on a change in pull request #8836:
URL: https://github.com/apache/arrow/pull/8836#discussion_r536756258



##########
File path: rust/datafusion/src/sql/utils.rs
##########
@@ -0,0 +1,305 @@
+use crate::error::{DataFusionError, Result};
+use crate::logical_plan::{Expr, LogicalPlan};
+use arrow::datatypes::Schema;
+
+/// Expands an `Expr::Wildcard` into one `Expr::Column` per field of `schema`,
+/// in the order the fields are listed. Any other expression is returned
+/// unchanged (cloned) as the only element of the result.
+pub(crate) fn expand_wildcard(expr: &Expr, schema: &Schema) -> Vec<Expr> {
+    if let Expr::Wildcard = expr {
+        let mut columns = Vec::new();
+        for field in schema.fields().iter() {
+            columns.push(Expr::Column(field.name().to_string()));
+        }
+        columns
+    } else {
+        vec![expr.clone()]
+    }
+}
+
+/// Collect every `Expr::AggregateFunction` and `Expr::AggregateUDF` found in
+/// `exprs`, including those nested inside other expressions. They are returned
+/// in order of occurrence (depth first), with duplicates omitted.
+///
+/// The search does not descend into an aggregate once it has matched, so
+/// expressions nested inside a matched aggregate are not reported separately.
+pub(crate) fn find_aggregate_exprs(exprs: &Vec<Expr>) -> Vec<Expr> {
+    find_exprs_in_exprs(exprs, &|nested_expr| {
+        matches!(
+            nested_expr,
+            Expr::AggregateFunction { .. } | Expr::AggregateUDF { .. }
+        )
+    })
+}
+
+/// Collect every `Expr::Column` found in `exprs`, including those nested
+/// inside other expressions. They are returned in order of appearance (depth
+/// first), with duplicates omitted.
+pub(crate) fn find_column_exprs(exprs: &Vec<Expr>) -> Vec<Expr> {
+    find_exprs_in_exprs(exprs, &|nested_expr| {
+        matches!(nested_expr, Expr::Column(_))
+    })
+}
+
+/// Search the provided `Expr`'s, and all of their nested `Expr`'s, for any
+/// that pass `test_fn`. The matches are returned in order of appearance (depth
+/// first), and an expression equal to one already collected is dropped.
+///
+/// `exprs` is taken as a slice so callers may pass a `&Vec<Expr>` or any other
+/// contiguous run of expressions.
+fn find_exprs_in_exprs<F>(exprs: &[Expr], test_fn: &F) -> Vec<Expr>
+where
+    F: Fn(&Expr) -> bool,
+{
+    let mut unique: Vec<Expr> = Vec::new();
+
+    for expr in exprs {
+        for found in find_exprs_in_expr(expr, test_fn) {
+            // Duplicates are detected by equality; the first occurrence wins
+            // so the order of appearance is preserved.
+            if !unique.contains(&found) {
+                unique.push(found);
+            }
+        }
+    }
+
+    unique
+}
+
+/// Walk `expr` depth first and gather every expression accepted by `test_fn`.
+///
+/// When `expr` itself is accepted it is returned on its own and its children
+/// are not visited. Otherwise the search recurses into each nested expression.
+/// Matches are returned in order of appearance, with duplicates dropped.
+fn find_exprs_in_expr<F>(expr: &Expr, test_fn: &F) -> Vec<Expr>
+where
+    F: Fn(&Expr) -> bool,
+{
+    // A match ends the descent along this branch.
+    if test_fn(expr) {
+        return vec![expr.clone()];
+    }
+
+    // The directly nested expressions, in the order they must be searched.
+    let children: Vec<&Expr> = match expr {
+        Expr::AggregateFunction { args, .. } => args.iter().collect(),
+        Expr::AggregateUDF { args, .. } => args.iter().collect(),
+        Expr::Alias(inner, _) => vec![inner.as_ref()],
+        Expr::BinaryExpr { left, right, .. } => vec![left.as_ref(), right.as_ref()],
+        Expr::Case {
+            expr: case_expr_opt,
+            when_then_expr,
+            else_expr: else_expr_opt,
+        } => {
+            let mut operands: Vec<&Expr> = Vec::new();
+
+            if let Some(case_expr) = case_expr_opt {
+                operands.push(case_expr.as_ref());
+            }
+
+            // Each WHEN is searched before its THEN.
+            for (when_expr, then_expr) in when_then_expr.iter() {
+                operands.push(when_expr.as_ref());
+                operands.push(then_expr.as_ref());
+            }
+
+            if let Some(else_expr) = else_expr_opt {
+                operands.push(else_expr.as_ref());
+            }
+
+            operands
+        }
+        Expr::Cast { expr: inner, .. } => vec![inner.as_ref()],
+        Expr::IsNotNull(inner) => vec![inner.as_ref()],
+        Expr::IsNull(inner) => vec![inner.as_ref()],
+        Expr::Not(inner) => vec![inner.as_ref()],
+        Expr::ScalarFunction { args, .. } => args.iter().collect(),
+        Expr::ScalarUDF { args, .. } => args.iter().collect(),
+        Expr::Sort { expr: inner, .. } => vec![inner.as_ref()],
+
+        // These expressions don't nest other expressions.
+        Expr::Column(_)
+        | Expr::Literal(_)
+        | Expr::ScalarVariable(_)
+        | Expr::Wildcard => Vec::new(),
+    };
+
+    // Search every child in order, keeping only the first occurrence of each
+    // match.
+    let mut unique: Vec<Expr> = Vec::new();
+    for child in children {
+        for hit in find_exprs_in_expr(child, test_fn) {
+            if !unique.contains(&hit) {
+                unique.push(hit);
+            }
+        }
+    }
+
+    unique
+}
+
+/// Convert any `Expr` to an `Expr::Column`.
+///
+/// An `Expr::Column` is returned as a clone of itself. Any other expression
+/// becomes a column whose name is `expr.name(..)` evaluated against the schema
+/// of `plan`; an error from that name lookup is propagated to the caller.
+pub(crate) fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
+    if let Expr::Column(_) = expr {
+        return Ok(expr.clone());
+    }
+
+    let column_name = expr.name(&plan.schema())?;
+    Ok(Expr::Column(column_name))
+}
+
+/// Rebuilds an `Expr` as a projection on top of a collection of `Expr`'s.
+///
+/// For example, the expression `a + b < 1` would require, as input, the 2
+/// individual columns, `a` and `b`. But, if the base expressions already
+/// contain the `a + b` result, then that may be used in lieu of the `a` and
+/// `b` columns.
+///
+/// This is useful in the context of a query like:
+///
+/// ```text
+/// SELECT a + b < 1 ... GROUP BY a + b
+/// ```
+///
+/// where post-aggregation, `a + b` need not be a projection against the
+/// individual columns `a` and `b`, but rather it is a projection against the
+/// `a + b` found in the GROUP BY.
+///
+/// Every nested expression equal to one of `base_exprs` is replaced by the
+/// column produced by `expr_as_column_expr()` for `plan`; all other
+/// expressions are left for `clone_with_replacement()` to handle. An error
+/// from `expr_as_column_expr()` is returned to the caller.
+pub(crate) fn rebase_expr(
+    expr: &Expr,
+    base_exprs: &[Expr],
+    plan: &LogicalPlan,
+) -> Result<Expr> {
+    clone_with_replacement(expr, &|nested_expr| {
+        if base_exprs.contains(nested_expr) {
+            // Already computed by the base expressions: refer to it by column.
+            Ok(Some(expr_as_column_expr(nested_expr, plan)?))
+        } else {
+            // No replacement for this expression.
+            Ok(None)
+        }
+    })
+}
+
+/// Determines if the set of `Expr`'s are a valid projection on the input
+/// `Expr::Column`'s.
+///
+/// Returns `Ok(true)` when every `Expr::Column` found anywhere inside `exprs`
+/// is also present in `columns`, and `Ok(false)` otherwise. Returns a
+/// `DataFusionError::Internal` if any element of `columns` is not an
+/// `Expr::Column`.
+pub(crate) fn can_columns_satisfy_exprs(
+    columns: &Vec<Expr>,
+    exprs: &Vec<Expr>,
+) -> Result<bool> {
+    // Reject the input at the first element that is not a plain column.
+    for column in columns.iter() {
+        if let Expr::Column(_) = column {
+            continue;
+        }
+        return Err(DataFusionError::Internal(
+            "Expr::Column are required".to_string(),
+        ));
+    }
+
+    let required = find_column_exprs(exprs);
+    Ok(required.iter().all(|needed| columns.contains(needed)))
+}
+
+/// Returns a cloned `Expr`, but any of the `Expr`'s in the tree may be
+/// replaced/customized by the replacement function.
+fn clone_with_replacement<F>(expr: &Expr, replacement_fn: &F) -> Result<Expr>
+where
+    F: Fn(&Expr) -> Result<Option<Expr>>,
+{

Review comment:
       I should add some notes in the docstring on this. 
   
   The `Result` allows `replacement_fn()` to be fallible, as is the case for the closure in `rebase_expr()`, which calls the fallible `expr_as_column_expr()`. 
   
   If `replacement_fn()` returns:
   
   * `Err(_)`: The error is returned by `clone_with_replacement()`. 
   * `Ok(Some(replacement))`: A replacement was provided, directing `clone_with_replacement()` to use the replacement. In this case, cloning will not continue down this particular branch of expressions.
   * `Ok(None)`: A replacement was not provided, and cloning continues down this particular branch of expressions. 
   
   Does that make sense? 
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org