You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/08 21:38:46 UTC

[arrow-datafusion] branch master updated: reuse code `utils::optimize_children` but affect inline. (#4121)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 63386d1b7 reuse code `utils::optimize_children` but affect inline. (#4121)
63386d1b7 is described below

commit 63386d1b7a805b854658afaf26a0ed8da207abcd
Author: jakevin <ja...@gmail.com>
AuthorDate: Wed Nov 9 05:38:40 2022 +0800

    reuse code `utils::optimize_children` but affect inline. (#4121)
---
 .../optimizer/src/common_subexpr_eliminate.rs      | 388 ++++++++++-----------
 1 file changed, 190 insertions(+), 198 deletions(-)

diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 6ab1dd1fc..e8158e632 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -17,7 +17,7 @@
 
 //! Eliminate common sub-expression.
 
-use crate::{OptimizerConfig, OptimizerRule};
+use crate::{utils, OptimizerConfig, OptimizerRule};
 use arrow::datatypes::DataType;
 use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError, Result};
 use datafusion_expr::{
@@ -25,7 +25,6 @@ use datafusion_expr::{
     expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion},
     expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
     logical_plan::{Aggregate, Filter, LogicalPlan, Projection, Sort, Window},
-    utils::from_plan,
     Expr, ExprSchemable,
 };
 use std::collections::{BTreeSet, HashMap};
@@ -54,13 +53,200 @@ type Identifier = String;
 /// be eliminated.
 pub struct CommonSubexprEliminate {}
 
+impl CommonSubexprEliminate {
+    fn rewrite_expr(
+        &self,
+        exprs_list: &[&[Expr]],
+        arrays_list: &[&[Vec<(usize, String)>]],
+        input: &LogicalPlan,
+        expr_set: &mut ExprSet,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<(Vec<Vec<Expr>>, LogicalPlan)> {
+        let mut affected_id = BTreeSet::<Identifier>::new();
+
+        let rewrite_exprs = exprs_list
+            .iter()
+            .zip(arrays_list.iter())
+            .map(|(exprs, arrays)| {
+                exprs
+                    .iter()
+                    .cloned()
+                    .zip(arrays.iter())
+                    .map(|(expr, id_array)| {
+                        replace_common_expr(expr, id_array, expr_set, &mut affected_id)
+                    })
+                    .collect::<Result<Vec<_>>>()
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let mut new_input = self.optimize(input, optimizer_config)?;
+        if !affected_id.is_empty() {
+            new_input = build_project_plan(new_input, affected_id, expr_set)?;
+        }
+
+        Ok((rewrite_exprs, new_input))
+    }
+}
+
 impl OptimizerRule for CommonSubexprEliminate {
     fn optimize(
         &self,
         plan: &LogicalPlan,
         optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        optimize(plan, optimizer_config)
+        let mut expr_set = ExprSet::new();
+
+        match plan {
+            LogicalPlan::Projection(Projection {
+                expr,
+                input,
+                schema,
+                alias,
+            }) => {
+                let input_schema = Arc::clone(input.schema());
+                let arrays = to_arrays(expr, input_schema, &mut expr_set)?;
+
+                let (mut new_expr, new_input) = self.rewrite_expr(
+                    &[expr],
+                    &[&arrays],
+                    input,
+                    &mut expr_set,
+                    optimizer_config,
+                )?;
+
+                Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                    pop_expr(&mut new_expr)?,
+                    Arc::new(new_input),
+                    schema.clone(),
+                    alias.clone(),
+                )?))
+            }
+            LogicalPlan::Filter(filter) => {
+                let input = filter.input();
+                let predicate = filter.predicate();
+                let input_schema = Arc::clone(input.schema());
+                let mut id_array = vec![];
+                expr_to_identifier(
+                    predicate,
+                    &mut expr_set,
+                    &mut id_array,
+                    input_schema,
+                )?;
+
+                let (mut new_expr, new_input) = self.rewrite_expr(
+                    &[&[predicate.clone()]],
+                    &[&[id_array]],
+                    filter.input(),
+                    &mut expr_set,
+                    optimizer_config,
+                )?;
+
+                if let Some(predicate) = pop_expr(&mut new_expr)?.pop() {
+                    Ok(LogicalPlan::Filter(Filter::try_new(
+                        predicate,
+                        Arc::new(new_input),
+                    )?))
+                } else {
+                    Err(DataFusionError::Internal(
+                        "Failed to pop predicate expr".to_string(),
+                    ))
+                }
+            }
+            LogicalPlan::Window(Window {
+                input,
+                window_expr,
+                schema,
+            }) => {
+                let input_schema = Arc::clone(input.schema());
+                let arrays = to_arrays(window_expr, input_schema, &mut expr_set)?;
+
+                let (mut new_expr, new_input) = self.rewrite_expr(
+                    &[window_expr],
+                    &[&arrays],
+                    input,
+                    &mut expr_set,
+                    optimizer_config,
+                )?;
+
+                Ok(LogicalPlan::Window(Window {
+                    input: Arc::new(new_input),
+                    window_expr: pop_expr(&mut new_expr)?,
+                    schema: schema.clone(),
+                }))
+            }
+            LogicalPlan::Aggregate(Aggregate {
+                group_expr,
+                aggr_expr,
+                input,
+                schema,
+            }) => {
+                let input_schema = Arc::clone(input.schema());
+                let group_arrays =
+                    to_arrays(group_expr, Arc::clone(&input_schema), &mut expr_set)?;
+                let aggr_arrays = to_arrays(aggr_expr, input_schema, &mut expr_set)?;
+
+                let (mut new_expr, new_input) = self.rewrite_expr(
+                    &[group_expr, aggr_expr],
+                    &[&group_arrays, &aggr_arrays],
+                    input,
+                    &mut expr_set,
+                    optimizer_config,
+                )?;
+                // note the reversed pop order.
+                let new_aggr_expr = pop_expr(&mut new_expr)?;
+                let new_group_expr = pop_expr(&mut new_expr)?;
+
+                Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+                    Arc::new(new_input),
+                    new_group_expr,
+                    new_aggr_expr,
+                    schema.clone(),
+                )?))
+            }
+            LogicalPlan::Sort(Sort { expr, input, fetch }) => {
+                let input_schema = Arc::clone(input.schema());
+                let arrays = to_arrays(expr, input_schema, &mut expr_set)?;
+
+                let (mut new_expr, new_input) = self.rewrite_expr(
+                    &[expr],
+                    &[&arrays],
+                    input,
+                    &mut expr_set,
+                    optimizer_config,
+                )?;
+
+                Ok(LogicalPlan::Sort(Sort {
+                    expr: pop_expr(&mut new_expr)?,
+                    input: Arc::new(new_input),
+                    fetch: *fetch,
+                }))
+            }
+            LogicalPlan::Join { .. }
+            | LogicalPlan::CrossJoin(_)
+            | LogicalPlan::Repartition(_)
+            | LogicalPlan::Union(_)
+            | LogicalPlan::TableScan { .. }
+            | LogicalPlan::Values(_)
+            | LogicalPlan::EmptyRelation(_)
+            | LogicalPlan::Subquery(_)
+            | LogicalPlan::SubqueryAlias(_)
+            | LogicalPlan::Limit(_)
+            | LogicalPlan::CreateExternalTable(_)
+            | LogicalPlan::Explain { .. }
+            | LogicalPlan::Analyze { .. }
+            | LogicalPlan::CreateMemoryTable(_)
+            | LogicalPlan::CreateView(_)
+            | LogicalPlan::CreateCatalogSchema(_)
+            | LogicalPlan::CreateCatalog(_)
+            | LogicalPlan::DropTable(_)
+            | LogicalPlan::DropView(_)
+            | LogicalPlan::SetVariable(_)
+            | LogicalPlan::Distinct(_)
+            | LogicalPlan::Extension { .. } => {
+                // apply the optimization to all inputs of the plan
+                utils::optimize_children(self, plan, optimizer_config)
+            }
+        }
     }
 
     fn name(&self) -> &str {
@@ -81,167 +267,6 @@ impl CommonSubexprEliminate {
     }
 }
 
-fn optimize(
-    plan: &LogicalPlan,
-    optimizer_config: &OptimizerConfig,
-) -> Result<LogicalPlan> {
-    let mut expr_set = ExprSet::new();
-
-    match plan {
-        LogicalPlan::Projection(Projection {
-            expr,
-            input,
-            schema,
-            alias,
-        }) => {
-            let input_schema = Arc::clone(input.schema());
-            let arrays = to_arrays(expr, input_schema, &mut expr_set)?;
-
-            let (mut new_expr, new_input) = rewrite_expr(
-                &[expr],
-                &[&arrays],
-                input,
-                &mut expr_set,
-                optimizer_config,
-            )?;
-
-            Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
-                pop_expr(&mut new_expr)?,
-                Arc::new(new_input),
-                schema.clone(),
-                alias.clone(),
-            )?))
-        }
-        LogicalPlan::Filter(filter) => {
-            let input = filter.input();
-            let predicate = filter.predicate();
-            let input_schema = Arc::clone(input.schema());
-            let mut id_array = vec![];
-            expr_to_identifier(predicate, &mut expr_set, &mut id_array, input_schema)?;
-
-            let (mut new_expr, new_input) = rewrite_expr(
-                &[&[predicate.clone()]],
-                &[&[id_array]],
-                filter.input(),
-                &mut expr_set,
-                optimizer_config,
-            )?;
-
-            if let Some(predicate) = pop_expr(&mut new_expr)?.pop() {
-                Ok(LogicalPlan::Filter(Filter::try_new(
-                    predicate,
-                    Arc::new(new_input),
-                )?))
-            } else {
-                Err(DataFusionError::Internal(
-                    "Failed to pop predicate expr".to_string(),
-                ))
-            }
-        }
-        LogicalPlan::Window(Window {
-            input,
-            window_expr,
-            schema,
-        }) => {
-            let input_schema = Arc::clone(input.schema());
-            let arrays = to_arrays(window_expr, input_schema, &mut expr_set)?;
-
-            let (mut new_expr, new_input) = rewrite_expr(
-                &[window_expr],
-                &[&arrays],
-                input,
-                &mut expr_set,
-                optimizer_config,
-            )?;
-
-            Ok(LogicalPlan::Window(Window {
-                input: Arc::new(new_input),
-                window_expr: pop_expr(&mut new_expr)?,
-                schema: schema.clone(),
-            }))
-        }
-        LogicalPlan::Aggregate(Aggregate {
-            group_expr,
-            aggr_expr,
-            input,
-            schema,
-        }) => {
-            let input_schema = Arc::clone(input.schema());
-            let group_arrays =
-                to_arrays(group_expr, Arc::clone(&input_schema), &mut expr_set)?;
-            let aggr_arrays = to_arrays(aggr_expr, input_schema, &mut expr_set)?;
-
-            let (mut new_expr, new_input) = rewrite_expr(
-                &[group_expr, aggr_expr],
-                &[&group_arrays, &aggr_arrays],
-                input,
-                &mut expr_set,
-                optimizer_config,
-            )?;
-            // note the reversed pop order.
-            let new_aggr_expr = pop_expr(&mut new_expr)?;
-            let new_group_expr = pop_expr(&mut new_expr)?;
-
-            Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
-                Arc::new(new_input),
-                new_group_expr,
-                new_aggr_expr,
-                schema.clone(),
-            )?))
-        }
-        LogicalPlan::Sort(Sort { expr, input, fetch }) => {
-            let input_schema = Arc::clone(input.schema());
-            let arrays = to_arrays(expr, input_schema, &mut expr_set)?;
-
-            let (mut new_expr, new_input) = rewrite_expr(
-                &[expr],
-                &[&arrays],
-                input,
-                &mut expr_set,
-                optimizer_config,
-            )?;
-
-            Ok(LogicalPlan::Sort(Sort {
-                expr: pop_expr(&mut new_expr)?,
-                input: Arc::new(new_input),
-                fetch: *fetch,
-            }))
-        }
-        LogicalPlan::Join { .. }
-        | LogicalPlan::CrossJoin(_)
-        | LogicalPlan::Repartition(_)
-        | LogicalPlan::Union(_)
-        | LogicalPlan::TableScan { .. }
-        | LogicalPlan::Values(_)
-        | LogicalPlan::EmptyRelation(_)
-        | LogicalPlan::Subquery(_)
-        | LogicalPlan::SubqueryAlias(_)
-        | LogicalPlan::Limit(_)
-        | LogicalPlan::CreateExternalTable(_)
-        | LogicalPlan::Explain { .. }
-        | LogicalPlan::Analyze { .. }
-        | LogicalPlan::CreateMemoryTable(_)
-        | LogicalPlan::CreateView(_)
-        | LogicalPlan::CreateCatalogSchema(_)
-        | LogicalPlan::CreateCatalog(_)
-        | LogicalPlan::DropTable(_)
-        | LogicalPlan::DropView(_)
-        | LogicalPlan::SetVariable(_)
-        | LogicalPlan::Distinct(_)
-        | LogicalPlan::Extension { .. } => {
-            // apply the optimization to all inputs of the plan
-            let expr = plan.expressions();
-            let inputs = plan.inputs();
-            let new_inputs = inputs
-                .iter()
-                .map(|input_plan| optimize(input_plan, optimizer_config))
-                .collect::<Result<Vec<_>>>()?;
-
-            from_plan(plan, &expr, &new_inputs)
-        }
-    }
-}
-
 fn pop_expr(new_expr: &mut Vec<Vec<Expr>>) -> Result<Vec<Expr>> {
     new_expr
         .pop()
@@ -285,7 +310,7 @@ fn build_project_plan(
             _ => {
                 return Err(DataFusionError::Internal(
                     "expr_set invalid state".to_string(),
-                ))
+                ));
             }
         }
     }
@@ -307,39 +332,6 @@ fn build_project_plan(
     )?))
 }
 
-#[inline]
-fn rewrite_expr(
-    exprs_list: &[&[Expr]],
-    arrays_list: &[&[Vec<(usize, String)>]],
-    input: &LogicalPlan,
-    expr_set: &mut ExprSet,
-    optimizer_config: &OptimizerConfig,
-) -> Result<(Vec<Vec<Expr>>, LogicalPlan)> {
-    let mut affected_id = BTreeSet::<Identifier>::new();
-
-    let rewrote_exprs = exprs_list
-        .iter()
-        .zip(arrays_list.iter())
-        .map(|(exprs, arrays)| {
-            exprs
-                .iter()
-                .cloned()
-                .zip(arrays.iter())
-                .map(|(expr, id_array)| {
-                    replace_common_expr(expr, id_array, expr_set, &mut affected_id)
-                })
-                .collect::<Result<Vec<_>>>()
-        })
-        .collect::<Result<Vec<_>>>()?;
-
-    let mut new_input = optimize(input, optimizer_config)?;
-    if !affected_id.is_empty() {
-        new_input = build_project_plan(new_input, affected_id, expr_set)?;
-    }
-
-    Ok((rewrote_exprs, new_input))
-}
-
 /// Go through an expression tree and generate identifier.
 ///
 /// An identifier contains information of the expression itself and its sub-expression.