You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/06 07:10:44 UTC

[arrow-datafusion] branch datafusion-expr-expr created (now e335245)

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a change to branch datafusion-expr-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git.


      at e335245  expr expr

This branch includes the following new commits:

     new e335245  expr expr

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[arrow-datafusion] 01/01: expr expr

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch datafusion-expr-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit e3352451372d4df03d2152831a3e9682e5022d93
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 15:10:33 2022 +0800

    expr expr
---
 datafusion-expr/Cargo.toml          |    1 +
 datafusion-expr/src/expr.rs         |  981 ++++++++++++++++++++++++++++++++++
 datafusion-expr/src/lib.rs          |    4 +
 datafusion-expr/src/operator.rs     |  168 ++++++
 datafusion/src/logical_plan/expr.rs | 1007 +----------------------------------
 5 files changed, 1156 insertions(+), 1005 deletions(-)

diff --git a/datafusion-expr/Cargo.toml b/datafusion-expr/Cargo.toml
index 3cac735..964c324 100644
--- a/datafusion-expr/Cargo.toml
+++ b/datafusion-expr/Cargo.toml
@@ -38,3 +38,4 @@ path = "src/lib.rs"
 [dependencies]
 datafusion-common = { path = "../datafusion-common" }
 arrow = { version = "8.0.0", features = ["prettyprint"] }
+ahash = { version = "0.7", default-features = false }
diff --git a/datafusion-expr/src/expr.rs b/datafusion-expr/src/expr.rs
new file mode 100644
index 0000000..ba3cb04
--- /dev/null
+++ b/datafusion-expr/src/expr.rs
@@ -0,0 +1,981 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt;
+use std::ops::Not;
+use std::sync::Arc;
+use std::hash::{BuildHasher, Hash, Hasher};
+use datafusion_common::Result;
+
+//! Expressions
+
+/// `Expr` is a central struct of DataFusion's query API, and
+/// represent logical expressions such as `A + 1`, or `CAST(c1 AS
+/// int)`.
+///
+/// An `Expr` can compute its [DataType](arrow::datatypes::DataType)
+/// and nullability, and has functions for building up complex
+/// expressions.
+///
+/// # Examples
+///
+/// ## Create an expression `c1` referring to column named "c1"
+/// ```
+/// # use datafusion::logical_plan::*;
+/// let expr = col("c1");
+/// assert_eq!(expr, Expr::Column(Column::from_name("c1")));
+/// ```
+///
+/// ## Create the expression `c1 + c2` to add columns "c1" and "c2" together
+/// ```
+/// # use datafusion::logical_plan::*;
+/// let expr = col("c1") + col("c2");
+///
+/// assert!(matches!(expr, Expr::BinaryExpr { ..} ));
+/// if let Expr::BinaryExpr { left, right, op } = expr {
+///   assert_eq!(*left, col("c1"));
+///   assert_eq!(*right, col("c2"));
+///   assert_eq!(op, Operator::Plus);
+/// }
+/// ```
+///
+/// ## Create expression `c1 = 42` to compare the value in coumn "c1" to the literal value `42`
+/// ```
+/// # use datafusion::logical_plan::*;
+/// # use datafusion::scalar::*;
+/// let expr = col("c1").eq(lit(42));
+///
+/// assert!(matches!(expr, Expr::BinaryExpr { ..} ));
+/// if let Expr::BinaryExpr { left, right, op } = expr {
+///   assert_eq!(*left, col("c1"));
+///   let scalar = ScalarValue::Int32(Some(42));
+///   assert_eq!(*right, Expr::Literal(scalar));
+///   assert_eq!(op, Operator::Eq);
+/// }
+/// ```
+#[derive(Clone, PartialEq, Hash)]
+pub enum Expr {
+  /// An expression with a specific name.
+  Alias(Box<Expr>, String),
+  /// A named reference to a qualified filed in a schema.
+  Column(Column),
+  /// A named reference to a variable in a registry.
+  ScalarVariable(Vec<String>),
+  /// A constant value.
+  Literal(ScalarValue),
+  /// A binary expression such as "age > 21"
+  BinaryExpr {
+    /// Left-hand side of the expression
+    left: Box<Expr>,
+    /// The comparison operator
+    op: Operator,
+    /// Right-hand side of the expression
+    right: Box<Expr>,
+  },
+  /// Negation of an expression. The expression's type must be a boolean to make sense.
+  Not(Box<Expr>),
+  /// Whether an expression is not Null. This expression is never null.
+  IsNotNull(Box<Expr>),
+  /// Whether an expression is Null. This expression is never null.
+  IsNull(Box<Expr>),
+  /// arithmetic negation of an expression, the operand must be of a signed numeric data type
+  Negative(Box<Expr>),
+  /// Returns the field of a [`ListArray`] or [`StructArray`] by key
+  GetIndexedField {
+    /// the expression to take the field from
+    expr: Box<Expr>,
+    /// The name of the field to take
+    key: ScalarValue,
+  },
+  /// Whether an expression is between a given range.
+  Between {
+    /// The value to compare
+    expr: Box<Expr>,
+    /// Whether the expression is negated
+    negated: bool,
+    /// The low end of the range
+    low: Box<Expr>,
+    /// The high end of the range
+    high: Box<Expr>,
+  },
+  /// The CASE expression is similar to a series of nested if/else and there are two forms that
+  /// can be used. The first form consists of a series of boolean "when" expressions with
+  /// corresponding "then" expressions, and an optional "else" expression.
+  ///
+  /// CASE WHEN condition THEN result
+  ///      [WHEN ...]
+  ///      [ELSE result]
+  /// END
+  ///
+  /// The second form uses a base expression and then a series of "when" clauses that match on a
+  /// literal value.
+  ///
+  /// CASE expression
+  ///     WHEN value THEN result
+  ///     [WHEN ...]
+  ///     [ELSE result]
+  /// END
+  Case {
+    /// Optional base expression that can be compared to literal values in the "when" expressions
+    expr: Option<Box<Expr>>,
+    /// One or more when/then expressions
+    when_then_expr: Vec<(Box<Expr>, Box<Expr>)>,
+    /// Optional "else" expression
+    else_expr: Option<Box<Expr>>,
+  },
+  /// Casts the expression to a given type and will return a runtime error if the expression cannot be cast.
+  /// This expression is guaranteed to have a fixed type.
+  Cast {
+    /// The expression being cast
+    expr: Box<Expr>,
+    /// The `DataType` the expression will yield
+    data_type: DataType,
+  },
+  /// Casts the expression to a given type and will return a null value if the expression cannot be cast.
+  /// This expression is guaranteed to have a fixed type.
+  TryCast {
+    /// The expression being cast
+    expr: Box<Expr>,
+    /// The `DataType` the expression will yield
+    data_type: DataType,
+  },
+  /// A sort expression, that can be used to sort values.
+  Sort {
+    /// The expression to sort on
+    expr: Box<Expr>,
+    /// The direction of the sort
+    asc: bool,
+    /// Whether to put Nulls before all other data values
+    nulls_first: bool,
+  },
+  /// Represents the call of a built-in scalar function with a set of arguments.
+  ScalarFunction {
+    /// The function
+    fun: functions::BuiltinScalarFunction,
+    /// List of expressions to feed to the functions as arguments
+    args: Vec<Expr>,
+  },
+  /// Represents the call of a user-defined scalar function with arguments.
+  ScalarUDF {
+    /// The function
+    fun: Arc<ScalarUDF>,
+    /// List of expressions to feed to the functions as arguments
+    args: Vec<Expr>,
+  },
+  /// Represents the call of an aggregate built-in function with arguments.
+  AggregateFunction {
+    /// Name of the function
+    fun: aggregates::AggregateFunction,
+    /// List of expressions to feed to the functions as arguments
+    args: Vec<Expr>,
+    /// Whether this is a DISTINCT aggregation or not
+    distinct: bool,
+  },
+  /// Represents the call of a window function with arguments.
+  WindowFunction {
+    /// Name of the function
+    fun: window_functions::WindowFunction,
+    /// List of expressions to feed to the functions as arguments
+    args: Vec<Expr>,
+    /// List of partition by expressions
+    partition_by: Vec<Expr>,
+    /// List of order by expressions
+    order_by: Vec<Expr>,
+    /// Window frame
+    window_frame: Option<window_frames::WindowFrame>,
+  },
+  /// aggregate function
+  AggregateUDF {
+    /// The function
+    fun: Arc<AggregateUDF>,
+    /// List of expressions to feed to the functions as arguments
+    args: Vec<Expr>,
+  },
+  /// Returns whether the list contains the expr value.
+  InList {
+    /// The expression to compare
+    expr: Box<Expr>,
+    /// A list of values to compare against
+    list: Vec<Expr>,
+    /// Whether the expression is negated
+    negated: bool,
+  },
+  /// Represents a reference to all fields in a schema.
+  Wildcard,
+}
+
+/// Fixed seed for the hashing so that Ords are consistent across runs
+const SEED: ahash::RandomState = ahash::RandomState::with_seeds(0, 0, 0, 0);
+
+impl PartialOrd for Expr {
+  fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+    let mut hasher = SEED.build_hasher();
+    self.hash(&mut hasher);
+    let s = hasher.finish();
+
+    let mut hasher = SEED.build_hasher();
+    other.hash(&mut hasher);
+    let o = hasher.finish();
+
+    Some(s.cmp(&o))
+  }
+}
+
+impl Expr {
+  /// Returns the [arrow::datatypes::DataType] of the expression
+  /// based on [ExprSchema]
+  ///
+  /// Note: [DFSchema] implements [ExprSchema].
+  ///
+  /// # Errors
+  ///
+  /// This function errors when it is not possible to compute its
+  /// [arrow::datatypes::DataType].  This happens when e.g. the
+  /// expression refers to a column that does not exist in the
+  /// schema, or when the expression is incorrectly typed
+  /// (e.g. `[utf8] + [bool]`).
+  pub fn get_type<S: ExprSchema>(&self, schema: &S) -> Result<DataType> {
+    match self {
+      Expr::Alias(expr, _) | Expr::Sort { expr, .. } | Expr::Negative(expr) => {
+        expr.get_type(schema)
+      }
+      Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
+      Expr::ScalarVariable(_) => Ok(DataType::Utf8),
+      Expr::Literal(l) => Ok(l.get_datatype()),
+      Expr::Case { when_then_expr, .. } => when_then_expr[0].1.get_type(schema),
+      Expr::Cast { data_type, .. } | Expr::TryCast { data_type, .. } => {
+        Ok(data_type.clone())
+      }
+      Expr::ScalarUDF { fun, args } => {
+        let data_types = args
+          .iter()
+          .map(|e| e.get_type(schema))
+          .collect::<Result<Vec<_>>>()?;
+        Ok((fun.return_type)(&data_types)?.as_ref().clone())
+      }
+      Expr::ScalarFunction { fun, args } => {
+        let data_types = args
+          .iter()
+          .map(|e| e.get_type(schema))
+          .collect::<Result<Vec<_>>>()?;
+        functions::return_type(fun, &data_types)
+      }
+      Expr::WindowFunction { fun, args, .. } => {
+        let data_types = args
+          .iter()
+          .map(|e| e.get_type(schema))
+          .collect::<Result<Vec<_>>>()?;
+        window_functions::return_type(fun, &data_types)
+      }
+      Expr::AggregateFunction { fun, args, .. } => {
+        let data_types = args
+          .iter()
+          .map(|e| e.get_type(schema))
+          .collect::<Result<Vec<_>>>()?;
+        aggregates::return_type(fun, &data_types)
+      }
+      Expr::AggregateUDF { fun, args, .. } => {
+        let data_types = args
+          .iter()
+          .map(|e| e.get_type(schema))
+          .collect::<Result<Vec<_>>>()?;
+        Ok((fun.return_type)(&data_types)?.as_ref().clone())
+      }
+      Expr::Not(_)
+      | Expr::IsNull(_)
+      | Expr::Between { .. }
+      | Expr::InList { .. }
+      | Expr::IsNotNull(_) => Ok(DataType::Boolean),
+      Expr::BinaryExpr {
+        ref left,
+        ref right,
+        ref op,
+      } => {
+        binary_operator_data_type(&left.get_type(schema)?, op, &right.get_type(schema)?)
+      }
+      Expr::Wildcard => Err(DataFusionError::Internal(
+        "Wildcard expressions are not valid in a logical query plan".to_owned(),
+      )),
+      Expr::GetIndexedField { ref expr, key } => {
+        let data_type = expr.get_type(schema)?;
+
+        get_indexed_field(&data_type, key).map(|x| x.data_type().clone())
+      }
+    }
+  }
+
+  /// Returns the nullability of the expression based on [ExprSchema].
+  ///
+  /// Note: [DFSchema] implements [ExprSchema].
+  ///
+  /// # Errors
+  ///
+  /// This function errors when it is not possible to compute its
+  /// nullability.  This happens when the expression refers to a
+  /// column that does not exist in the schema.
+  pub fn nullable<S: ExprSchema>(&self, input_schema: &S) -> Result<bool> {
+    match self {
+      Expr::Alias(expr, _)
+      | Expr::Not(expr)
+      | Expr::Negative(expr)
+      | Expr::Sort { expr, .. }
+      | Expr::Between { expr, .. }
+      | Expr::InList { expr, .. } => expr.nullable(input_schema),
+      Expr::Column(c) => input_schema.nullable(c),
+      Expr::Literal(value) => Ok(value.is_null()),
+      Expr::Case {
+        when_then_expr,
+        else_expr,
+        ..
+      } => {
+        // this expression is nullable if any of the input expressions are nullable
+        let then_nullable = when_then_expr
+          .iter()
+          .map(|(_, t)| t.nullable(input_schema))
+          .collect::<Result<Vec<_>>>()?;
+        if then_nullable.contains(&true) {
+          Ok(true)
+        } else if let Some(e) = else_expr {
+          e.nullable(input_schema)
+        } else {
+          Ok(false)
+        }
+      }
+      Expr::Cast { expr, .. } => expr.nullable(input_schema),
+      Expr::ScalarVariable(_)
+      | Expr::TryCast { .. }
+      | Expr::ScalarFunction { .. }
+      | Expr::ScalarUDF { .. }
+      | Expr::WindowFunction { .. }
+      | Expr::AggregateFunction { .. }
+      | Expr::AggregateUDF { .. } => Ok(true),
+      Expr::IsNull(_) | Expr::IsNotNull(_) => Ok(false),
+      Expr::BinaryExpr {
+        ref left,
+        ref right,
+        ..
+      } => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
+      Expr::Wildcard => Err(DataFusionError::Internal(
+        "Wildcard expressions are not valid in a logical query plan".to_owned(),
+      )),
+      Expr::GetIndexedField { ref expr, key } => {
+        let data_type = expr.get_type(input_schema)?;
+        get_indexed_field(&data_type, key).map(|x| x.is_nullable())
+      }
+    }
+  }
+
+  /// Returns the name of this expression based on [crate::logical_plan::DFSchema].
+  ///
+  /// This represents how a column with this expression is named when no alias is chosen
+  pub fn name(&self, input_schema: &DFSchema) -> Result<String> {
+    create_name(self, input_schema)
+  }
+
+  /// Returns a [arrow::datatypes::Field] compatible with this expression.
+  pub fn to_field(&self, input_schema: &DFSchema) -> Result<DFField> {
+    match self {
+      Expr::Column(c) => Ok(DFField::new(
+        c.relation.as_deref(),
+        &c.name,
+        self.get_type(input_schema)?,
+        self.nullable(input_schema)?,
+      )),
+      _ => Ok(DFField::new(
+        None,
+        &self.name(input_schema)?,
+        self.get_type(input_schema)?,
+        self.nullable(input_schema)?,
+      )),
+    }
+  }
+
+  /// Wraps this expression in a cast to a target [arrow::datatypes::DataType].
+  ///
+  /// # Errors
+  ///
+  /// This function errors when it is impossible to cast the
+  /// expression to the target [arrow::datatypes::DataType].
+  pub fn cast_to<S: ExprSchema>(
+    self,
+    cast_to_type: &DataType,
+    schema: &S,
+  ) -> Result<Expr> {
+    // TODO(kszucs): most of the operations do not validate the type correctness
+    // like all of the binary expressions below. Perhaps Expr should track the
+    // type of the expression?
+    let this_type = self.get_type(schema)?;
+    if this_type == *cast_to_type {
+      Ok(self)
+    } else if can_cast_types(&this_type, cast_to_type) {
+      Ok(Expr::Cast {
+        expr: Box::new(self),
+        data_type: cast_to_type.clone(),
+      })
+    } else {
+      Err(DataFusionError::Plan(format!(
+        "Cannot automatically convert {:?} to {:?}",
+        this_type, cast_to_type
+      )))
+    }
+  }
+
+  /// Return `self == other`
+  pub fn eq(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::Eq, other)
+  }
+
+  /// Return `self != other`
+  pub fn not_eq(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::NotEq, other)
+  }
+
+  /// Return `self > other`
+  pub fn gt(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::Gt, other)
+  }
+
+  /// Return `self >= other`
+  pub fn gt_eq(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::GtEq, other)
+  }
+
+  /// Return `self < other`
+  pub fn lt(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::Lt, other)
+  }
+
+  /// Return `self <= other`
+  pub fn lt_eq(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::LtEq, other)
+  }
+
+  /// Return `self && other`
+  pub fn and(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::And, other)
+  }
+
+  /// Return `self || other`
+  pub fn or(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::Or, other)
+  }
+
+  /// Return `!self`
+  #[allow(clippy::should_implement_trait)]
+  pub fn not(self) -> Expr {
+    !self
+  }
+
+  /// Calculate the modulus of two expressions.
+  /// Return `self % other`
+  pub fn modulus(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::Modulo, other)
+  }
+
+  /// Return `self LIKE other`
+  pub fn like(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::Like, other)
+  }
+
+  /// Return `self NOT LIKE other`
+  pub fn not_like(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::NotLike, other)
+  }
+
+  /// Return `self AS name` alias expression
+  pub fn alias(self, name: &str) -> Expr {
+    Expr::Alias(Box::new(self), name.to_owned())
+  }
+
+  /// Return `self IN <list>` if `negated` is false, otherwise
+  /// return `self NOT IN <list>`.a
+  pub fn in_list(self, list: Vec<Expr>, negated: bool) -> Expr {
+    Expr::InList {
+      expr: Box::new(self),
+      list,
+      negated,
+    }
+  }
+
+  /// Return `IsNull(Box(self))
+  #[allow(clippy::wrong_self_convention)]
+  pub fn is_null(self) -> Expr {
+    Expr::IsNull(Box::new(self))
+  }
+
+  /// Return `IsNotNull(Box(self))
+  #[allow(clippy::wrong_self_convention)]
+  pub fn is_not_null(self) -> Expr {
+    Expr::IsNotNull(Box::new(self))
+  }
+
+  /// Create a sort expression from an existing expression.
+  ///
+  /// ```
+  /// # use datafusion::logical_plan::col;
+  /// let sort_expr = col("foo").sort(true, true); // SORT ASC NULLS_FIRST
+  /// ```
+  pub fn sort(self, asc: bool, nulls_first: bool) -> Expr {
+    Expr::Sort {
+      expr: Box::new(self),
+      asc,
+      nulls_first,
+    }
+  }
+
+  /// Performs a depth first walk of an expression and
+  /// its children, calling [`ExpressionVisitor::pre_visit`] and
+  /// `visitor.post_visit`.
+  ///
+  /// Implements the [visitor pattern](https://en.wikipedia.org/wiki/Visitor_pattern) to
+  /// separate expression algorithms from the structure of the
+  /// `Expr` tree and make it easier to add new types of expressions
+  /// and algorithms that walk the tree.
+  ///
+  /// For an expression tree such as
+  /// ```text
+  /// BinaryExpr (GT)
+  ///    left: Column("foo")
+  ///    right: Column("bar")
+  /// ```
+  ///
+  /// The nodes are visited using the following order
+  /// ```text
+  /// pre_visit(BinaryExpr(GT))
+  /// pre_visit(Column("foo"))
+  /// pre_visit(Column("bar"))
+  /// post_visit(Column("bar"))
+  /// post_visit(Column("bar"))
+  /// post_visit(BinaryExpr(GT))
+  /// ```
+  ///
+  /// If an Err result is returned, recursion is stopped immediately
+  ///
+  /// If `Recursion::Stop` is returned on a call to pre_visit, no
+  /// children of that expression are visited, nor is post_visit
+  /// called on that expression
+  ///
+  pub fn accept<V: ExpressionVisitor>(&self, visitor: V) -> Result<V> {
+    let visitor = match visitor.pre_visit(self)? {
+      Recursion::Continue(visitor) => visitor,
+      // If the recursion should stop, do not visit children
+      Recursion::Stop(visitor) => return Ok(visitor),
+    };
+
+    // recurse (and cover all expression types)
+    let visitor = match self {
+      Expr::Alias(expr, _)
+      | Expr::Not(expr)
+      | Expr::IsNotNull(expr)
+      | Expr::IsNull(expr)
+      | Expr::Negative(expr)
+      | Expr::Cast { expr, .. }
+      | Expr::TryCast { expr, .. }
+      | Expr::Sort { expr, .. }
+      | Expr::GetIndexedField { expr, .. } => expr.accept(visitor),
+      Expr::Column(_) | Expr::ScalarVariable(_) | Expr::Literal(_) | Expr::Wildcard => {
+        Ok(visitor)
+      }
+      Expr::BinaryExpr { left, right, .. } => {
+        let visitor = left.accept(visitor)?;
+        right.accept(visitor)
+      }
+      Expr::Between {
+        expr, low, high, ..
+      } => {
+        let visitor = expr.accept(visitor)?;
+        let visitor = low.accept(visitor)?;
+        high.accept(visitor)
+      }
+      Expr::Case {
+        expr,
+        when_then_expr,
+        else_expr,
+      } => {
+        let visitor = if let Some(expr) = expr.as_ref() {
+          expr.accept(visitor)
+        } else {
+          Ok(visitor)
+        }?;
+        let visitor =
+          when_then_expr
+            .iter()
+            .try_fold(visitor, |visitor, (when, then)| {
+              let visitor = when.accept(visitor)?;
+              then.accept(visitor)
+            })?;
+        if let Some(else_expr) = else_expr.as_ref() {
+          else_expr.accept(visitor)
+        } else {
+          Ok(visitor)
+        }
+      }
+      Expr::ScalarFunction { args, .. }
+      | Expr::ScalarUDF { args, .. }
+      | Expr::AggregateFunction { args, .. }
+      | Expr::AggregateUDF { args, .. } => args
+        .iter()
+        .try_fold(visitor, |visitor, arg| arg.accept(visitor)),
+      Expr::WindowFunction {
+        args,
+        partition_by,
+        order_by,
+        ..
+      } => {
+        let visitor = args
+          .iter()
+          .try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
+        let visitor = partition_by
+          .iter()
+          .try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
+        let visitor = order_by
+          .iter()
+          .try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
+        Ok(visitor)
+      }
+      Expr::InList { expr, list, .. } => {
+        let visitor = expr.accept(visitor)?;
+        list
+          .iter()
+          .try_fold(visitor, |visitor, arg| arg.accept(visitor))
+      }
+    }?;
+
+    visitor.post_visit(self)
+  }
+
+  /// Performs a depth first walk of an expression and its children
+  /// to rewrite an expression, consuming `self` producing a new
+  /// [`Expr`].
+  ///
+  /// Implements a modified version of the [visitor
+  /// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) to
+  /// separate algorithms from the structure of the `Expr` tree and
+  /// make it easier to write new, efficient expression
+  /// transformation algorithms.
+  ///
+  /// For an expression tree such as
+  /// ```text
+  /// BinaryExpr (GT)
+  ///    left: Column("foo")
+  ///    right: Column("bar")
+  /// ```
+  ///
+  /// The nodes are visited using the following order
+  /// ```text
+  /// pre_visit(BinaryExpr(GT))
+  /// pre_visit(Column("foo"))
+  /// mutatate(Column("foo"))
+  /// pre_visit(Column("bar"))
+  /// mutate(Column("bar"))
+  /// mutate(BinaryExpr(GT))
+  /// ```
+  ///
+  /// If an Err result is returned, recursion is stopped immediately
+  ///
+  /// If [`false`] is returned on a call to pre_visit, no
+  /// children of that expression are visited, nor is mutate
+  /// called on that expression
+  ///
+  pub fn rewrite<R>(self, rewriter: &mut R) -> Result<Self>
+  where
+    R: ExprRewriter,
+  {
+    let need_mutate = match rewriter.pre_visit(&self)? {
+      RewriteRecursion::Mutate => return rewriter.mutate(self),
+      RewriteRecursion::Stop => return Ok(self),
+      RewriteRecursion::Continue => true,
+      RewriteRecursion::Skip => false,
+    };
+
+    // recurse into all sub expressions(and cover all expression types)
+    let expr = match self {
+      Expr::Alias(expr, name) => Expr::Alias(rewrite_boxed(expr, rewriter)?, name),
+      Expr::Column(_) => self.clone(),
+      Expr::ScalarVariable(names) => Expr::ScalarVariable(names),
+      Expr::Literal(value) => Expr::Literal(value),
+      Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr {
+        left: rewrite_boxed(left, rewriter)?,
+        op,
+        right: rewrite_boxed(right, rewriter)?,
+      },
+      Expr::Not(expr) => Expr::Not(rewrite_boxed(expr, rewriter)?),
+      Expr::IsNotNull(expr) => Expr::IsNotNull(rewrite_boxed(expr, rewriter)?),
+      Expr::IsNull(expr) => Expr::IsNull(rewrite_boxed(expr, rewriter)?),
+      Expr::Negative(expr) => Expr::Negative(rewrite_boxed(expr, rewriter)?),
+      Expr::Between {
+        expr,
+        low,
+        high,
+        negated,
+      } => Expr::Between {
+        expr: rewrite_boxed(expr, rewriter)?,
+        low: rewrite_boxed(low, rewriter)?,
+        high: rewrite_boxed(high, rewriter)?,
+        negated,
+      },
+      Expr::Case {
+        expr,
+        when_then_expr,
+        else_expr,
+      } => {
+        let expr = rewrite_option_box(expr, rewriter)?;
+        let when_then_expr = when_then_expr
+          .into_iter()
+          .map(|(when, then)| {
+            Ok((
+              rewrite_boxed(when, rewriter)?,
+              rewrite_boxed(then, rewriter)?,
+            ))
+          })
+          .collect::<Result<Vec<_>>>()?;
+
+        let else_expr = rewrite_option_box(else_expr, rewriter)?;
+
+        Expr::Case {
+          expr,
+          when_then_expr,
+          else_expr,
+        }
+      }
+      Expr::Cast { expr, data_type } => Expr::Cast {
+        expr: rewrite_boxed(expr, rewriter)?,
+        data_type,
+      },
+      Expr::TryCast { expr, data_type } => Expr::TryCast {
+        expr: rewrite_boxed(expr, rewriter)?,
+        data_type,
+      },
+      Expr::Sort {
+        expr,
+        asc,
+        nulls_first,
+      } => Expr::Sort {
+        expr: rewrite_boxed(expr, rewriter)?,
+        asc,
+        nulls_first,
+      },
+      Expr::ScalarFunction { args, fun } => Expr::ScalarFunction {
+        args: rewrite_vec(args, rewriter)?,
+        fun,
+      },
+      Expr::ScalarUDF { args, fun } => Expr::ScalarUDF {
+        args: rewrite_vec(args, rewriter)?,
+        fun,
+      },
+      Expr::WindowFunction {
+        args,
+        fun,
+        partition_by,
+        order_by,
+        window_frame,
+      } => Expr::WindowFunction {
+        args: rewrite_vec(args, rewriter)?,
+        fun,
+        partition_by: rewrite_vec(partition_by, rewriter)?,
+        order_by: rewrite_vec(order_by, rewriter)?,
+        window_frame,
+      },
+      Expr::AggregateFunction {
+        args,
+        fun,
+        distinct,
+      } => Expr::AggregateFunction {
+        args: rewrite_vec(args, rewriter)?,
+        fun,
+        distinct,
+      },
+      Expr::AggregateUDF { args, fun } => Expr::AggregateUDF {
+        args: rewrite_vec(args, rewriter)?,
+        fun,
+      },
+      Expr::InList {
+        expr,
+        list,
+        negated,
+      } => Expr::InList {
+        expr: rewrite_boxed(expr, rewriter)?,
+        list: rewrite_vec(list, rewriter)?,
+        negated,
+      },
+      Expr::Wildcard => Expr::Wildcard,
+      Expr::GetIndexedField { expr, key } => Expr::GetIndexedField {
+        expr: rewrite_boxed(expr, rewriter)?,
+        key,
+      },
+    };
+
+    // now rewrite this expression itself
+    if need_mutate {
+      rewriter.mutate(expr)
+    } else {
+      Ok(expr)
+    }
+  }
+
+}
+
+impl Not for Expr {
+  type Output = Self;
+
+  fn not(self) -> Self::Output {
+    Expr::Not(Box::new(self))
+  }
+}
+
+impl std::fmt::Display for Expr {
+  fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+    match self {
+      Expr::BinaryExpr {
+        ref left,
+        ref right,
+        ref op,
+      } => write!(f, "{} {} {}", left, op, right),
+      Expr::AggregateFunction {
+        /// Name of the function
+        ref fun,
+        /// List of expressions to feed to the functions as arguments
+        ref args,
+        /// Whether this is a DISTINCT aggregation or not
+        ref distinct,
+      } => fmt_function(f, &fun.to_string(), *distinct, args, true),
+      Expr::ScalarFunction {
+        /// Name of the function
+        ref fun,
+        /// List of expressions to feed to the functions as arguments
+        ref args,
+      } => fmt_function(f, &fun.to_string(), false, args, true),
+      _ => write!(f, "{:?}", self),
+    }
+  }
+}
+
+
+/// Controls how the visitor recursion should proceed.
+pub enum Recursion<V: ExpressionVisitor> {
+  /// Attempt to visit all the children, recursively, of this expression.
+  Continue(V),
+  /// Do not visit the children of this expression, though the walk
+  /// of parents of this expression will not be affected
+  Stop(V),
+}
+
+/// Encode the traversal of an expression tree. When passed to
+/// `Expr::accept`, `ExpressionVisitor::visit` is invoked
+/// recursively on all nodes of an expression tree. See the comments
+/// on `Expr::accept` for details on its use
+pub trait ExpressionVisitor: Sized {
+  /// Invoked before any children of `expr` are visisted.
+  fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>>;
+
+  /// Invoked after all children of `expr` are visited. Default
+  /// implementation does nothing.
+  fn post_visit(self, _expr: &Expr) -> Result<Self> {
+      Ok(self)
+  }
+}
+
+
+/// Controls how the [ExprRewriter] recursion should proceed.
+pub enum RewriteRecursion {
+  /// Continue rewrite / visit this expression.
+  Continue,
+  /// Call [mutate()] immediately and return.
+  Mutate,
+  /// Do not rewrite / visit the children of this expression.
+  Stop,
+  /// Keep recursive but skip mutate on this expression
+  Skip,
+}
+
+/// Trait for potentially recursively rewriting an [`Expr`] expression
+/// tree. When passed to `Expr::rewrite`, `ExpressionVisitor::mutate` is
+/// invoked recursively on all nodes of an expression tree. See the
+/// comments on `Expr::rewrite` for details on its use
+pub trait ExprRewriter: Sized {
+  /// Invoked before any children of `expr` are rewritten /
+  /// visited. Default implementation returns `Ok(RewriteRecursion::Continue)`
+  fn pre_visit(&mut self, _expr: &Expr) -> Result<RewriteRecursion> {
+      Ok(RewriteRecursion::Continue)
+  }
+
+  /// Invoked after all children of `expr` have been mutated and
+  /// returns a potentially modified expr.
+  fn mutate(&mut self, expr: Expr) -> Result<Expr>;
+}
+
+fn fmt_function(
+  f: &mut fmt::Formatter,
+  fun: &str,
+  distinct: bool,
+  args: &[Expr],
+  display: bool,
+) -> fmt::Result {
+  let args: Vec<String> = match display {
+      true => args.iter().map(|arg| format!("{}", arg)).collect(),
+      false => args.iter().map(|arg| format!("{:?}", arg)).collect(),
+  };
+
+  // let args: Vec<String> = args.iter().map(|arg| format!("{:?}", arg)).collect();
+  let distinct_str = match distinct {
+      true => "DISTINCT ",
+      false => "",
+  };
+  write!(f, "{}({}{})", fun, distinct_str, args.join(", "))
+}
+
+#[allow(clippy::boxed_local)]
+fn rewrite_boxed<R>(boxed_expr: Box<Expr>, rewriter: &mut R) -> Result<Box<Expr>>
+where
+    R: ExprRewriter,
+{
+    // TODO: It might be possible to avoid an allocation (the
+    // Box::new) below by reusing the box.
+    let expr: Expr = *boxed_expr;
+    let rewritten_expr = expr.rewrite(rewriter)?;
+    Ok(Box::new(rewritten_expr))
+}
+
+fn rewrite_option_box<R>(
+  option_box: Option<Box<Expr>>,
+  rewriter: &mut R,
+) -> Result<Option<Box<Expr>>>
+where
+  R: ExprRewriter,
+{
+  option_box
+      .map(|expr| rewrite_boxed(expr, rewriter))
+      .transpose()
+}
+
+
+/// rewrite a `Vec` of `Expr`s with the rewriter
+fn rewrite_vec<R>(v: Vec<Expr>, rewriter: &mut R) -> Result<Vec<Expr>>
+where
+    R: ExprRewriter,
+{
+    v.into_iter().map(|expr| expr.rewrite(rewriter)).collect()
+}
+
+/// return a new expression l <op> r
+pub fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr {
+  Expr::BinaryExpr {
+      left: Box::new(l),
+      op,
+      right: Box::new(r),
+  }
+}
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index b6eaaf7..3561482 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -16,7 +16,11 @@
 // under the License.
 
 mod aggregate_function;
+mod expr;
+mod operator;
 mod window_function;
 
 pub use aggregate_function::AggregateFunction;
+pub use expr::Expr;
+pub use operator::Operator;
 pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion-expr/src/operator.rs b/datafusion-expr/src/operator.rs
new file mode 100644
index 0000000..37e23e7
--- /dev/null
+++ b/datafusion-expr/src/operator.rs
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{fmt, ops};
+
+use super::{binary_expr, Expr};
+
+/// Operators applied to expressions
+#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)]
+pub enum Operator {
+  /// Expressions are equal
+  Eq,
+  /// Expressions are not equal
+  NotEq,
+  /// Left side is smaller than right side
+  Lt,
+  /// Left side is smaller or equal to right side
+  LtEq,
+  /// Left side is greater than right side
+  Gt,
+  /// Left side is greater or equal to right side
+  GtEq,
+  /// Addition
+  Plus,
+  /// Subtraction
+  Minus,
+  /// Multiplication operator, like `*`
+  Multiply,
+  /// Division operator, like `/`
+  Divide,
+  /// Remainder operator, like `%`
+  Modulo,
+  /// Logical AND, like `&&`
+  And,
+  /// Logical OR, like `||`
+  Or,
+  /// Matches a wildcard pattern
+  Like,
+  /// Does not match a wildcard pattern
+  NotLike,
+  /// IS DISTINCT FROM
+  IsDistinctFrom,
+  /// IS NOT DISTINCT FROM
+  IsNotDistinctFrom,
+  /// Case sensitive regex match
+  RegexMatch,
+  /// Case insensitive regex match
+  RegexIMatch,
+  /// Case sensitive regex not match
+  RegexNotMatch,
+  /// Case insensitive regex not match
+  RegexNotIMatch,
+  /// Bitwise and, like `&`
+  BitwiseAnd,
+}
+
+impl fmt::Display for Operator {
+  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+    let display = match &self {
+      Operator::Eq => "=",
+      Operator::NotEq => "!=",
+      Operator::Lt => "<",
+      Operator::LtEq => "<=",
+      Operator::Gt => ">",
+      Operator::GtEq => ">=",
+      Operator::Plus => "+",
+      Operator::Minus => "-",
+      Operator::Multiply => "*",
+      Operator::Divide => "/",
+      Operator::Modulo => "%",
+      Operator::And => "AND",
+      Operator::Or => "OR",
+      Operator::Like => "LIKE",
+      Operator::NotLike => "NOT LIKE",
+      Operator::RegexMatch => "~",
+      Operator::RegexIMatch => "~*",
+      Operator::RegexNotMatch => "!~",
+      Operator::RegexNotIMatch => "!~*",
+      Operator::IsDistinctFrom => "IS DISTINCT FROM",
+      Operator::IsNotDistinctFrom => "IS NOT DISTINCT FROM",
+      Operator::BitwiseAnd => "&",
+    };
+    write!(f, "{}", display)
+  }
+}
+
+impl ops::Add for Expr {
+  type Output = Self;
+
+  fn add(self, rhs: Self) -> Self {
+    binary_expr(self, Operator::Plus, rhs)
+  }
+}
+
+impl ops::Sub for Expr {
+  type Output = Self;
+
+  fn sub(self, rhs: Self) -> Self {
+    binary_expr(self, Operator::Minus, rhs)
+  }
+}
+
+impl ops::Mul for Expr {
+  type Output = Self;
+
+  fn mul(self, rhs: Self) -> Self {
+    binary_expr(self, Operator::Multiply, rhs)
+  }
+}
+
+impl ops::Div for Expr {
+  type Output = Self;
+
+  fn div(self, rhs: Self) -> Self {
+    binary_expr(self, Operator::Divide, rhs)
+  }
+}
+
+impl ops::Rem for Expr {
+  type Output = Self;
+
+  fn rem(self, rhs: Self) -> Self {
+    binary_expr(self, Operator::Modulo, rhs)
+  }
+}
+
+#[cfg(test)]
+mod tests {
+  use crate::prelude::lit;
+
+  #[test]
+  fn test_operators() {
+    assert_eq!(
+      format!("{:?}", lit(1u32) + lit(2u32)),
+      "UInt32(1) + UInt32(2)"
+    );
+    assert_eq!(
+      format!("{:?}", lit(1u32) - lit(2u32)),
+      "UInt32(1) - UInt32(2)"
+    );
+    assert_eq!(
+      format!("{:?}", lit(1u32) * lit(2u32)),
+      "UInt32(1) * UInt32(2)"
+    );
+    assert_eq!(
+      format!("{:?}", lit(1u32) / lit(2u32)),
+      "UInt32(1) / UInt32(2)"
+    );
+    assert_eq!(
+      format!("{:?}", lit(1u32) % lit(2u32)),
+      "UInt32(1) % UInt32(2)"
+    );
+  }
+}
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index fba7d81..bcb5b1c 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -42,983 +42,7 @@ use std::hash::{BuildHasher, Hash, Hasher};
 use std::ops::Not;
 use std::sync::Arc;
 
-/// `Expr` is a central struct of DataFusion's query API, and
-/// represent logical expressions such as `A + 1`, or `CAST(c1 AS
-/// int)`.
-///
-/// An `Expr` can compute its [DataType](arrow::datatypes::DataType)
-/// and nullability, and has functions for building up complex
-/// expressions.
-///
-/// # Examples
-///
-/// ## Create an expression `c1` referring to column named "c1"
-/// ```
-/// # use datafusion::logical_plan::*;
-/// let expr = col("c1");
-/// assert_eq!(expr, Expr::Column(Column::from_name("c1")));
-/// ```
-///
-/// ## Create the expression `c1 + c2` to add columns "c1" and "c2" together
-/// ```
-/// # use datafusion::logical_plan::*;
-/// let expr = col("c1") + col("c2");
-///
-/// assert!(matches!(expr, Expr::BinaryExpr { ..} ));
-/// if let Expr::BinaryExpr { left, right, op } = expr {
-///   assert_eq!(*left, col("c1"));
-///   assert_eq!(*right, col("c2"));
-///   assert_eq!(op, Operator::Plus);
-/// }
-/// ```
-///
-/// ## Create expression `c1 = 42` to compare the value in coumn "c1" to the literal value `42`
-/// ```
-/// # use datafusion::logical_plan::*;
-/// # use datafusion::scalar::*;
-/// let expr = col("c1").eq(lit(42));
-///
-/// assert!(matches!(expr, Expr::BinaryExpr { ..} ));
-/// if let Expr::BinaryExpr { left, right, op } = expr {
-///   assert_eq!(*left, col("c1"));
-///   let scalar = ScalarValue::Int32(Some(42));
-///   assert_eq!(*right, Expr::Literal(scalar));
-///   assert_eq!(op, Operator::Eq);
-/// }
-/// ```
-#[derive(Clone, PartialEq, Hash)]
-pub enum Expr {
-    /// An expression with a specific name.
-    Alias(Box<Expr>, String),
-    /// A named reference to a qualified filed in a schema.
-    Column(Column),
-    /// A named reference to a variable in a registry.
-    ScalarVariable(Vec<String>),
-    /// A constant value.
-    Literal(ScalarValue),
-    /// A binary expression such as "age > 21"
-    BinaryExpr {
-        /// Left-hand side of the expression
-        left: Box<Expr>,
-        /// The comparison operator
-        op: Operator,
-        /// Right-hand side of the expression
-        right: Box<Expr>,
-    },
-    /// Negation of an expression. The expression's type must be a boolean to make sense.
-    Not(Box<Expr>),
-    /// Whether an expression is not Null. This expression is never null.
-    IsNotNull(Box<Expr>),
-    /// Whether an expression is Null. This expression is never null.
-    IsNull(Box<Expr>),
-    /// arithmetic negation of an expression, the operand must be of a signed numeric data type
-    Negative(Box<Expr>),
-    /// Returns the field of a [`ListArray`] or [`StructArray`] by key
-    GetIndexedField {
-        /// the expression to take the field from
-        expr: Box<Expr>,
-        /// The name of the field to take
-        key: ScalarValue,
-    },
-    /// Whether an expression is between a given range.
-    Between {
-        /// The value to compare
-        expr: Box<Expr>,
-        /// Whether the expression is negated
-        negated: bool,
-        /// The low end of the range
-        low: Box<Expr>,
-        /// The high end of the range
-        high: Box<Expr>,
-    },
-    /// The CASE expression is similar to a series of nested if/else and there are two forms that
-    /// can be used. The first form consists of a series of boolean "when" expressions with
-    /// corresponding "then" expressions, and an optional "else" expression.
-    ///
-    /// CASE WHEN condition THEN result
-    ///      [WHEN ...]
-    ///      [ELSE result]
-    /// END
-    ///
-    /// The second form uses a base expression and then a series of "when" clauses that match on a
-    /// literal value.
-    ///
-    /// CASE expression
-    ///     WHEN value THEN result
-    ///     [WHEN ...]
-    ///     [ELSE result]
-    /// END
-    Case {
-        /// Optional base expression that can be compared to literal values in the "when" expressions
-        expr: Option<Box<Expr>>,
-        /// One or more when/then expressions
-        when_then_expr: Vec<(Box<Expr>, Box<Expr>)>,
-        /// Optional "else" expression
-        else_expr: Option<Box<Expr>>,
-    },
-    /// Casts the expression to a given type and will return a runtime error if the expression cannot be cast.
-    /// This expression is guaranteed to have a fixed type.
-    Cast {
-        /// The expression being cast
-        expr: Box<Expr>,
-        /// The `DataType` the expression will yield
-        data_type: DataType,
-    },
-    /// Casts the expression to a given type and will return a null value if the expression cannot be cast.
-    /// This expression is guaranteed to have a fixed type.
-    TryCast {
-        /// The expression being cast
-        expr: Box<Expr>,
-        /// The `DataType` the expression will yield
-        data_type: DataType,
-    },
-    /// A sort expression, that can be used to sort values.
-    Sort {
-        /// The expression to sort on
-        expr: Box<Expr>,
-        /// The direction of the sort
-        asc: bool,
-        /// Whether to put Nulls before all other data values
-        nulls_first: bool,
-    },
-    /// Represents the call of a built-in scalar function with a set of arguments.
-    ScalarFunction {
-        /// The function
-        fun: functions::BuiltinScalarFunction,
-        /// List of expressions to feed to the functions as arguments
-        args: Vec<Expr>,
-    },
-    /// Represents the call of a user-defined scalar function with arguments.
-    ScalarUDF {
-        /// The function
-        fun: Arc<ScalarUDF>,
-        /// List of expressions to feed to the functions as arguments
-        args: Vec<Expr>,
-    },
-    /// Represents the call of an aggregate built-in function with arguments.
-    AggregateFunction {
-        /// Name of the function
-        fun: aggregates::AggregateFunction,
-        /// List of expressions to feed to the functions as arguments
-        args: Vec<Expr>,
-        /// Whether this is a DISTINCT aggregation or not
-        distinct: bool,
-    },
-    /// Represents the call of a window function with arguments.
-    WindowFunction {
-        /// Name of the function
-        fun: window_functions::WindowFunction,
-        /// List of expressions to feed to the functions as arguments
-        args: Vec<Expr>,
-        /// List of partition by expressions
-        partition_by: Vec<Expr>,
-        /// List of order by expressions
-        order_by: Vec<Expr>,
-        /// Window frame
-        window_frame: Option<window_frames::WindowFrame>,
-    },
-    /// aggregate function
-    AggregateUDF {
-        /// The function
-        fun: Arc<AggregateUDF>,
-        /// List of expressions to feed to the functions as arguments
-        args: Vec<Expr>,
-    },
-    /// Returns whether the list contains the expr value.
-    InList {
-        /// The expression to compare
-        expr: Box<Expr>,
-        /// A list of values to compare against
-        list: Vec<Expr>,
-        /// Whether the expression is negated
-        negated: bool,
-    },
-    /// Represents a reference to all fields in a schema.
-    Wildcard,
-}
-
-/// Fixed seed for the hashing so that Ords are consistent across runs
-const SEED: ahash::RandomState = ahash::RandomState::with_seeds(0, 0, 0, 0);
-
-impl PartialOrd for Expr {
-    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        let mut hasher = SEED.build_hasher();
-        self.hash(&mut hasher);
-        let s = hasher.finish();
-
-        let mut hasher = SEED.build_hasher();
-        other.hash(&mut hasher);
-        let o = hasher.finish();
-
-        Some(s.cmp(&o))
-    }
-}
-
-impl Expr {
-    /// Returns the [arrow::datatypes::DataType] of the expression
-    /// based on [ExprSchema]
-    ///
-    /// Note: [DFSchema] implements [ExprSchema].
-    ///
-    /// # Errors
-    ///
-    /// This function errors when it is not possible to compute its
-    /// [arrow::datatypes::DataType].  This happens when e.g. the
-    /// expression refers to a column that does not exist in the
-    /// schema, or when the expression is incorrectly typed
-    /// (e.g. `[utf8] + [bool]`).
-    pub fn get_type<S: ExprSchema>(&self, schema: &S) -> Result<DataType> {
-        match self {
-            Expr::Alias(expr, _) | Expr::Sort { expr, .. } | Expr::Negative(expr) => {
-                expr.get_type(schema)
-            }
-            Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
-            Expr::ScalarVariable(_) => Ok(DataType::Utf8),
-            Expr::Literal(l) => Ok(l.get_datatype()),
-            Expr::Case { when_then_expr, .. } => when_then_expr[0].1.get_type(schema),
-            Expr::Cast { data_type, .. } | Expr::TryCast { data_type, .. } => {
-                Ok(data_type.clone())
-            }
-            Expr::ScalarUDF { fun, args } => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                Ok((fun.return_type)(&data_types)?.as_ref().clone())
-            }
-            Expr::ScalarFunction { fun, args } => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                functions::return_type(fun, &data_types)
-            }
-            Expr::WindowFunction { fun, args, .. } => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                window_functions::return_type(fun, &data_types)
-            }
-            Expr::AggregateFunction { fun, args, .. } => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                aggregates::return_type(fun, &data_types)
-            }
-            Expr::AggregateUDF { fun, args, .. } => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                Ok((fun.return_type)(&data_types)?.as_ref().clone())
-            }
-            Expr::Not(_)
-            | Expr::IsNull(_)
-            | Expr::Between { .. }
-            | Expr::InList { .. }
-            | Expr::IsNotNull(_) => Ok(DataType::Boolean),
-            Expr::BinaryExpr {
-                ref left,
-                ref right,
-                ref op,
-            } => binary_operator_data_type(
-                &left.get_type(schema)?,
-                op,
-                &right.get_type(schema)?,
-            ),
-            Expr::Wildcard => Err(DataFusionError::Internal(
-                "Wildcard expressions are not valid in a logical query plan".to_owned(),
-            )),
-            Expr::GetIndexedField { ref expr, key } => {
-                let data_type = expr.get_type(schema)?;
-
-                get_indexed_field(&data_type, key).map(|x| x.data_type().clone())
-            }
-        }
-    }
-
-    /// Returns the nullability of the expression based on [ExprSchema].
-    ///
-    /// Note: [DFSchema] implements [ExprSchema].
-    ///
-    /// # Errors
-    ///
-    /// This function errors when it is not possible to compute its
-    /// nullability.  This happens when the expression refers to a
-    /// column that does not exist in the schema.
-    pub fn nullable<S: ExprSchema>(&self, input_schema: &S) -> Result<bool> {
-        match self {
-            Expr::Alias(expr, _)
-            | Expr::Not(expr)
-            | Expr::Negative(expr)
-            | Expr::Sort { expr, .. }
-            | Expr::Between { expr, .. }
-            | Expr::InList { expr, .. } => expr.nullable(input_schema),
-            Expr::Column(c) => input_schema.nullable(c),
-            Expr::Literal(value) => Ok(value.is_null()),
-            Expr::Case {
-                when_then_expr,
-                else_expr,
-                ..
-            } => {
-                // this expression is nullable if any of the input expressions are nullable
-                let then_nullable = when_then_expr
-                    .iter()
-                    .map(|(_, t)| t.nullable(input_schema))
-                    .collect::<Result<Vec<_>>>()?;
-                if then_nullable.contains(&true) {
-                    Ok(true)
-                } else if let Some(e) = else_expr {
-                    e.nullable(input_schema)
-                } else {
-                    Ok(false)
-                }
-            }
-            Expr::Cast { expr, .. } => expr.nullable(input_schema),
-            Expr::ScalarVariable(_)
-            | Expr::TryCast { .. }
-            | Expr::ScalarFunction { .. }
-            | Expr::ScalarUDF { .. }
-            | Expr::WindowFunction { .. }
-            | Expr::AggregateFunction { .. }
-            | Expr::AggregateUDF { .. } => Ok(true),
-            Expr::IsNull(_) | Expr::IsNotNull(_) => Ok(false),
-            Expr::BinaryExpr {
-                ref left,
-                ref right,
-                ..
-            } => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
-            Expr::Wildcard => Err(DataFusionError::Internal(
-                "Wildcard expressions are not valid in a logical query plan".to_owned(),
-            )),
-            Expr::GetIndexedField { ref expr, key } => {
-                let data_type = expr.get_type(input_schema)?;
-                get_indexed_field(&data_type, key).map(|x| x.is_nullable())
-            }
-        }
-    }
-
-    /// Returns the name of this expression based on [crate::logical_plan::DFSchema].
-    ///
-    /// This represents how a column with this expression is named when no alias is chosen
-    pub fn name(&self, input_schema: &DFSchema) -> Result<String> {
-        create_name(self, input_schema)
-    }
-
-    /// Returns a [arrow::datatypes::Field] compatible with this expression.
-    pub fn to_field(&self, input_schema: &DFSchema) -> Result<DFField> {
-        match self {
-            Expr::Column(c) => Ok(DFField::new(
-                c.relation.as_deref(),
-                &c.name,
-                self.get_type(input_schema)?,
-                self.nullable(input_schema)?,
-            )),
-            _ => Ok(DFField::new(
-                None,
-                &self.name(input_schema)?,
-                self.get_type(input_schema)?,
-                self.nullable(input_schema)?,
-            )),
-        }
-    }
-
-    /// Wraps this expression in a cast to a target [arrow::datatypes::DataType].
-    ///
-    /// # Errors
-    ///
-    /// This function errors when it is impossible to cast the
-    /// expression to the target [arrow::datatypes::DataType].
-    pub fn cast_to<S: ExprSchema>(
-        self,
-        cast_to_type: &DataType,
-        schema: &S,
-    ) -> Result<Expr> {
-        // TODO(kszucs): most of the operations do not validate the type correctness
-        // like all of the binary expressions below. Perhaps Expr should track the
-        // type of the expression?
-        let this_type = self.get_type(schema)?;
-        if this_type == *cast_to_type {
-            Ok(self)
-        } else if can_cast_types(&this_type, cast_to_type) {
-            Ok(Expr::Cast {
-                expr: Box::new(self),
-                data_type: cast_to_type.clone(),
-            })
-        } else {
-            Err(DataFusionError::Plan(format!(
-                "Cannot automatically convert {:?} to {:?}",
-                this_type, cast_to_type
-            )))
-        }
-    }
-
-    /// Return `self == other`
-    pub fn eq(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::Eq, other)
-    }
-
-    /// Return `self != other`
-    pub fn not_eq(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::NotEq, other)
-    }
-
-    /// Return `self > other`
-    pub fn gt(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::Gt, other)
-    }
-
-    /// Return `self >= other`
-    pub fn gt_eq(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::GtEq, other)
-    }
-
-    /// Return `self < other`
-    pub fn lt(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::Lt, other)
-    }
-
-    /// Return `self <= other`
-    pub fn lt_eq(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::LtEq, other)
-    }
-
-    /// Return `self && other`
-    pub fn and(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::And, other)
-    }
-
-    /// Return `self || other`
-    pub fn or(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::Or, other)
-    }
-
-    /// Return `!self`
-    #[allow(clippy::should_implement_trait)]
-    pub fn not(self) -> Expr {
-        !self
-    }
-
-    /// Calculate the modulus of two expressions.
-    /// Return `self % other`
-    pub fn modulus(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::Modulo, other)
-    }
-
-    /// Return `self LIKE other`
-    pub fn like(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::Like, other)
-    }
-
-    /// Return `self NOT LIKE other`
-    pub fn not_like(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::NotLike, other)
-    }
-
-    /// Return `self AS name` alias expression
-    pub fn alias(self, name: &str) -> Expr {
-        Expr::Alias(Box::new(self), name.to_owned())
-    }
-
-    /// Return `self IN <list>` if `negated` is false, otherwise
-    /// return `self NOT IN <list>`.a
-    pub fn in_list(self, list: Vec<Expr>, negated: bool) -> Expr {
-        Expr::InList {
-            expr: Box::new(self),
-            list,
-            negated,
-        }
-    }
-
-    /// Return `IsNull(Box(self))
-    #[allow(clippy::wrong_self_convention)]
-    pub fn is_null(self) -> Expr {
-        Expr::IsNull(Box::new(self))
-    }
-
-    /// Return `IsNotNull(Box(self))
-    #[allow(clippy::wrong_self_convention)]
-    pub fn is_not_null(self) -> Expr {
-        Expr::IsNotNull(Box::new(self))
-    }
-
-    /// Create a sort expression from an existing expression.
-    ///
-    /// ```
-    /// # use datafusion::logical_plan::col;
-    /// let sort_expr = col("foo").sort(true, true); // SORT ASC NULLS_FIRST
-    /// ```
-    pub fn sort(self, asc: bool, nulls_first: bool) -> Expr {
-        Expr::Sort {
-            expr: Box::new(self),
-            asc,
-            nulls_first,
-        }
-    }
-
-    /// Performs a depth first walk of an expression and
-    /// its children, calling [`ExpressionVisitor::pre_visit`] and
-    /// `visitor.post_visit`.
-    ///
-    /// Implements the [visitor pattern](https://en.wikipedia.org/wiki/Visitor_pattern) to
-    /// separate expression algorithms from the structure of the
-    /// `Expr` tree and make it easier to add new types of expressions
-    /// and algorithms that walk the tree.
-    ///
-    /// For an expression tree such as
-    /// ```text
-    /// BinaryExpr (GT)
-    ///    left: Column("foo")
-    ///    right: Column("bar")
-    /// ```
-    ///
-    /// The nodes are visited using the following order
-    /// ```text
-    /// pre_visit(BinaryExpr(GT))
-    /// pre_visit(Column("foo"))
-    /// pre_visit(Column("bar"))
-    /// post_visit(Column("bar"))
-    /// post_visit(Column("bar"))
-    /// post_visit(BinaryExpr(GT))
-    /// ```
-    ///
-    /// If an Err result is returned, recursion is stopped immediately
-    ///
-    /// If `Recursion::Stop` is returned on a call to pre_visit, no
-    /// children of that expression are visited, nor is post_visit
-    /// called on that expression
-    ///
-    pub fn accept<V: ExpressionVisitor>(&self, visitor: V) -> Result<V> {
-        let visitor = match visitor.pre_visit(self)? {
-            Recursion::Continue(visitor) => visitor,
-            // If the recursion should stop, do not visit children
-            Recursion::Stop(visitor) => return Ok(visitor),
-        };
-
-        // recurse (and cover all expression types)
-        let visitor = match self {
-            Expr::Alias(expr, _)
-            | Expr::Not(expr)
-            | Expr::IsNotNull(expr)
-            | Expr::IsNull(expr)
-            | Expr::Negative(expr)
-            | Expr::Cast { expr, .. }
-            | Expr::TryCast { expr, .. }
-            | Expr::Sort { expr, .. }
-            | Expr::GetIndexedField { expr, .. } => expr.accept(visitor),
-            Expr::Column(_)
-            | Expr::ScalarVariable(_)
-            | Expr::Literal(_)
-            | Expr::Wildcard => Ok(visitor),
-            Expr::BinaryExpr { left, right, .. } => {
-                let visitor = left.accept(visitor)?;
-                right.accept(visitor)
-            }
-            Expr::Between {
-                expr, low, high, ..
-            } => {
-                let visitor = expr.accept(visitor)?;
-                let visitor = low.accept(visitor)?;
-                high.accept(visitor)
-            }
-            Expr::Case {
-                expr,
-                when_then_expr,
-                else_expr,
-            } => {
-                let visitor = if let Some(expr) = expr.as_ref() {
-                    expr.accept(visitor)
-                } else {
-                    Ok(visitor)
-                }?;
-                let visitor = when_then_expr.iter().try_fold(
-                    visitor,
-                    |visitor, (when, then)| {
-                        let visitor = when.accept(visitor)?;
-                        then.accept(visitor)
-                    },
-                )?;
-                if let Some(else_expr) = else_expr.as_ref() {
-                    else_expr.accept(visitor)
-                } else {
-                    Ok(visitor)
-                }
-            }
-            Expr::ScalarFunction { args, .. }
-            | Expr::ScalarUDF { args, .. }
-            | Expr::AggregateFunction { args, .. }
-            | Expr::AggregateUDF { args, .. } => args
-                .iter()
-                .try_fold(visitor, |visitor, arg| arg.accept(visitor)),
-            Expr::WindowFunction {
-                args,
-                partition_by,
-                order_by,
-                ..
-            } => {
-                let visitor = args
-                    .iter()
-                    .try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
-                let visitor = partition_by
-                    .iter()
-                    .try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
-                let visitor = order_by
-                    .iter()
-                    .try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
-                Ok(visitor)
-            }
-            Expr::InList { expr, list, .. } => {
-                let visitor = expr.accept(visitor)?;
-                list.iter()
-                    .try_fold(visitor, |visitor, arg| arg.accept(visitor))
-            }
-        }?;
-
-        visitor.post_visit(self)
-    }
-
-    /// Performs a depth first walk of an expression and its children
-    /// to rewrite an expression, consuming `self` producing a new
-    /// [`Expr`].
-    ///
-    /// Implements a modified version of the [visitor
-    /// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) to
-    /// separate algorithms from the structure of the `Expr` tree and
-    /// make it easier to write new, efficient expression
-    /// transformation algorithms.
-    ///
-    /// For an expression tree such as
-    /// ```text
-    /// BinaryExpr (GT)
-    ///    left: Column("foo")
-    ///    right: Column("bar")
-    /// ```
-    ///
-    /// The nodes are visited using the following order
-    /// ```text
-    /// pre_visit(BinaryExpr(GT))
-    /// pre_visit(Column("foo"))
-    /// mutatate(Column("foo"))
-    /// pre_visit(Column("bar"))
-    /// mutate(Column("bar"))
-    /// mutate(BinaryExpr(GT))
-    /// ```
-    ///
-    /// If an Err result is returned, recursion is stopped immediately
-    ///
-    /// If [`false`] is returned on a call to pre_visit, no
-    /// children of that expression are visited, nor is mutate
-    /// called on that expression
-    ///
-    pub fn rewrite<R>(self, rewriter: &mut R) -> Result<Self>
-    where
-        R: ExprRewriter,
-    {
-        let need_mutate = match rewriter.pre_visit(&self)? {
-            RewriteRecursion::Mutate => return rewriter.mutate(self),
-            RewriteRecursion::Stop => return Ok(self),
-            RewriteRecursion::Continue => true,
-            RewriteRecursion::Skip => false,
-        };
-
-        // recurse into all sub expressions(and cover all expression types)
-        let expr = match self {
-            Expr::Alias(expr, name) => Expr::Alias(rewrite_boxed(expr, rewriter)?, name),
-            Expr::Column(_) => self.clone(),
-            Expr::ScalarVariable(names) => Expr::ScalarVariable(names),
-            Expr::Literal(value) => Expr::Literal(value),
-            Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr {
-                left: rewrite_boxed(left, rewriter)?,
-                op,
-                right: rewrite_boxed(right, rewriter)?,
-            },
-            Expr::Not(expr) => Expr::Not(rewrite_boxed(expr, rewriter)?),
-            Expr::IsNotNull(expr) => Expr::IsNotNull(rewrite_boxed(expr, rewriter)?),
-            Expr::IsNull(expr) => Expr::IsNull(rewrite_boxed(expr, rewriter)?),
-            Expr::Negative(expr) => Expr::Negative(rewrite_boxed(expr, rewriter)?),
-            Expr::Between {
-                expr,
-                low,
-                high,
-                negated,
-            } => Expr::Between {
-                expr: rewrite_boxed(expr, rewriter)?,
-                low: rewrite_boxed(low, rewriter)?,
-                high: rewrite_boxed(high, rewriter)?,
-                negated,
-            },
-            Expr::Case {
-                expr,
-                when_then_expr,
-                else_expr,
-            } => {
-                let expr = rewrite_option_box(expr, rewriter)?;
-                let when_then_expr = when_then_expr
-                    .into_iter()
-                    .map(|(when, then)| {
-                        Ok((
-                            rewrite_boxed(when, rewriter)?,
-                            rewrite_boxed(then, rewriter)?,
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                let else_expr = rewrite_option_box(else_expr, rewriter)?;
-
-                Expr::Case {
-                    expr,
-                    when_then_expr,
-                    else_expr,
-                }
-            }
-            Expr::Cast { expr, data_type } => Expr::Cast {
-                expr: rewrite_boxed(expr, rewriter)?,
-                data_type,
-            },
-            Expr::TryCast { expr, data_type } => Expr::TryCast {
-                expr: rewrite_boxed(expr, rewriter)?,
-                data_type,
-            },
-            Expr::Sort {
-                expr,
-                asc,
-                nulls_first,
-            } => Expr::Sort {
-                expr: rewrite_boxed(expr, rewriter)?,
-                asc,
-                nulls_first,
-            },
-            Expr::ScalarFunction { args, fun } => Expr::ScalarFunction {
-                args: rewrite_vec(args, rewriter)?,
-                fun,
-            },
-            Expr::ScalarUDF { args, fun } => Expr::ScalarUDF {
-                args: rewrite_vec(args, rewriter)?,
-                fun,
-            },
-            Expr::WindowFunction {
-                args,
-                fun,
-                partition_by,
-                order_by,
-                window_frame,
-            } => Expr::WindowFunction {
-                args: rewrite_vec(args, rewriter)?,
-                fun,
-                partition_by: rewrite_vec(partition_by, rewriter)?,
-                order_by: rewrite_vec(order_by, rewriter)?,
-                window_frame,
-            },
-            Expr::AggregateFunction {
-                args,
-                fun,
-                distinct,
-            } => Expr::AggregateFunction {
-                args: rewrite_vec(args, rewriter)?,
-                fun,
-                distinct,
-            },
-            Expr::AggregateUDF { args, fun } => Expr::AggregateUDF {
-                args: rewrite_vec(args, rewriter)?,
-                fun,
-            },
-            Expr::InList {
-                expr,
-                list,
-                negated,
-            } => Expr::InList {
-                expr: rewrite_boxed(expr, rewriter)?,
-                list: rewrite_vec(list, rewriter)?,
-                negated,
-            },
-            Expr::Wildcard => Expr::Wildcard,
-            Expr::GetIndexedField { expr, key } => Expr::GetIndexedField {
-                expr: rewrite_boxed(expr, rewriter)?,
-                key,
-            },
-        };
-
-        // now rewrite this expression itself
-        if need_mutate {
-            rewriter.mutate(expr)
-        } else {
-            Ok(expr)
-        }
-    }
-
-    /// Simplifies this [`Expr`]`s as much as possible, evaluating
-    /// constants and applying algebraic simplifications
-    ///
-    /// # Example:
-    /// `b > 2 AND b > 2`
-    /// can be written to
-    /// `b > 2`
-    ///
-    /// ```
-    /// use datafusion::logical_plan::*;
-    /// use datafusion::error::Result;
-    /// use datafusion::execution::context::ExecutionProps;
-    ///
-    /// /// Simple implementation that provides `Simplifier` the information it needs
-    /// #[derive(Default)]
-    /// struct Info {
-    ///   execution_props: ExecutionProps,
-    /// };
-    ///
-    /// impl SimplifyInfo for Info {
-    ///   fn is_boolean_type(&self, expr: &Expr) -> Result<bool> {
-    ///     Ok(false)
-    ///   }
-    ///   fn nullable(&self, expr: &Expr) -> Result<bool> {
-    ///     Ok(true)
-    ///   }
-    ///   fn execution_props(&self) -> &ExecutionProps {
-    ///     &self.execution_props
-    ///   }
-    /// }
-    ///
-    /// // b < 2
-    /// let b_lt_2 = col("b").gt(lit(2));
-    ///
-    /// // (b < 2) OR (b < 2)
-    /// let expr = b_lt_2.clone().or(b_lt_2.clone());
-    ///
-    /// // (b < 2) OR (b < 2) --> (b < 2)
-    /// let expr = expr.simplify(&Info::default()).unwrap();
-    /// assert_eq!(expr, b_lt_2);
-    /// ```
-    pub fn simplify<S: SimplifyInfo>(self, info: &S) -> Result<Self> {
-        let mut rewriter = Simplifier::new(info);
-        let mut const_evaluator = ConstEvaluator::new(info.execution_props());
-
-        // TODO iterate until no changes are made during rewrite
-        // (evaluating constants can enable new simplifications and
-        // simplifications can enable new constant evaluation)
-        // https://github.com/apache/arrow-datafusion/issues/1160
-        self.rewrite(&mut const_evaluator)?.rewrite(&mut rewriter)
-    }
-}
-
-impl Not for Expr {
-    type Output = Self;
-
-    fn not(self) -> Self::Output {
-        Expr::Not(Box::new(self))
-    }
-}
-
-impl std::fmt::Display for Expr {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        match self {
-            Expr::BinaryExpr {
-                ref left,
-                ref right,
-                ref op,
-            } => write!(f, "{} {} {}", left, op, right),
-            Expr::AggregateFunction {
-                /// Name of the function
-                ref fun,
-                /// List of expressions to feed to the functions as arguments
-                ref args,
-                /// Whether this is a DISTINCT aggregation or not
-                ref distinct,
-            } => fmt_function(f, &fun.to_string(), *distinct, args, true),
-            Expr::ScalarFunction {
-                /// Name of the function
-                ref fun,
-                /// List of expressions to feed to the functions as arguments
-                ref args,
-            } => fmt_function(f, &fun.to_string(), false, args, true),
-            _ => write!(f, "{:?}", self),
-        }
-    }
-}
-
-#[allow(clippy::boxed_local)]
-fn rewrite_boxed<R>(boxed_expr: Box<Expr>, rewriter: &mut R) -> Result<Box<Expr>>
-where
-    R: ExprRewriter,
-{
-    // TODO: It might be possible to avoid an allocation (the
-    // Box::new) below by reusing the box.
-    let expr: Expr = *boxed_expr;
-    let rewritten_expr = expr.rewrite(rewriter)?;
-    Ok(Box::new(rewritten_expr))
-}
-
-fn rewrite_option_box<R>(
-    option_box: Option<Box<Expr>>,
-    rewriter: &mut R,
-) -> Result<Option<Box<Expr>>>
-where
-    R: ExprRewriter,
-{
-    option_box
-        .map(|expr| rewrite_boxed(expr, rewriter))
-        .transpose()
-}
-
-/// rewrite a `Vec` of `Expr`s with the rewriter
-fn rewrite_vec<R>(v: Vec<Expr>, rewriter: &mut R) -> Result<Vec<Expr>>
-where
-    R: ExprRewriter,
-{
-    v.into_iter().map(|expr| expr.rewrite(rewriter)).collect()
-}
-
-/// Controls how the visitor recursion should proceed.
-pub enum Recursion<V: ExpressionVisitor> {
-    /// Attempt to visit all the children, recursively, of this expression.
-    Continue(V),
-    /// Do not visit the children of this expression, though the walk
-    /// of parents of this expression will not be affected
-    Stop(V),
-}
-
-/// Encode the traversal of an expression tree. When passed to
-/// `Expr::accept`, `ExpressionVisitor::visit` is invoked
-/// recursively on all nodes of an expression tree. See the comments
-/// on `Expr::accept` for details on its use
-pub trait ExpressionVisitor: Sized {
-    /// Invoked before any children of `expr` are visisted.
-    fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>>;
-
-    /// Invoked after all children of `expr` are visited. Default
-    /// implementation does nothing.
-    fn post_visit(self, _expr: &Expr) -> Result<Self> {
-        Ok(self)
-    }
-}
-
-/// Controls how the [ExprRewriter] recursion should proceed.
-pub enum RewriteRecursion {
-    /// Continue rewrite / visit this expression.
-    Continue,
-    /// Call [mutate()] immediately and return.
-    Mutate,
-    /// Do not rewrite / visit the children of this expression.
-    Stop,
-    /// Keep recursive but skip mutate on this expression
-    Skip,
-}
-
-/// Trait for potentially recursively rewriting an [`Expr`] expression
-/// tree. When passed to `Expr::rewrite`, `ExpressionVisitor::mutate` is
-/// invoked recursively on all nodes of an expression tree. See the
-/// comments on `Expr::rewrite` for details on its use
-pub trait ExprRewriter: Sized {
-    /// Invoked before any children of `expr` are rewritten /
-    /// visited. Default implementation returns `Ok(RewriteRecursion::Continue)`
-    fn pre_visit(&mut self, _expr: &Expr) -> Result<RewriteRecursion> {
-        Ok(RewriteRecursion::Continue)
-    }
-
-    /// Invoked after all children of `expr` have been mutated and
-    /// returns a potentially modified expr.
-    fn mutate(&mut self, expr: Expr) -> Result<Expr>;
-}
+pub use datafusion_expr::Expr;
 
 /// The information necessary to apply algebraic simplification to an
 /// [Expr]. See [SimplifyContext] for one implementation
@@ -1123,14 +147,7 @@ pub fn when(when: Expr, then: Expr) -> CaseBuilder {
     }
 }
 
-/// return a new expression l <op> r
-pub fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr {
-    Expr::BinaryExpr {
-        left: Box::new(l),
-        op,
-        right: Box::new(r),
-    }
-}
+
 
 /// return a new expression with a logical AND
 pub fn and(left: Expr, right: Expr) -> Expr {
@@ -1750,26 +767,6 @@ pub fn create_udaf(
     )
 }
 
-fn fmt_function(
-    f: &mut fmt::Formatter,
-    fun: &str,
-    distinct: bool,
-    args: &[Expr],
-    display: bool,
-) -> fmt::Result {
-    let args: Vec<String> = match display {
-        true => args.iter().map(|arg| format!("{}", arg)).collect(),
-        false => args.iter().map(|arg| format!("{:?}", arg)).collect(),
-    };
-
-    // let args: Vec<String> = args.iter().map(|arg| format!("{:?}", arg)).collect();
-    let distinct_str = match distinct {
-        true => "DISTINCT ",
-        false => "",
-    };
-    write!(f, "{}({}{})", fun, distinct_str, args.join(", "))
-}
-
 impl fmt::Debug for Expr {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match self {