You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/08/16 19:04:35 UTC

[arrow] branch master updated: ARROW-9618: [Rust] [DataFusion] Made it easier to write optimizers

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 58b38a6  ARROW-9618: [Rust] [DataFusion] Made it easier to write optimizers
58b38a6 is described below

commit 58b38a617dea31ed80746bc7ed4ed3670e2159cf
Author: Jorge C. Leitao <jo...@gmail.com>
AuthorDate: Sun Aug 16 13:04:14 2020 -0600

    ARROW-9618: [Rust] [DataFusion] Made it easier to write optimizers
    
    This PR adds 5 auxiliary functions to the optimizer package that allow us to more easily write optimizers and re-writes the optimizers using them.
    
    This PR assumes that the majority of the optimizations will:
    
    * recurse logical expressions and re-write them (currently we have the type coercion)
    * recurse logical plans and re-write them (currently we have the projection pushdown)
    
    To traverse and re-write expressions, this PR introduces two functions:
    
    ```
    /// Returns all expressions composing the expression.
    /// E.g. if the expression is "(a + 1) + 1", it returns ["a + 1", "1"] (as Expr objects)
    fn expr_expressions(expr: &Expr) -> Result<Vec<&Expr>>
    
    /// returns a new expression where the sub-expressions of `expr` are replaced by the ones in `expressions`.
    /// This is used in conjunction with ``expr_expressions`` to re-write expressions.
    pub fn from_expression(expr: &Expr, expressions: &Vec<Expr>) -> Result<Expr> {
    ```
    
    these are used on an optimizer as follows:
    
    ```
    let mut expressions = expr_expressions(expr)?;
    
    # recurse on the expression
    let mut expressions = expressions.iter().map(|e| recursion(e)).collect()
    
    # modify `expressions` (optimizer's code)
    
    utils::from_expression(expr, &expressions)
    ```
    
    Likewise, this PR introduces 3 functions for the plans:
    
    ```
    /// returns all expressions in the logical plan.
    pub fn expressions(plan: &LogicalPlan) -> Vec<Expr>
    
    /// returns all inputs in the logical plan
    pub fn inputs(plan: &LogicalPlan) -> Vec<&LogicalPlan>
    
    /// Returns a new logical plan based on the original one with inputs and expressions replaced
    pub fn from_plan(
        plan: &LogicalPlan,
        expr: &Vec<Expr>,
        inputs: &Vec<LogicalPlan>,
    ) -> Result<LogicalPlan>
    ```
    
    Overall, these make writing optimizers a much more developer-friendly experience, because expressions or plans that the optimizer does not care about can just be left alone.
    
    Closes #7879 from jorgecarleitao/simpler
    
    Authored-by: Jorge C. Leitao <jo...@gmail.com>
    Signed-off-by: Andy Grove <an...@gmail.com>
---
 rust/datafusion/src/logicalplan.rs                 |  10 +-
 .../src/optimizer/projection_push_down.rs          |  85 +++-------
 rust/datafusion/src/optimizer/type_coercion.rs     | 179 +++++++--------------
 rust/datafusion/src/optimizer/utils.rs             | 159 ++++++++++++++++++
 4 files changed, 244 insertions(+), 189 deletions(-)

diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs
index 5831eb9..ee3c17a 100644
--- a/rust/datafusion/src/logicalplan.rs
+++ b/rust/datafusion/src/logicalplan.rs
@@ -717,8 +717,6 @@ pub enum LogicalPlan {
         expr: Vec<Expr>,
         /// The incoming logical plan
         input: Box<LogicalPlan>,
-        /// The schema description of the otuput
-        schema: Box<Schema>,
     },
     /// Produces rows from a table that has been registered on a
     /// context
@@ -782,8 +780,6 @@ pub enum LogicalPlan {
         n: usize,
         /// The logical plan
         input: Box<LogicalPlan>,
-        /// The schema description of the output
-        schema: Box<Schema>,
     },
     /// Creates an external table.
     CreateExternalTable {
@@ -832,8 +828,8 @@ impl LogicalPlan {
             LogicalPlan::Projection { schema, .. } => &schema,
             LogicalPlan::Selection { input, .. } => input.schema(),
             LogicalPlan::Aggregate { schema, .. } => &schema,
-            LogicalPlan::Sort { schema, .. } => &schema,
-            LogicalPlan::Limit { schema, .. } => &schema,
+            LogicalPlan::Sort { input, .. } => input.schema(),
+            LogicalPlan::Limit { input, .. } => input.schema(),
             LogicalPlan::CreateExternalTable { schema, .. } => &schema,
             LogicalPlan::Explain { schema, .. } => &schema,
         }
@@ -1133,7 +1129,6 @@ impl LogicalPlanBuilder {
         Ok(Self::from(&LogicalPlan::Limit {
             n,
             input: Box::new(self.plan.clone()),
-            schema: self.plan.schema().clone(),
         }))
     }
 
@@ -1142,7 +1137,6 @@ impl LogicalPlanBuilder {
         Ok(Self::from(&LogicalPlan::Sort {
             expr,
             input: Box::new(self.plan.clone()),
-            schema: self.plan.schema().clone(),
         }))
     }
 
diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs
index 48f1960..5550427 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -28,7 +28,9 @@ use std::collections::HashSet;
 use utils::optimize_explain;
 
 /// Projection Push Down optimizer rule ensures that only referenced columns are
-/// loaded into memory
+/// loaded into memory.
+///
+/// This optimizer does not modify expressions in the plan; it only changes the read projections
 pub struct ProjectionPushDown {}
 
 impl OptimizerRule for ProjectionPushDown {
@@ -56,66 +58,6 @@ impl ProjectionPushDown {
         has_projection: bool,
     ) -> Result<LogicalPlan> {
         match plan {
-            LogicalPlan::Projection {
-                expr,
-                input,
-                schema,
-            } => {
-                // collect all columns referenced by projection expressions
-                utils::exprlist_to_column_names(&expr, accum)?;
-
-                Ok(LogicalPlan::Projection {
-                    expr: expr.clone(),
-                    input: Box::new(self.optimize_plan(&input, accum, true)?),
-                    schema: schema.clone(),
-                })
-            }
-            LogicalPlan::Selection { expr, input } => {
-                // collect all columns referenced by filter expression
-                utils::expr_to_column_names(expr, accum)?;
-
-                Ok(LogicalPlan::Selection {
-                    expr: expr.clone(),
-                    input: Box::new(self.optimize_plan(&input, accum, has_projection)?),
-                })
-            }
-            LogicalPlan::Aggregate {
-                input,
-                group_expr,
-                aggr_expr,
-                schema,
-            } => {
-                // collect all columns referenced by grouping and aggregate expressions
-                utils::exprlist_to_column_names(&group_expr, accum)?;
-                utils::exprlist_to_column_names(&aggr_expr, accum)?;
-
-                Ok(LogicalPlan::Aggregate {
-                    input: Box::new(self.optimize_plan(&input, accum, has_projection)?),
-                    group_expr: group_expr.clone(),
-                    aggr_expr: aggr_expr.clone(),
-                    schema: schema.clone(),
-                })
-            }
-            LogicalPlan::Sort {
-                expr,
-                input,
-                schema,
-            } => {
-                // collect all columns referenced by sort expressions
-                utils::exprlist_to_column_names(&expr, accum)?;
-
-                Ok(LogicalPlan::Sort {
-                    expr: expr.clone(),
-                    input: Box::new(self.optimize_plan(&input, accum, has_projection)?),
-                    schema: schema.clone(),
-                })
-            }
-            LogicalPlan::Limit { n, input, schema } => Ok(LogicalPlan::Limit {
-                n: n.clone(),
-                input: Box::new(self.optimize_plan(&input, accum, has_projection)?),
-                schema: schema.clone(),
-            }),
-            LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()),
             LogicalPlan::TableScan {
                 schema_name,
                 table_name,
@@ -190,13 +132,32 @@ impl ProjectionPushDown {
                     projected_schema: Box::new(projected_schema),
                 })
             }
-            LogicalPlan::CreateExternalTable { .. } => Ok(plan.clone()),
             LogicalPlan::Explain {
                 verbose,
                 plan,
                 stringified_plans,
                 schema,
             } => optimize_explain(self, *verbose, &*plan, stringified_plans, &*schema),
+            // in all other cases, we construct a new plan based on the optimized inputs and re-written expressions
+            _ => {
+                let has_projection = match plan {
+                    LogicalPlan::Projection { .. } => true,
+                    _ => false,
+                };
+
+                let expr = utils::expressions(plan);
+                // collect all columns referenced by projection expressions
+                utils::exprlist_to_column_names(&expr, accum)?;
+
+                // apply the optimization to all inputs of the plan
+                let inputs = utils::inputs(plan);
+                let new_inputs = inputs
+                    .iter()
+                    .map(|plan| self.optimize_plan(plan, accum, has_projection))
+                    .collect::<Result<Vec<_>>>()?;
+
+                utils::from_plan(plan, &expr, &new_inputs)
+            }
         }
     }
 }
diff --git a/rust/datafusion/src/optimizer/type_coercion.rs b/rust/datafusion/src/optimizer/type_coercion.rs
index a0926db..a485423 100644
--- a/rust/datafusion/src/optimizer/type_coercion.rs
+++ b/rust/datafusion/src/optimizer/type_coercion.rs
@@ -27,13 +27,15 @@ use arrow::datatypes::Schema;
 
 use crate::error::{ExecutionError, Result};
 use crate::execution::physical_plan::udf::ScalarFunction;
+use crate::logicalplan::Expr;
 use crate::logicalplan::LogicalPlan;
-use crate::logicalplan::{Expr, LogicalPlanBuilder};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
 use utils::optimize_explain;
 
-/// Implementation of type coercion optimizer rule
+/// Optimizer that applies coercion rules to expressions in the logical plan.
+///
+/// This optimizer does not alter the structure of the plan, it only changes expressions on it.
 pub struct TypeCoercionRule {
     scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>,
 }
@@ -47,46 +49,29 @@ impl TypeCoercionRule {
         Self { scalar_functions }
     }
 
-    /// Rewrite an expression list to include explicit CAST operations when required
-    fn rewrite_expr_list(&self, expr: &[Expr], schema: &Schema) -> Result<Vec<Expr>> {
-        Ok(expr
+    /// Rewrite an expression to include explicit CAST operations when required
+    fn rewrite_expr(&self, expr: &Expr, schema: &Schema) -> Result<Expr> {
+        let expressions = utils::expr_sub_expressions(expr)?;
+
+        // recurse of the re-write
+        let mut expressions = expressions
             .iter()
             .map(|e| self.rewrite_expr(e, schema))
-            .collect::<Result<Vec<_>>>()?)
-    }
+            .collect::<Result<Vec<_>>>()?;
 
-    /// Rewrite an expression to include explicit CAST operations when required
-    fn rewrite_expr(&self, expr: &Expr, schema: &Schema) -> Result<Expr> {
+        // modify `expressions` by introducing casts when necessary
         match expr {
-            Expr::BinaryExpr { left, op, right } => {
-                let left = self.rewrite_expr(left, schema)?;
-                let right = self.rewrite_expr(right, schema)?;
-                let left_type = left.get_type(schema)?;
-                let right_type = right.get_type(schema)?;
-                if left_type == right_type {
-                    Ok(Expr::BinaryExpr {
-                        left: Box::new(left),
-                        op: op.clone(),
-                        right: Box::new(right),
-                    })
-                } else {
+            Expr::BinaryExpr { .. } => {
+                let left_type = expressions[0].get_type(schema)?;
+                let right_type = expressions[1].get_type(schema)?;
+                if left_type != right_type {
                     let super_type = utils::get_supertype(&left_type, &right_type)?;
-                    Ok(Expr::BinaryExpr {
-                        left: Box::new(left.cast_to(&super_type, schema)?),
-                        op: op.clone(),
-                        right: Box::new(right.cast_to(&super_type, schema)?),
-                    })
+
+                    expressions[0] = expressions[0].cast_to(&super_type, schema)?;
+                    expressions[1] = expressions[1].cast_to(&super_type, schema)?;
                 }
             }
-            Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(self.rewrite_expr(e, schema)?))),
-            Expr::IsNotNull(e) => {
-                Ok(Expr::IsNotNull(Box::new(self.rewrite_expr(e, schema)?)))
-            }
-            Expr::ScalarFunction {
-                name,
-                args,
-                return_type,
-            } => {
+            Expr::ScalarFunction { name, .. } => {
                 // cast the inputs of scalar functions to the appropriate type where possible
                 match self
                     .scalar_functions
@@ -95,109 +80,65 @@ impl TypeCoercionRule {
                     .get(name)
                 {
                     Some(func_meta) => {
-                        let mut func_args = Vec::with_capacity(args.len());
-                        for i in 0..args.len() {
+                        for i in 0..expressions.len() {
                             let field = &func_meta.args[i];
-                            let expr = self.rewrite_expr(&args[i], schema)?;
-                            let actual_type = expr.get_type(schema)?;
+                            let actual_type = expressions[i].get_type(schema)?;
                             let required_type = field.data_type();
-                            if &actual_type == required_type {
-                                func_args.push(expr)
-                            } else {
+                            if &actual_type != required_type {
                                 let super_type =
                                     utils::get_supertype(&actual_type, required_type)?;
-                                func_args.push(expr.cast_to(&super_type, schema)?);
-                            }
+                                expressions[i] =
+                                    expressions[i].cast_to(&super_type, schema)?
+                            };
                         }
-
-                        Ok(Expr::ScalarFunction {
-                            name: name.clone(),
-                            args: func_args,
-                            return_type: return_type.clone(),
-                        })
                     }
-                    _ => Err(ExecutionError::General(format!(
-                        "Invalid scalar function {}",
-                        name
-                    ))),
+                    _ => {
+                        return Err(ExecutionError::General(format!(
+                            "Invalid scalar function {}",
+                            name
+                        )))
+                    }
                 }
             }
-            Expr::AggregateFunction {
-                name,
-                args,
-                return_type,
-            } => Ok(Expr::AggregateFunction {
-                name: name.clone(),
-                args: args
-                    .iter()
-                    .map(|a| self.rewrite_expr(a, schema))
-                    .collect::<Result<Vec<_>>>()?,
-                return_type: return_type.clone(),
-            }),
-            Expr::Cast { .. } => Ok(expr.clone()),
-            Expr::Column(_) => Ok(expr.clone()),
-            Expr::Alias(expr, alias) => Ok(Expr::Alias(
-                Box::new(self.rewrite_expr(expr, schema)?),
-                alias.to_owned(),
-            )),
-            Expr::Literal(_) => Ok(expr.clone()),
-            Expr::Not(_) => Ok(expr.clone()),
-            Expr::Sort { .. } => Ok(expr.clone()),
-            Expr::Wildcard { .. } => Err(ExecutionError::General(
-                "Wildcard expressions are not valid in a logical query plan".to_owned(),
-            )),
-            Expr::Nested(e) => self.rewrite_expr(e, schema),
-        }
+            _ => {}
+        };
+        utils::rewrite_expression(expr, &expressions)
     }
 }
 
 impl OptimizerRule for TypeCoercionRule {
     fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
         match plan {
-            LogicalPlan::Projection { expr, input, .. } => {
-                LogicalPlanBuilder::from(&self.optimize(input)?)
-                    .project(self.rewrite_expr_list(expr, input.schema())?)?
-                    .build()
-            }
-            LogicalPlan::Selection { expr, input, .. } => {
-                LogicalPlanBuilder::from(&self.optimize(input)?)
-                    .filter(self.rewrite_expr(expr, input.schema())?)?
-                    .build()
-            }
-            LogicalPlan::Aggregate {
-                input,
-                group_expr,
-                aggr_expr,
-                ..
-            } => LogicalPlanBuilder::from(&self.optimize(input)?)
-                .aggregate(
-                    self.rewrite_expr_list(group_expr, input.schema())?,
-                    self.rewrite_expr_list(aggr_expr, input.schema())?,
-                )?
-                .build(),
-            LogicalPlan::Limit { n, input, .. } => {
-                LogicalPlanBuilder::from(&self.optimize(input)?)
-                    .limit(*n)?
-                    .build()
-            }
-            LogicalPlan::Sort { input, expr, .. } => {
-                LogicalPlanBuilder::from(&self.optimize(input)?)
-                    .sort(self.rewrite_expr_list(expr, input.schema())?)?
-                    .build()
-            }
-            // the following rules do not have inputs and do not need to be re-written
-            LogicalPlan::TableScan { .. } => Ok(plan.clone()),
-            LogicalPlan::InMemoryScan { .. } => Ok(plan.clone()),
-            LogicalPlan::ParquetScan { .. } => Ok(plan.clone()),
-            LogicalPlan::CsvScan { .. } => Ok(plan.clone()),
-            LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()),
-            LogicalPlan::CreateExternalTable { .. } => Ok(plan.clone()),
             LogicalPlan::Explain {
                 verbose,
                 plan,
                 stringified_plans,
                 schema,
             } => optimize_explain(self, *verbose, &*plan, stringified_plans, &*schema),
+            _ => {
+                let inputs = utils::inputs(plan);
+                let expressions = utils::expressions(plan);
+
+                // apply the optimization to all inputs of the plan
+                let new_inputs = inputs
+                    .iter()
+                    .map(|plan| self.optimize(*plan))
+                    .collect::<Result<Vec<_>>>()?;
+                // re-write all expressions on this plan.
+                // This assumes a single input, [0]. It wont work for join, subqueries and union operations with more than one input.
+                // It is currently not an issue as we do not have any plan with more than one input.
+                assert!(
+                    expressions.len() == 0 || inputs.len() > 0,
+                    "Assume that all plan nodes with expressions have inputs"
+                );
+
+                let new_expressions = expressions
+                    .iter()
+                    .map(|expr| self.rewrite_expr(expr, inputs[0].schema()))
+                    .collect::<Result<Vec<_>>>()?;
+
+                utils::from_plan(plan, &new_expressions, &new_inputs)
+            }
         }
     }
 
@@ -211,7 +152,7 @@ mod tests {
     use super::*;
     use crate::execution::context::ExecutionContext;
     use crate::execution::physical_plan::csv::CsvReadOptions;
-    use crate::logicalplan::{aggregate_expr, col, lit, Operator};
+    use crate::logicalplan::{aggregate_expr, col, lit, LogicalPlanBuilder, Operator};
     use crate::test::arrow_testdata_path;
     use arrow::datatypes::{DataType, Field, Schema};
 
diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs
index c6227a1..9195e19 100644
--- a/rust/datafusion/src/optimizer/utils.rs
+++ b/rust/datafusion/src/optimizer/utils.rs
@@ -213,6 +213,165 @@ pub fn optimize_explain(
     })
 }
 
+/// returns all expressions (non-recursively) in the current logical plan node.
+pub fn expressions(plan: &LogicalPlan) -> Vec<Expr> {
+    match plan {
+        LogicalPlan::Projection { expr, .. } => expr.clone(),
+        LogicalPlan::Selection { expr, .. } => vec![expr.clone()],
+        LogicalPlan::Aggregate {
+            group_expr,
+            aggr_expr,
+            ..
+        } => {
+            let mut result = group_expr.clone();
+            result.extend(aggr_expr.clone());
+            result
+        }
+        LogicalPlan::Sort { expr, .. } => expr.clone(),
+        // plans without expressions
+        LogicalPlan::TableScan { .. }
+        | LogicalPlan::InMemoryScan { .. }
+        | LogicalPlan::ParquetScan { .. }
+        | LogicalPlan::CsvScan { .. }
+        | LogicalPlan::EmptyRelation { .. }
+        | LogicalPlan::Limit { .. }
+        | LogicalPlan::CreateExternalTable { .. }
+        | LogicalPlan::Explain { .. } => vec![],
+    }
+}
+
+/// returns all inputs in the logical plan
+pub fn inputs(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
+    match plan {
+        LogicalPlan::Projection { input, .. } => vec![input],
+        LogicalPlan::Selection { input, .. } => vec![input],
+        LogicalPlan::Aggregate { input, .. } => vec![input],
+        LogicalPlan::Sort { input, .. } => vec![input],
+        LogicalPlan::Limit { input, .. } => vec![input],
+        // plans without inputs
+        LogicalPlan::TableScan { .. }
+        | LogicalPlan::InMemoryScan { .. }
+        | LogicalPlan::ParquetScan { .. }
+        | LogicalPlan::CsvScan { .. }
+        | LogicalPlan::EmptyRelation { .. }
+        | LogicalPlan::CreateExternalTable { .. }
+        | LogicalPlan::Explain { .. } => vec![],
+    }
+}
+
+/// Returns a new logical plan based on the original one with inputs and expressions replaced
+pub fn from_plan(
+    plan: &LogicalPlan,
+    expr: &Vec<Expr>,
+    inputs: &Vec<LogicalPlan>,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Projection { schema, .. } => Ok(LogicalPlan::Projection {
+            expr: expr.clone(),
+            input: Box::new(inputs[0].clone()),
+            schema: schema.clone(),
+        }),
+        LogicalPlan::Selection { .. } => Ok(LogicalPlan::Selection {
+            expr: expr[0].clone(),
+            input: Box::new(inputs[0].clone()),
+        }),
+        LogicalPlan::Aggregate {
+            group_expr, schema, ..
+        } => Ok(LogicalPlan::Aggregate {
+            group_expr: expr[0..group_expr.len()].to_vec(),
+            aggr_expr: expr[group_expr.len()..].to_vec(),
+            input: Box::new(inputs[0].clone()),
+            schema: schema.clone(),
+        }),
+        LogicalPlan::Sort { .. } => Ok(LogicalPlan::Sort {
+            expr: expr.clone(),
+            input: Box::new(inputs[0].clone()),
+        }),
+        LogicalPlan::Limit { n, .. } => Ok(LogicalPlan::Limit {
+            n: *n,
+            input: Box::new(inputs[0].clone()),
+        }),
+        LogicalPlan::EmptyRelation { .. }
+        | LogicalPlan::TableScan { .. }
+        | LogicalPlan::InMemoryScan { .. }
+        | LogicalPlan::ParquetScan { .. }
+        | LogicalPlan::CsvScan { .. }
+        | LogicalPlan::CreateExternalTable { .. }
+        | LogicalPlan::Explain { .. } => Ok(plan.clone()),
+    }
+}
+
+/// Returns all direct children `Expression`s of `expr`.
+/// E.g. if the expression is "(a + 1) + 1", it returns ["a + 1", "1"] (as Expr objects)
+pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<&Expr>> {
+    match expr {
+        Expr::BinaryExpr { left, right, .. } => Ok(vec![left, right]),
+        Expr::IsNull(e) => Ok(vec![e]),
+        Expr::IsNotNull(e) => Ok(vec![e]),
+        Expr::ScalarFunction { args, .. } => Ok(args.iter().collect()),
+        Expr::AggregateFunction { args, .. } => Ok(args.iter().collect()),
+        Expr::Cast { expr, .. } => Ok(vec![expr]),
+        Expr::Column(_) => Ok(vec![]),
+        Expr::Alias(expr, ..) => Ok(vec![expr]),
+        Expr::Literal(_) => Ok(vec![]),
+        Expr::Not(expr) => Ok(vec![expr]),
+        Expr::Sort { expr, .. } => Ok(vec![expr]),
+        Expr::Wildcard { .. } => Err(ExecutionError::General(
+            "Wildcard expressions are not valid in a logical query plan".to_owned(),
+        )),
+        Expr::Nested(expr) => Ok(vec![expr]),
+    }
+}
+
+/// returns a new expression where the sub-expressions of `expr` are replaced by the ones in `expressions`.
+/// This is used in conjunction with ``expr_sub_expressions`` to re-write expressions.
+pub fn rewrite_expression(expr: &Expr, expressions: &Vec<Expr>) -> Result<Expr> {
+    match expr {
+        Expr::BinaryExpr { op, .. } => Ok(Expr::BinaryExpr {
+            left: Box::new(expressions[0].clone()),
+            op: op.clone(),
+            right: Box::new(expressions[1].clone()),
+        }),
+        Expr::IsNull(_) => Ok(Expr::IsNull(Box::new(expressions[0].clone()))),
+        Expr::IsNotNull(_) => Ok(Expr::IsNotNull(Box::new(expressions[0].clone()))),
+        Expr::ScalarFunction {
+            name, return_type, ..
+        } => Ok(Expr::ScalarFunction {
+            name: name.clone(),
+            return_type: return_type.clone(),
+            args: expressions.clone(),
+        }),
+        Expr::AggregateFunction {
+            name, return_type, ..
+        } => Ok(Expr::AggregateFunction {
+            name: name.clone(),
+            return_type: return_type.clone(),
+            args: expressions.clone(),
+        }),
+        Expr::Cast { data_type, .. } => Ok(Expr::Cast {
+            expr: Box::new(expressions[0].clone()),
+            data_type: data_type.clone(),
+        }),
+        Expr::Alias(_, alias) => {
+            Ok(Expr::Alias(Box::new(expressions[0].clone()), alias.clone()))
+        }
+        Expr::Not(_) => Ok(Expr::Not(Box::new(expressions[0].clone()))),
+        Expr::Column(_) => Ok(expr.clone()),
+        Expr::Literal(_) => Ok(expr.clone()),
+        Expr::Sort {
+            asc, nulls_first, ..
+        } => Ok(Expr::Sort {
+            expr: Box::new(expressions[0].clone()),
+            asc: asc.clone(),
+            nulls_first: nulls_first.clone(),
+        }),
+        Expr::Wildcard { .. } => Err(ExecutionError::General(
+            "Wildcard expressions are not valid in a logical query plan".to_owned(),
+        )),
+        Expr::Nested(_) => Ok(Expr::Nested(Box::new(expressions[0].clone()))),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;