You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/08 15:30:50 UTC

[arrow-datafusion] 02/02: pyarrow

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 685ea46926f4b0c2812e026229c0f55036c36928
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 13:29:20 2022 +0800

    pyarrow
---
 datafusion-expr/Cargo.toml                 |   1 +
 datafusion-expr/src/expr.rs                | 703 +++++++++++++++++++++++++++++
 datafusion-expr/src/lib.rs                 |  34 ++
 datafusion-expr/src/operator.rs            |  72 +++
 datafusion-expr/src/udaf.rs                |  92 ++++
 datafusion-expr/src/udf.rs                 |  93 ++++
 datafusion/src/logical_plan/expr.rs        | 684 +---------------------------
 datafusion/src/logical_plan/mod.rs         |   3 +-
 datafusion/src/logical_plan/operators.rs   |  71 ---
 datafusion/src/physical_plan/aggregates.rs |   9 -
 datafusion/src/physical_plan/functions.rs  |  17 +-
 datafusion/src/physical_plan/udaf.rs       |  73 +--
 datafusion/src/physical_plan/udf.rs        |  69 +--
 13 files changed, 1009 insertions(+), 912 deletions(-)

diff --git a/datafusion-expr/Cargo.toml b/datafusion-expr/Cargo.toml
index 73a5fcd..a6dad52 100644
--- a/datafusion-expr/Cargo.toml
+++ b/datafusion-expr/Cargo.toml
@@ -38,3 +38,4 @@ path = "src/lib.rs"
 datafusion-common = { path = "../datafusion-common", version = "6.0.0" }
 arrow = { version = "8.0.0", features = ["prettyprint"] }
 sqlparser = "0.13"
+ahash = { version = "0.7", default-features = false }
diff --git a/datafusion-expr/src/expr.rs b/datafusion-expr/src/expr.rs
new file mode 100644
index 0000000..c4c7197
--- /dev/null
+++ b/datafusion-expr/src/expr.rs
@@ -0,0 +1,703 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::aggregate_function;
+use crate::built_in_function;
+use crate::window_frame;
+use crate::window_function;
+use crate::AggregateUDF;
+use crate::Operator;
+use crate::ScalarUDF;
+use arrow::datatypes::DataType;
+use datafusion_common::Column;
+use datafusion_common::{DFSchema, Result};
+use datafusion_common::{DataFusionError, ScalarValue};
+use std::fmt;
+use std::hash::{BuildHasher, Hash, Hasher};
+use std::ops::Not;
+use std::sync::Arc;
+
+/// Builds the binary expression `l <op> r`, boxing both operands.
+pub fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr {
+  let (left, right) = (Box::new(l), Box::new(r));
+  Expr::BinaryExpr { left, op, right }
+}
+
+/// `Expr` is a central type of DataFusion's query API, and
+/// represents logical expressions such as `A + 1`, or `CAST(c1 AS
+/// int)`.
+///
+/// An `Expr` can compute its [DataType](arrow::datatypes::DataType)
+/// and nullability, and has functions for building up complex
+/// expressions.
+///
+/// # Examples
+///
+/// ## Create an expression `c1` referring to column named "c1"
+/// ```
+/// # use datafusion::logical_plan::*;
+/// let expr = col("c1");
+/// assert_eq!(expr, Expr::Column(Column::from_name("c1")));
+/// ```
+///
+/// ## Create the expression `c1 + c2` to add columns "c1" and "c2" together
+/// ```
+/// # use datafusion::logical_plan::*;
+/// let expr = col("c1") + col("c2");
+///
+/// assert!(matches!(expr, Expr::BinaryExpr { ..} ));
+/// if let Expr::BinaryExpr { left, right, op } = expr {
+///   assert_eq!(*left, col("c1"));
+///   assert_eq!(*right, col("c2"));
+///   assert_eq!(op, Operator::Plus);
+/// }
+/// ```
+///
+/// ## Create expression `c1 = 42` to compare the value in column "c1" to the literal value `42`
+/// ```
+/// # use datafusion::logical_plan::*;
+/// # use datafusion::scalar::*;
+/// let expr = col("c1").eq(lit(42));
+///
+/// assert!(matches!(expr, Expr::BinaryExpr { ..} ));
+/// if let Expr::BinaryExpr { left, right, op } = expr {
+///   assert_eq!(*left, col("c1"));
+///   let scalar = ScalarValue::Int32(Some(42));
+///   assert_eq!(*right, Expr::Literal(scalar));
+///   assert_eq!(op, Operator::Eq);
+/// }
+/// ```
+#[derive(Clone, PartialEq, Hash)]
+pub enum Expr {
+  /// An expression with a specific name.
+  Alias(Box<Expr>, String),
+  /// A named reference to a qualified field in a schema.
+  Column(Column),
+  /// A named reference to a variable in a registry.
+  ScalarVariable(Vec<String>),
+  /// A constant value.
+  Literal(ScalarValue),
+  /// A binary expression such as "age > 21"
+  BinaryExpr {
+    /// Left-hand side of the expression
+    left: Box<Expr>,
+    /// The operator applied to `left` and `right`
+    op: Operator,
+    /// Right-hand side of the expression
+    right: Box<Expr>,
+  },
+  /// Negation of an expression. The expression's type must be a boolean to make sense.
+  Not(Box<Expr>),
+  /// Whether an expression is not Null. This expression is never null.
+  IsNotNull(Box<Expr>),
+  /// Whether an expression is Null. This expression is never null.
+  IsNull(Box<Expr>),
+  /// Arithmetic negation of an expression. The operand must be of a signed numeric data type.
+  Negative(Box<Expr>),
+  /// Returns the field of a [`ListArray`] or [`StructArray`] by key
+  GetIndexedField {
+    /// the expression to take the field from
+    expr: Box<Expr>,
+    /// The name of the field to take
+    key: ScalarValue,
+  },
+  /// Whether an expression is between a given range.
+  Between {
+    /// The value to compare
+    expr: Box<Expr>,
+    /// Whether the expression is negated
+    negated: bool,
+    /// The low end of the range
+    low: Box<Expr>,
+    /// The high end of the range
+    high: Box<Expr>,
+  },
+  /// The CASE expression is similar to a series of nested if/else and there are two forms that
+  /// can be used. The first form consists of a series of boolean "when" expressions with
+  /// corresponding "then" expressions, and an optional "else" expression.
+  ///
+  /// CASE WHEN condition THEN result
+  ///      [WHEN ...]
+  ///      [ELSE result]
+  /// END
+  ///
+  /// The second form uses a base expression and then a series of "when" clauses that match on a
+  /// literal value.
+  ///
+  /// CASE expression
+  ///     WHEN value THEN result
+  ///     [WHEN ...]
+  ///     [ELSE result]
+  /// END
+  Case {
+    /// Optional base expression that can be compared to literal values in the "when" expressions
+    expr: Option<Box<Expr>>,
+    /// One or more when/then expressions
+    when_then_expr: Vec<(Box<Expr>, Box<Expr>)>,
+    /// Optional "else" expression
+    else_expr: Option<Box<Expr>>,
+  },
+  /// Casts the expression to a given type and will return a runtime error if the expression cannot be cast.
+  /// This expression is guaranteed to have a fixed type.
+  Cast {
+    /// The expression being cast
+    expr: Box<Expr>,
+    /// The `DataType` the expression will yield
+    data_type: DataType,
+  },
+  /// Casts the expression to a given type and will return a null value if the expression cannot be cast.
+  /// This expression is guaranteed to have a fixed type.
+  TryCast {
+    /// The expression being cast
+    expr: Box<Expr>,
+    /// The `DataType` the expression will yield
+    data_type: DataType,
+  },
+  /// A sort expression, that can be used to sort values.
+  Sort {
+    /// The expression to sort on
+    expr: Box<Expr>,
+    /// The direction of the sort
+    asc: bool,
+    /// Whether to put Nulls before all other data values
+    nulls_first: bool,
+  },
+  /// Represents the call of a built-in scalar function with a set of arguments.
+  ScalarFunction {
+    /// The function
+    fun: built_in_function::BuiltinScalarFunction,
+    /// List of expressions to feed to the functions as arguments
+    args: Vec<Expr>,
+  },
+  /// Represents the call of a user-defined scalar function with arguments.
+  ScalarUDF {
+    /// The function
+    fun: Arc<ScalarUDF>,
+    /// List of expressions to feed to the functions as arguments
+    args: Vec<Expr>,
+  },
+  /// Represents the call of an aggregate built-in function with arguments.
+  AggregateFunction {
+    /// The aggregate function to call
+    fun: aggregate_function::AggregateFunction,
+    /// List of expressions to feed to the functions as arguments
+    args: Vec<Expr>,
+    /// Whether this is a DISTINCT aggregation or not
+    distinct: bool,
+  },
+  /// Represents the call of a window function with arguments.
+  WindowFunction {
+    /// The window function to call
+    fun: window_function::WindowFunction,
+    /// List of expressions to feed to the functions as arguments
+    args: Vec<Expr>,
+    /// List of partition by expressions
+    partition_by: Vec<Expr>,
+    /// List of order by expressions
+    order_by: Vec<Expr>,
+    /// Window frame
+    window_frame: Option<window_frame::WindowFrame>,
+  },
+  /// Represents the call of a user-defined aggregate function with arguments.
+  AggregateUDF {
+    /// The function
+    fun: Arc<AggregateUDF>,
+    /// List of expressions to feed to the functions as arguments
+    args: Vec<Expr>,
+  },
+  /// Returns whether the list contains the expr value.
+  InList {
+    /// The expression to compare
+    expr: Box<Expr>,
+    /// A list of values to compare against
+    list: Vec<Expr>,
+    /// Whether the expression is negated
+    negated: bool,
+  },
+  /// Represents a reference to all fields in a schema.
+  Wildcard,
+}
+
+/// Fixed hashing seed, so that the hash-derived ordering of expressions is
+/// consistent across runs.
+const SEED: ahash::RandomState = ahash::RandomState::with_seeds(0, 0, 0, 0);
+
+impl PartialOrd for Expr {
+  /// Orders two expressions by comparing their 64-bit hashes, each computed
+  /// with a hasher built from the fixed `SEED`. Always returns `Some`.
+  fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+    // Every operand gets its own freshly built hasher.
+    let digest = |expr: &Expr| {
+      let mut state = SEED.build_hasher();
+      expr.hash(&mut state);
+      state.finish()
+    };
+
+    Some(digest(self).cmp(&digest(other)))
+  }
+}
+
+impl Expr {
+  /// Returns the name of this expression, resolved against the given [DFSchema].
+  ///
+  /// This is the name a column holding this expression gets when no alias is chosen.
+  pub fn name(&self, input_schema: &DFSchema) -> Result<String> {
+    create_name(self, input_schema)
+  }
+
+  /// Builds the comparison `self == other`.
+  pub fn eq(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::Eq, other)
+  }
+
+  /// Builds the comparison `self != other`.
+  pub fn not_eq(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::NotEq, other)
+  }
+
+  /// Builds the comparison `self > other`.
+  pub fn gt(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::Gt, other)
+  }
+
+  /// Builds the comparison `self >= other`.
+  pub fn gt_eq(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::GtEq, other)
+  }
+
+  /// Builds the comparison `self < other`.
+  pub fn lt(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::Lt, other)
+  }
+
+  /// Builds the comparison `self <= other`.
+  pub fn lt_eq(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::LtEq, other)
+  }
+
+  /// Builds the conjunction `self && other`.
+  pub fn and(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::And, other)
+  }
+
+  /// Builds the disjunction `self || other`.
+  pub fn or(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::Or, other)
+  }
+
+  /// Builds the negation `!self`.
+  // Inherent twin of the `std::ops::Not` impl, kept so the builder can be
+  // called in method-chain style.
+  #[allow(clippy::should_implement_trait)]
+  pub fn not(self) -> Expr {
+    Not::not(self)
+  }
+
+  /// Builds the modulus of two expressions, `self % other`.
+  pub fn modulus(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::Modulo, other)
+  }
+
+  /// Builds the pattern match `self LIKE other`.
+  pub fn like(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::Like, other)
+  }
+
+  /// Builds the negated pattern match `self NOT LIKE other`.
+  pub fn not_like(self, other: Expr) -> Expr {
+    binary_expr(self, Operator::NotLike, other)
+  }
+
+  /// Builds the alias expression `self AS name`.
+  pub fn alias(self, name: &str) -> Expr {
+    let name = name.to_string();
+    Self::Alias(Box::new(self), name)
+  }
+
+  /// Builds `self IN <list>` when `negated` is false, and
+  /// `self NOT IN <list>` otherwise.
+  pub fn in_list(self, list: Vec<Expr>, negated: bool) -> Expr {
+    let expr = Box::new(self);
+    Self::InList {
+      expr,
+      list,
+      negated,
+    }
+  }
+
+  /// Builds `IsNull(Box(self))`.
+  #[allow(clippy::wrong_self_convention)]
+  pub fn is_null(self) -> Expr {
+    Self::IsNull(Box::new(self))
+  }
+
+  /// Builds `IsNotNull(Box(self))`.
+  #[allow(clippy::wrong_self_convention)]
+  pub fn is_not_null(self) -> Expr {
+    Self::IsNotNull(Box::new(self))
+  }
+
+  /// Wraps an existing expression in a sort expression.
+  ///
+  /// ```
+  /// # use datafusion::logical_plan::col;
+  /// let sort_expr = col("foo").sort(true, true); // SORT ASC NULLS_FIRST
+  /// ```
+  pub fn sort(self, asc: bool, nulls_first: bool) -> Expr {
+    let expr = Box::new(self);
+    Self::Sort {
+      expr,
+      asc,
+      nulls_first,
+    }
+  }
+}
+
+/// Enables the `!expr` syntax on expressions.
+impl Not for Expr {
+  type Output = Expr;
+
+  /// Wraps `self` in `Expr::Not`.
+  fn not(self) -> Expr {
+    let operand = Box::new(self);
+    Expr::Not(operand)
+  }
+}
+
+impl std::fmt::Display for Expr {
+  /// Formats binary expressions and built-in aggregate/scalar function calls
+  /// with `Display`-formatted operands; every other variant falls back to
+  /// its `Debug` representation.
+  fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+    match self {
+      Expr::ScalarFunction { fun, args } => {
+        fmt_function(f, &fun.to_string(), false, args, true)
+      }
+      Expr::AggregateFunction {
+        fun,
+        args,
+        distinct,
+      } => fmt_function(f, &fun.to_string(), *distinct, args, true),
+      Expr::BinaryExpr { left, op, right } => {
+        write!(f, "{} {} {}", left, op, right)
+      }
+      other => write!(f, "{:?}", other),
+    }
+  }
+}
+
+impl fmt::Debug for Expr {
+  /// Writes a SQL-like rendering of the expression; nested expressions are
+  /// formatted with `Debug` unless noted otherwise below.
+  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+    match self {
+      // Leaf expressions.
+      Expr::Wildcard => write!(f, "*"),
+      Expr::Column(column) => write!(f, "{}", column),
+      Expr::Literal(value) => write!(f, "{:?}", value),
+      Expr::ScalarVariable(names) => write!(f, "{}", names.join(".")),
+
+      // Expressions wrapping a single operand.
+      Expr::Alias(inner, name) => write!(f, "{:?} AS {}", inner, name),
+      Expr::Not(inner) => write!(f, "NOT {:?}", inner),
+      Expr::Negative(inner) => write!(f, "(- {:?})", inner),
+      Expr::IsNull(inner) => write!(f, "{:?} IS NULL", inner),
+      Expr::IsNotNull(inner) => write!(f, "{:?} IS NOT NULL", inner),
+      Expr::Cast { expr, data_type } => {
+        write!(f, "CAST({:?} AS {:?})", expr, data_type)
+      }
+      Expr::TryCast { expr, data_type } => {
+        write!(f, "TRY_CAST({:?} AS {:?})", expr, data_type)
+      }
+      Expr::GetIndexedField { expr, key } => {
+        write!(f, "({:?})[{}]", expr, key)
+      }
+      Expr::Sort {
+        expr,
+        asc,
+        nulls_first,
+      } => {
+        match *asc {
+          true => write!(f, "{:?} ASC", expr)?,
+          false => write!(f, "{:?} DESC", expr)?,
+        }
+        match *nulls_first {
+          true => write!(f, " NULLS FIRST"),
+          false => write!(f, " NULLS LAST"),
+        }
+      }
+
+      // Expressions combining several operands.
+      Expr::BinaryExpr { left, op, right } => {
+        write!(f, "{:?} {} {:?}", left, op, right)
+      }
+      Expr::Between {
+        expr,
+        negated,
+        low,
+        high,
+      } => match *negated {
+        true => write!(f, "{:?} NOT BETWEEN {:?} AND {:?}", expr, low, high),
+        false => write!(f, "{:?} BETWEEN {:?} AND {:?}", expr, low, high),
+      },
+      Expr::InList {
+        expr,
+        list,
+        negated,
+      } => match *negated {
+        true => write!(f, "{:?} NOT IN ({:?})", expr, list),
+        false => write!(f, "{:?} IN ({:?})", expr, list),
+      },
+      Expr::Case {
+        expr,
+        when_then_expr,
+        else_expr,
+      } => {
+        write!(f, "CASE ")?;
+        if let Some(base) = expr {
+          write!(f, "{:?} ", base)?;
+        }
+        for (when, then) in when_then_expr {
+          write!(f, "WHEN {:?} THEN {:?} ", when, then)?;
+        }
+        if let Some(otherwise) = else_expr {
+          write!(f, "ELSE {:?} ", otherwise)?;
+        }
+        write!(f, "END")
+      }
+
+      // Function calls.
+      Expr::ScalarFunction { fun, args } => {
+        fmt_function(f, &fun.to_string(), false, args, false)
+      }
+      Expr::ScalarUDF { fun, args } => {
+        fmt_function(f, &fun.name, false, args, false)
+      }
+      Expr::AggregateUDF { fun, args } => {
+        fmt_function(f, &fun.name, false, args, false)
+      }
+      Expr::AggregateFunction {
+        fun,
+        args,
+        distinct,
+      } => {
+        // Unlike the other function arms, the arguments here are rendered
+        // with `Display` (last argument is `true`).
+        fmt_function(f, &fun.to_string(), *distinct, args, true)
+      }
+      Expr::WindowFunction {
+        fun,
+        args,
+        partition_by,
+        order_by,
+        window_frame,
+      } => {
+        fmt_function(f, &fun.to_string(), false, args, false)?;
+        if !partition_by.is_empty() {
+          write!(f, " PARTITION BY {:?}", partition_by)?;
+        }
+        if !order_by.is_empty() {
+          write!(f, " ORDER BY {:?}", order_by)?;
+        }
+        match window_frame {
+          Some(frame) => write!(
+            f,
+            " {} BETWEEN {} AND {}",
+            frame.units, frame.start_bound, frame.end_bound
+          ),
+          None => Ok(()),
+        }
+      }
+    }
+  }
+}
+
+/// Writes a function call as `fun(arg1, arg2, ...)`, with `DISTINCT ` placed
+/// before the arguments when `distinct` is set.
+///
+/// Arguments are rendered with `Display` when `display` is true and with
+/// `Debug` otherwise, then joined with `", "`.
+fn fmt_function(
+  f: &mut fmt::Formatter,
+  fun: &str,
+  distinct: bool,
+  args: &[Expr],
+  display: bool,
+) -> fmt::Result {
+  let render = |arg: &Expr| {
+    if display {
+      format!("{}", arg)
+    } else {
+      format!("{:?}", arg)
+    }
+  };
+  let rendered: Vec<String> = args.iter().map(render).collect();
+  let prefix = if distinct { "DISTINCT " } else { "" };
+
+  write!(f, "{}({}{})", fun, prefix, rendered.join(", "))
+}
+
+/// Builds the name of a function call as `fun(name1,name2,...)`, with
+/// `DISTINCT ` placed before the argument names when `distinct` is set.
+///
+/// Each argument is named through `create_name`; the first argument that
+/// cannot be named aborts the call with its error.
+fn create_function_name(
+  fun: &str,
+  distinct: bool,
+  args: &[Expr],
+  input_schema: &DFSchema,
+) -> Result<String> {
+  let mut names = Vec::with_capacity(args.len());
+  for arg in args {
+    names.push(create_name(arg, input_schema)?);
+  }
+  let prefix = if distinct { "DISTINCT " } else { "" };
+  Ok(format!("{}({}{})", fun, prefix, names.join(",")))
+}
+
+/// Returns a readable name of an expression based on the input schema.
+///
+/// This function recursively traverses the expression to build names such as
+/// "CAST(a > 2)".
+///
+/// # Errors
+///
+/// Returns `DataFusionError::Internal` for `Expr::Sort` and `Expr::Wildcard`,
+/// which have no name, including when one of them is nested in a position
+/// that is named recursively.
+fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
+  match e {
+    Expr::Alias(_, name) => Ok(name.clone()),
+    Expr::Column(c) => Ok(c.flat_name()),
+    Expr::ScalarVariable(variable_names) => Ok(variable_names.join(".")),
+    Expr::Literal(value) => Ok(format!("{:?}", value)),
+    Expr::BinaryExpr { left, op, right } => {
+      let left = create_name(left, input_schema)?;
+      let right = create_name(right, input_schema)?;
+      Ok(format!("{} {} {}", left, op, right))
+    }
+    Expr::Case {
+      expr,
+      when_then_expr,
+      else_expr,
+    } => {
+      let mut name = "CASE ".to_string();
+      if let Some(e) = expr {
+        let e = create_name(e, input_schema)?;
+        name += &format!("{} ", e);
+      }
+      for (w, t) in when_then_expr {
+        let when = create_name(w, input_schema)?;
+        let then = create_name(t, input_schema)?;
+        name += &format!("WHEN {} THEN {} ", when, then);
+      }
+      if let Some(e) = else_expr {
+        let e = create_name(e, input_schema)?;
+        name += &format!("ELSE {} ", e);
+      }
+      name += "END";
+      Ok(name)
+    }
+    Expr::Cast { expr, data_type } => {
+      let expr = create_name(expr, input_schema)?;
+      Ok(format!("CAST({} AS {:?})", expr, data_type))
+    }
+    Expr::TryCast { expr, data_type } => {
+      let expr = create_name(expr, input_schema)?;
+      Ok(format!("TRY_CAST({} AS {:?})", expr, data_type))
+    }
+    Expr::Not(expr) => {
+      let expr = create_name(expr, input_schema)?;
+      Ok(format!("NOT {}", expr))
+    }
+    Expr::Negative(expr) => {
+      let expr = create_name(expr, input_schema)?;
+      Ok(format!("(- {})", expr))
+    }
+    Expr::IsNull(expr) => {
+      let expr = create_name(expr, input_schema)?;
+      Ok(format!("{} IS NULL", expr))
+    }
+    Expr::IsNotNull(expr) => {
+      let expr = create_name(expr, input_schema)?;
+      Ok(format!("{} IS NOT NULL", expr))
+    }
+    Expr::GetIndexedField { expr, key } => {
+      let expr = create_name(expr, input_schema)?;
+      Ok(format!("{}[{}]", expr, key))
+    }
+    Expr::ScalarFunction { fun, args, .. } => {
+      create_function_name(&fun.to_string(), false, args, input_schema)
+    }
+    Expr::ScalarUDF { fun, args, .. } => {
+      create_function_name(&fun.name, false, args, input_schema)
+    }
+    Expr::WindowFunction {
+      fun,
+      args,
+      window_frame,
+      partition_by,
+      order_by,
+    } => {
+      let mut parts: Vec<String> = vec![create_function_name(
+        &fun.to_string(),
+        false,
+        args,
+        input_schema,
+      )?];
+      // PARTITION BY / ORDER BY lists are rendered with `Debug`, not named recursively.
+      if !partition_by.is_empty() {
+        parts.push(format!("PARTITION BY {:?}", partition_by));
+      }
+      if !order_by.is_empty() {
+        parts.push(format!("ORDER BY {:?}", order_by));
+      }
+      if let Some(window_frame) = window_frame {
+        parts.push(format!("{}", window_frame));
+      }
+      Ok(parts.join(" "))
+    }
+    Expr::AggregateFunction {
+      fun,
+      distinct,
+      args,
+      ..
+    } => create_function_name(&fun.to_string(), *distinct, args, input_schema),
+    // Same `name(arg1,arg2)` shape as the other function calls, never DISTINCT.
+    Expr::AggregateUDF { fun, args } => {
+      create_function_name(&fun.name, false, args, input_schema)
+    }
+    Expr::InList {
+      expr,
+      list,
+      negated,
+    } => {
+      let expr = create_name(expr, input_schema)?;
+      // Resolve every list item eagerly. Formatting the lazy `map` adapter
+      // itself would print the iterator's internals rather than the item
+      // names, and an item that cannot be named would never raise its error.
+      let names = list
+        .iter()
+        .map(|item| create_name(item, input_schema))
+        .collect::<Result<Vec<String>>>()?;
+      // Bracketed list, matching the shape the `Debug` impl prints for `InList`.
+      let list = format!("[{}]", names.join(", "));
+      if *negated {
+        Ok(format!("{} NOT IN ({})", expr, list))
+      } else {
+        Ok(format!("{} IN ({})", expr, list))
+      }
+    }
+    Expr::Between {
+      expr,
+      negated,
+      low,
+      high,
+    } => {
+      let expr = create_name(expr, input_schema)?;
+      let low = create_name(low, input_schema)?;
+      let high = create_name(high, input_schema)?;
+      if *negated {
+        Ok(format!("{} NOT BETWEEN {} AND {}", expr, low, high))
+      } else {
+        Ok(format!("{} BETWEEN {} AND {}", expr, low, high))
+      }
+    }
+    Expr::Sort { .. } => Err(DataFusionError::Internal(
+      "Create name does not support sort expression".to_string(),
+    )),
+    Expr::Wildcard => Err(DataFusionError::Internal(
+      "Create name does not support wildcard".to_string(),
+    )),
+  }
+}
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index 2491fcf..fe68276 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -19,16 +19,50 @@ mod accumulator;
 mod aggregate_function;
 mod built_in_function;
 mod columnar_value;
+mod expr;
 mod operator;
 mod signature;
+mod udaf;
+mod udf;
 mod window_frame;
 mod window_function;
 
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
+use std::sync::Arc;
+
+/// Implementation of a scalar function.
+///
+/// The Fn param is the wrapped function but be aware that the function will
+/// be passed with the slice / vec of columnar values (either scalar or array)
+/// with the exception of zero param function, where a singular element vec
+/// will be passed. In that case the single element is a null array to indicate
+/// the batch's row count (so that the generative zero-argument function can know
+/// the result array size).
+pub type ScalarFunctionImplementation =
+  Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
+
+/// Function that computes a function's return type from a slice of data types (presumably the argument types; TODO confirm).
+pub type ReturnTypeFunction =
+  Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
+
+/// The implementation of an aggregate function: a function that returns a boxed [`Accumulator`].
+pub type AccumulatorFunctionImplementation =
+  Arc<dyn Fn() -> Result<Box<dyn Accumulator>> + Send + Sync>;
+
+/// Function that describes the types in which an aggregator serializes
+/// its state, given its return datatype.
+pub type StateTypeFunction =
+  Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>;
+
 pub use accumulator::Accumulator;
 pub use aggregate_function::AggregateFunction;
 pub use built_in_function::BuiltinScalarFunction;
 pub use columnar_value::{ColumnarValue, NullColumnarValue};
+pub use expr::Expr;
 pub use operator::Operator;
 pub use signature::{Signature, TypeSignature, Volatility};
+pub use udaf::AggregateUDF;
+pub use udf::ScalarUDF;
 pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
 pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion-expr/src/operator.rs b/datafusion-expr/src/operator.rs
index e6b7e35..11a9a77 100644
--- a/datafusion-expr/src/operator.rs
+++ b/datafusion-expr/src/operator.rs
@@ -15,7 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::expr::binary_expr;
+use crate::Expr;
 use std::fmt;
+use std::ops;
 
 /// Operators applied to expressions
 #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)]
@@ -95,3 +98,72 @@ impl fmt::Display for Operator {
         write!(f, "{}", display)
     }
 }
+
+/// Enables `lhs + rhs` on expressions.
+impl ops::Add for Expr {
+    type Output = Expr;
+
+    /// Builds the binary expression `self + rhs`.
+    fn add(self, rhs: Expr) -> Expr {
+        binary_expr(self, Operator::Plus, rhs)
+    }
+}
+
+/// Enables `lhs - rhs` on expressions.
+impl ops::Sub for Expr {
+    type Output = Expr;
+
+    /// Builds the binary expression `self - rhs`.
+    fn sub(self, rhs: Expr) -> Expr {
+        binary_expr(self, Operator::Minus, rhs)
+    }
+}
+
+/// Enables `lhs * rhs` on expressions.
+impl ops::Mul for Expr {
+    type Output = Expr;
+
+    /// Builds the binary expression `self * rhs`.
+    fn mul(self, rhs: Expr) -> Expr {
+        binary_expr(self, Operator::Multiply, rhs)
+    }
+}
+
+/// Enables `lhs / rhs` on expressions.
+impl ops::Div for Expr {
+    type Output = Expr;
+
+    /// Builds the binary expression `self / rhs`.
+    fn div(self, rhs: Expr) -> Expr {
+        binary_expr(self, Operator::Divide, rhs)
+    }
+}
+
+/// Enables `lhs % rhs` on expressions.
+impl ops::Rem for Expr {
+    type Output = Expr;
+
+    /// Builds the binary expression `self % rhs`.
+    fn rem(self, rhs: Expr) -> Expr {
+        binary_expr(self, Operator::Modulo, rhs)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::Expr;
+    use datafusion_common::ScalarValue;
+
+    /// Builds a `UInt32` literal expression.
+    ///
+    /// Defined locally because this crate declares no `prelude` module, so
+    /// a `crate::prelude::lit` import does not resolve here.
+    fn lit(value: u32) -> Expr {
+        Expr::Literal(ScalarValue::UInt32(Some(value)))
+    }
+
+    /// Each arithmetic operator overload must build the matching binary
+    /// expression, checked through its `Debug` rendering.
+    #[test]
+    fn test_operators() {
+        assert_eq!(
+            format!("{:?}", lit(1u32) + lit(2u32)),
+            "UInt32(1) + UInt32(2)"
+        );
+        assert_eq!(
+            format!("{:?}", lit(1u32) - lit(2u32)),
+            "UInt32(1) - UInt32(2)"
+        );
+        assert_eq!(
+            format!("{:?}", lit(1u32) * lit(2u32)),
+            "UInt32(1) * UInt32(2)"
+        );
+        assert_eq!(
+            format!("{:?}", lit(1u32) / lit(2u32)),
+            "UInt32(1) / UInt32(2)"
+        );
+        assert_eq!(
+            format!("{:?}", lit(1u32) % lit(2u32)),
+            "UInt32(1) % UInt32(2)"
+        );
+    }
+}
diff --git a/datafusion-expr/src/udaf.rs b/datafusion-expr/src/udaf.rs
new file mode 100644
index 0000000..142cfe1
--- /dev/null
+++ b/datafusion-expr/src/udaf.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains functions and structs supporting user-defined aggregate functions.
+
+use crate::Expr;
+use crate::{
+  AccumulatorFunctionImplementation, ReturnTypeFunction, Signature, StateTypeFunction,
+};
+use std::fmt::{self, Debug, Formatter};
+use std::sync::Arc;
+
+/// Logical representation of a user-defined aggregate function (UDAF).
+/// A UDAF is different from a UDF in that it is stateful across batches.
+#[derive(Clone)]
+pub struct AggregateUDF {
+  /// Name of the function
+  pub name: String,
+  /// Signature of the function
+  pub signature: Signature,
+  /// Function that computes the return type
+  pub return_type: ReturnTypeFunction,
+  /// Actual implementation: a function that returns the accumulator
+  pub accumulator: AccumulatorFunctionImplementation,
+  /// Description of the accumulator's state as a function of the return type
+  pub state_type: StateTypeFunction,
+}
+
+impl Debug for AggregateUDF {
+  /// Prints the name and signature; the function-valued fields are shown as
+  /// a single `fun: "<FUNC>"` placeholder.
+  fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+    let mut builder = f.debug_struct("AggregateUDF");
+    builder.field("name", &self.name);
+    builder.field("signature", &self.signature);
+    builder.field("fun", &"<FUNC>");
+    builder.finish()
+  }
+}
+
+impl PartialEq for AggregateUDF {
+  /// Two UDAFs are equal when both their names and their signatures match;
+  /// the function-valued fields are not compared.
+  fn eq(&self, other: &Self) -> bool {
+    let same_name = self.name == other.name;
+    same_name && self.signature == other.signature
+  }
+}
+
+impl std::hash::Hash for AggregateUDF {
+  /// Feeds the name and then the signature into `state`; the function-valued
+  /// fields do not contribute to the hash.
+  fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+    let Self {
+      name, signature, ..
+    } = self;
+    name.hash(state);
+    signature.hash(state);
+  }
+}
+
+impl AggregateUDF {
+  /// Creates a new `AggregateUDF` that holds its own clone of every argument.
+  pub fn new(
+    name: &str,
+    signature: &Signature,
+    return_type: &ReturnTypeFunction,
+    accumulator: &AccumulatorFunctionImplementation,
+    state_type: &StateTypeFunction,
+  ) -> Self {
+    let name = name.to_string();
+    let signature = signature.clone();
+    // The function-valued arguments are `Arc`s, so cloning them shares the
+    // underlying closures.
+    let return_type = Arc::clone(return_type);
+    let accumulator = Arc::clone(accumulator);
+    let state_type = Arc::clone(state_type);
+    Self {
+      name,
+      signature,
+      return_type,
+      accumulator,
+      state_type,
+    }
+  }
+
+  /// Creates a logical expression that calls this UDAF with `args`.
+  ///
+  /// This utility allows using the UDAF without requiring access to the registry.
+  pub fn call(&self, args: Vec<Expr>) -> Expr {
+    let fun = Arc::new(self.clone());
+    Expr::AggregateUDF { fun, args }
+  }
+}
diff --git a/datafusion-expr/src/udf.rs b/datafusion-expr/src/udf.rs
new file mode 100644
index 0000000..247d6a2
--- /dev/null
+++ b/datafusion-expr/src/udf.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! UDF support
+
+use crate::{Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature};
+use std::fmt;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+/// Logical representation of a user-defined scalar function (UDF).
+#[derive(Clone)]
+pub struct ScalarUDF {
+  /// Name of the function.
+  pub name: String,
+  /// Signature of the function.
+  pub signature: Signature,
+  /// Return type of the function, held as a `ReturnTypeFunction`.
+  pub return_type: ReturnTypeFunction,
+  /// Actual implementation of the function.
+  ///
+  /// The fn param is the wrapped function, but be aware that the function will
+  /// be passed the slice / vec of columnar values (either scalar or array),
+  /// with the exception of a zero-param function, where a single-element vec
+  /// will be passed. In that case the single element is a null array to indicate
+  /// the batch's row count (so that the generative zero-argument function can know
+  /// the result array size).
+  pub fun: ScalarFunctionImplementation,
+}
+
+impl Debug for ScalarUDF {
+  fn fmt(&self, f: &mut Formatter) -> fmt::Result { // hand-written: prints `name`, `signature` and a placeholder
+    f.debug_struct("ScalarUDF")
+      .field("name", &self.name)
+      .field("signature", &self.signature)
+      .field("fun", &"<FUNC>") // fixed placeholder text shown in place of the `fun` value
+      .finish()
+  }
+}
+
+impl PartialEq for ScalarUDF {
+  fn eq(&self, other: &Self) -> bool { // only `name` and `signature` are compared; `return_type` and `fun` are ignored
+    self.name == other.name && self.signature == other.signature
+  }
+}
+
+impl std::hash::Hash for ScalarUDF {
+  fn hash<H: std::hash::Hasher>(&self, state: &mut H) { // hashes the same two fields that `eq` compares
+    self.name.hash(state);
+    self.signature.hash(state);
+  }
+}
+
+impl ScalarUDF {
+  /// Create a new ScalarUDF by copying `name` and cloning each of the borrowed arguments.
+  pub fn new(
+    name: &str,
+    signature: &Signature,
+    return_type: &ReturnTypeFunction,
+    fun: &ScalarFunctionImplementation,
+  ) -> Self {
+    Self {
+      name: name.to_owned(),
+      signature: signature.clone(),
+      return_type: return_type.clone(),
+      fun: fun.clone(),
+    }
+  }
+
+  /// Creates a logical expression (`Expr::ScalarUDF`) that calls this UDF with `args`.
+  /// This utility allows using the UDF without requiring access to the registry.
+  pub fn call(&self, args: Vec<Expr>) -> Expr {
+    Expr::ScalarUDF {
+      fun: Arc::new(self.clone()), // every call wraps a fresh clone of `self` in a new `Arc`
+      args,
+    }
+  }
+}
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index f19e9d8..cc269be 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -22,377 +22,21 @@ pub use super::Operator;
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::ExprSchemable;
 use crate::logical_plan::{window_frames, DFField, DFSchema};
-use crate::physical_plan::functions::Volatility;
-use crate::physical_plan::{aggregates, functions, udf::ScalarUDF, window_functions};
+use crate::physical_plan::{aggregates, functions, udf::ScalarUDF};
 use crate::{physical_plan::udaf::AggregateUDF, scalar::ScalarValue};
-use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
 use arrow::datatypes::DataType;
 pub use datafusion_common::{Column, ExprSchema};
-use functions::{ReturnTypeFunction, ScalarFunctionImplementation, Signature};
+use datafusion_expr::AccumulatorFunctionImplementation;
+use datafusion_expr::StateTypeFunction;
+use datafusion_expr::{
+    ReturnTypeFunction, ScalarFunctionImplementation, Signature, Volatility,
+};
 use std::collections::HashSet;
 use std::fmt;
 use std::hash::{BuildHasher, Hash, Hasher};
-use std::ops::Not;
 use std::sync::Arc;
 
-/// `Expr` is a central struct of DataFusion's query API, and
-/// represent logical expressions such as `A + 1`, or `CAST(c1 AS
-/// int)`.
-///
-/// An `Expr` can compute its [DataType](arrow::datatypes::DataType)
-/// and nullability, and has functions for building up complex
-/// expressions.
-///
-/// # Examples
-///
-/// ## Create an expression `c1` referring to column named "c1"
-/// ```
-/// # use datafusion::logical_plan::*;
-/// let expr = col("c1");
-/// assert_eq!(expr, Expr::Column(Column::from_name("c1")));
-/// ```
-///
-/// ## Create the expression `c1 + c2` to add columns "c1" and "c2" together
-/// ```
-/// # use datafusion::logical_plan::*;
-/// let expr = col("c1") + col("c2");
-///
-/// assert!(matches!(expr, Expr::BinaryExpr { ..} ));
-/// if let Expr::BinaryExpr { left, right, op } = expr {
-///   assert_eq!(*left, col("c1"));
-///   assert_eq!(*right, col("c2"));
-///   assert_eq!(op, Operator::Plus);
-/// }
-/// ```
-///
-/// ## Create expression `c1 = 42` to compare the value in coumn "c1" to the literal value `42`
-/// ```
-/// # use datafusion::logical_plan::*;
-/// # use datafusion::scalar::*;
-/// let expr = col("c1").eq(lit(42));
-///
-/// assert!(matches!(expr, Expr::BinaryExpr { ..} ));
-/// if let Expr::BinaryExpr { left, right, op } = expr {
-///   assert_eq!(*left, col("c1"));
-///   let scalar = ScalarValue::Int32(Some(42));
-///   assert_eq!(*right, Expr::Literal(scalar));
-///   assert_eq!(op, Operator::Eq);
-/// }
-/// ```
-#[derive(Clone, PartialEq, Hash)]
-pub enum Expr {
-    /// An expression with a specific name.
-    Alias(Box<Expr>, String),
-    /// A named reference to a qualified filed in a schema.
-    Column(Column),
-    /// A named reference to a variable in a registry.
-    ScalarVariable(Vec<String>),
-    /// A constant value.
-    Literal(ScalarValue),
-    /// A binary expression such as "age > 21"
-    BinaryExpr {
-        /// Left-hand side of the expression
-        left: Box<Expr>,
-        /// The comparison operator
-        op: Operator,
-        /// Right-hand side of the expression
-        right: Box<Expr>,
-    },
-    /// Negation of an expression. The expression's type must be a boolean to make sense.
-    Not(Box<Expr>),
-    /// Whether an expression is not Null. This expression is never null.
-    IsNotNull(Box<Expr>),
-    /// Whether an expression is Null. This expression is never null.
-    IsNull(Box<Expr>),
-    /// arithmetic negation of an expression, the operand must be of a signed numeric data type
-    Negative(Box<Expr>),
-    /// Returns the field of a [`ListArray`] or [`StructArray`] by key
-    GetIndexedField {
-        /// the expression to take the field from
-        expr: Box<Expr>,
-        /// The name of the field to take
-        key: ScalarValue,
-    },
-    /// Whether an expression is between a given range.
-    Between {
-        /// The value to compare
-        expr: Box<Expr>,
-        /// Whether the expression is negated
-        negated: bool,
-        /// The low end of the range
-        low: Box<Expr>,
-        /// The high end of the range
-        high: Box<Expr>,
-    },
-    /// The CASE expression is similar to a series of nested if/else and there are two forms that
-    /// can be used. The first form consists of a series of boolean "when" expressions with
-    /// corresponding "then" expressions, and an optional "else" expression.
-    ///
-    /// CASE WHEN condition THEN result
-    ///      [WHEN ...]
-    ///      [ELSE result]
-    /// END
-    ///
-    /// The second form uses a base expression and then a series of "when" clauses that match on a
-    /// literal value.
-    ///
-    /// CASE expression
-    ///     WHEN value THEN result
-    ///     [WHEN ...]
-    ///     [ELSE result]
-    /// END
-    Case {
-        /// Optional base expression that can be compared to literal values in the "when" expressions
-        expr: Option<Box<Expr>>,
-        /// One or more when/then expressions
-        when_then_expr: Vec<(Box<Expr>, Box<Expr>)>,
-        /// Optional "else" expression
-        else_expr: Option<Box<Expr>>,
-    },
-    /// Casts the expression to a given type and will return a runtime error if the expression cannot be cast.
-    /// This expression is guaranteed to have a fixed type.
-    Cast {
-        /// The expression being cast
-        expr: Box<Expr>,
-        /// The `DataType` the expression will yield
-        data_type: DataType,
-    },
-    /// Casts the expression to a given type and will return a null value if the expression cannot be cast.
-    /// This expression is guaranteed to have a fixed type.
-    TryCast {
-        /// The expression being cast
-        expr: Box<Expr>,
-        /// The `DataType` the expression will yield
-        data_type: DataType,
-    },
-    /// A sort expression, that can be used to sort values.
-    Sort {
-        /// The expression to sort on
-        expr: Box<Expr>,
-        /// The direction of the sort
-        asc: bool,
-        /// Whether to put Nulls before all other data values
-        nulls_first: bool,
-    },
-    /// Represents the call of a built-in scalar function with a set of arguments.
-    ScalarFunction {
-        /// The function
-        fun: functions::BuiltinScalarFunction,
-        /// List of expressions to feed to the functions as arguments
-        args: Vec<Expr>,
-    },
-    /// Represents the call of a user-defined scalar function with arguments.
-    ScalarUDF {
-        /// The function
-        fun: Arc<ScalarUDF>,
-        /// List of expressions to feed to the functions as arguments
-        args: Vec<Expr>,
-    },
-    /// Represents the call of an aggregate built-in function with arguments.
-    AggregateFunction {
-        /// Name of the function
-        fun: aggregates::AggregateFunction,
-        /// List of expressions to feed to the functions as arguments
-        args: Vec<Expr>,
-        /// Whether this is a DISTINCT aggregation or not
-        distinct: bool,
-    },
-    /// Represents the call of a window function with arguments.
-    WindowFunction {
-        /// Name of the function
-        fun: window_functions::WindowFunction,
-        /// List of expressions to feed to the functions as arguments
-        args: Vec<Expr>,
-        /// List of partition by expressions
-        partition_by: Vec<Expr>,
-        /// List of order by expressions
-        order_by: Vec<Expr>,
-        /// Window frame
-        window_frame: Option<window_frames::WindowFrame>,
-    },
-    /// aggregate function
-    AggregateUDF {
-        /// The function
-        fun: Arc<AggregateUDF>,
-        /// List of expressions to feed to the functions as arguments
-        args: Vec<Expr>,
-    },
-    /// Returns whether the list contains the expr value.
-    InList {
-        /// The expression to compare
-        expr: Box<Expr>,
-        /// A list of values to compare against
-        list: Vec<Expr>,
-        /// Whether the expression is negated
-        negated: bool,
-    },
-    /// Represents a reference to all fields in a schema.
-    Wildcard,
-}
-
-/// Fixed seed for the hashing so that Ords are consistent across runs
-const SEED: ahash::RandomState = ahash::RandomState::with_seeds(0, 0, 0, 0);
-
-impl PartialOrd for Expr {
-    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        let mut hasher = SEED.build_hasher();
-        self.hash(&mut hasher);
-        let s = hasher.finish();
-
-        let mut hasher = SEED.build_hasher();
-        other.hash(&mut hasher);
-        let o = hasher.finish();
-
-        Some(s.cmp(&o))
-    }
-}
-
-impl Expr {
-    /// Returns the name of this expression based on [crate::logical_plan::DFSchema].
-    ///
-    /// This represents how a column with this expression is named when no alias is chosen
-    pub fn name(&self, input_schema: &DFSchema) -> Result<String> {
-        create_name(self, input_schema)
-    }
-
-    /// Return `self == other`
-    pub fn eq(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::Eq, other)
-    }
-
-    /// Return `self != other`
-    pub fn not_eq(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::NotEq, other)
-    }
-
-    /// Return `self > other`
-    pub fn gt(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::Gt, other)
-    }
-
-    /// Return `self >= other`
-    pub fn gt_eq(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::GtEq, other)
-    }
-
-    /// Return `self < other`
-    pub fn lt(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::Lt, other)
-    }
-
-    /// Return `self <= other`
-    pub fn lt_eq(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::LtEq, other)
-    }
-
-    /// Return `self && other`
-    pub fn and(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::And, other)
-    }
-
-    /// Return `self || other`
-    pub fn or(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::Or, other)
-    }
-
-    /// Return `!self`
-    #[allow(clippy::should_implement_trait)]
-    pub fn not(self) -> Expr {
-        !self
-    }
-
-    /// Calculate the modulus of two expressions.
-    /// Return `self % other`
-    pub fn modulus(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::Modulo, other)
-    }
-
-    /// Return `self LIKE other`
-    pub fn like(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::Like, other)
-    }
-
-    /// Return `self NOT LIKE other`
-    pub fn not_like(self, other: Expr) -> Expr {
-        binary_expr(self, Operator::NotLike, other)
-    }
-
-    /// Return `self AS name` alias expression
-    pub fn alias(self, name: &str) -> Expr {
-        Expr::Alias(Box::new(self), name.to_owned())
-    }
-
-    /// Return `self IN <list>` if `negated` is false, otherwise
-    /// return `self NOT IN <list>`.a
-    pub fn in_list(self, list: Vec<Expr>, negated: bool) -> Expr {
-        Expr::InList {
-            expr: Box::new(self),
-            list,
-            negated,
-        }
-    }
-
-    /// Return `IsNull(Box(self))
-    #[allow(clippy::wrong_self_convention)]
-    pub fn is_null(self) -> Expr {
-        Expr::IsNull(Box::new(self))
-    }
-
-    /// Return `IsNotNull(Box(self))
-    #[allow(clippy::wrong_self_convention)]
-    pub fn is_not_null(self) -> Expr {
-        Expr::IsNotNull(Box::new(self))
-    }
-
-    /// Create a sort expression from an existing expression.
-    ///
-    /// ```
-    /// # use datafusion::logical_plan::col;
-    /// let sort_expr = col("foo").sort(true, true); // SORT ASC NULLS_FIRST
-    /// ```
-    pub fn sort(self, asc: bool, nulls_first: bool) -> Expr {
-        Expr::Sort {
-            expr: Box::new(self),
-            asc,
-            nulls_first,
-        }
-    }
-}
-
-impl Not for Expr {
-    type Output = Self;
-
-    fn not(self) -> Self::Output {
-        Expr::Not(Box::new(self))
-    }
-}
-
-impl std::fmt::Display for Expr {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        match self {
-            Expr::BinaryExpr {
-                ref left,
-                ref right,
-                ref op,
-            } => write!(f, "{} {} {}", left, op, right),
-            Expr::AggregateFunction {
-                /// Name of the function
-                ref fun,
-                /// List of expressions to feed to the functions as arguments
-                ref args,
-                /// Whether this is a DISTINCT aggregation or not
-                ref distinct,
-            } => fmt_function(f, &fun.to_string(), *distinct, args, true),
-            Expr::ScalarFunction {
-                /// Name of the function
-                ref fun,
-                /// List of expressions to feed to the functions as arguments
-                ref args,
-            } => fmt_function(f, &fun.to_string(), false, args, true),
-            _ => write!(f, "{:?}", self),
-        }
-    }
-}
+pub use datafusion_expr::Expr;
 
 /// Helper struct for building [Expr::Case]
 pub struct CaseBuilder {
@@ -484,15 +128,6 @@ pub fn when(when: Expr, then: Expr) -> CaseBuilder {
     }
 }
 
-/// return a new expression l <op> r
-pub fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr {
-    Expr::BinaryExpr {
-        left: Box::new(l),
-        op,
-        right: Box::new(r),
-    }
-}
-
 /// return a new expression with a logical AND
 pub fn and(left: Expr, right: Expr) -> Expr {
     Expr::BinaryExpr {
@@ -934,311 +569,6 @@ pub fn create_udaf(
     )
 }
 
-fn fmt_function(
-    f: &mut fmt::Formatter,
-    fun: &str,
-    distinct: bool,
-    args: &[Expr],
-    display: bool,
-) -> fmt::Result {
-    let args: Vec<String> = match display {
-        true => args.iter().map(|arg| format!("{}", arg)).collect(),
-        false => args.iter().map(|arg| format!("{:?}", arg)).collect(),
-    };
-
-    // let args: Vec<String> = args.iter().map(|arg| format!("{:?}", arg)).collect();
-    let distinct_str = match distinct {
-        true => "DISTINCT ",
-        false => "",
-    };
-    write!(f, "{}({}{})", fun, distinct_str, args.join(", "))
-}
-
-impl fmt::Debug for Expr {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            Expr::Alias(expr, alias) => write!(f, "{:?} AS {}", expr, alias),
-            Expr::Column(c) => write!(f, "{}", c),
-            Expr::ScalarVariable(var_names) => write!(f, "{}", var_names.join(".")),
-            Expr::Literal(v) => write!(f, "{:?}", v),
-            Expr::Case {
-                expr,
-                when_then_expr,
-                else_expr,
-                ..
-            } => {
-                write!(f, "CASE ")?;
-                if let Some(e) = expr {
-                    write!(f, "{:?} ", e)?;
-                }
-                for (w, t) in when_then_expr {
-                    write!(f, "WHEN {:?} THEN {:?} ", w, t)?;
-                }
-                if let Some(e) = else_expr {
-                    write!(f, "ELSE {:?} ", e)?;
-                }
-                write!(f, "END")
-            }
-            Expr::Cast { expr, data_type } => {
-                write!(f, "CAST({:?} AS {:?})", expr, data_type)
-            }
-            Expr::TryCast { expr, data_type } => {
-                write!(f, "TRY_CAST({:?} AS {:?})", expr, data_type)
-            }
-            Expr::Not(expr) => write!(f, "NOT {:?}", expr),
-            Expr::Negative(expr) => write!(f, "(- {:?})", expr),
-            Expr::IsNull(expr) => write!(f, "{:?} IS NULL", expr),
-            Expr::IsNotNull(expr) => write!(f, "{:?} IS NOT NULL", expr),
-            Expr::BinaryExpr { left, op, right } => {
-                write!(f, "{:?} {} {:?}", left, op, right)
-            }
-            Expr::Sort {
-                expr,
-                asc,
-                nulls_first,
-            } => {
-                if *asc {
-                    write!(f, "{:?} ASC", expr)?;
-                } else {
-                    write!(f, "{:?} DESC", expr)?;
-                }
-                if *nulls_first {
-                    write!(f, " NULLS FIRST")
-                } else {
-                    write!(f, " NULLS LAST")
-                }
-            }
-            Expr::ScalarFunction { fun, args, .. } => {
-                fmt_function(f, &fun.to_string(), false, args, false)
-            }
-            Expr::ScalarUDF { fun, ref args, .. } => {
-                fmt_function(f, &fun.name, false, args, false)
-            }
-            Expr::WindowFunction {
-                fun,
-                args,
-                partition_by,
-                order_by,
-                window_frame,
-            } => {
-                fmt_function(f, &fun.to_string(), false, args, false)?;
-                if !partition_by.is_empty() {
-                    write!(f, " PARTITION BY {:?}", partition_by)?;
-                }
-                if !order_by.is_empty() {
-                    write!(f, " ORDER BY {:?}", order_by)?;
-                }
-                if let Some(window_frame) = window_frame {
-                    write!(
-                        f,
-                        " {} BETWEEN {} AND {}",
-                        window_frame.units,
-                        window_frame.start_bound,
-                        window_frame.end_bound
-                    )?;
-                }
-                Ok(())
-            }
-            Expr::AggregateFunction {
-                fun,
-                distinct,
-                ref args,
-                ..
-            } => fmt_function(f, &fun.to_string(), *distinct, args, true),
-            Expr::AggregateUDF { fun, ref args, .. } => {
-                fmt_function(f, &fun.name, false, args, false)
-            }
-            Expr::Between {
-                expr,
-                negated,
-                low,
-                high,
-            } => {
-                if *negated {
-                    write!(f, "{:?} NOT BETWEEN {:?} AND {:?}", expr, low, high)
-                } else {
-                    write!(f, "{:?} BETWEEN {:?} AND {:?}", expr, low, high)
-                }
-            }
-            Expr::InList {
-                expr,
-                list,
-                negated,
-            } => {
-                if *negated {
-                    write!(f, "{:?} NOT IN ({:?})", expr, list)
-                } else {
-                    write!(f, "{:?} IN ({:?})", expr, list)
-                }
-            }
-            Expr::Wildcard => write!(f, "*"),
-            Expr::GetIndexedField { ref expr, key } => {
-                write!(f, "({:?})[{}]", expr, key)
-            }
-        }
-    }
-}
-
-fn create_function_name(
-    fun: &str,
-    distinct: bool,
-    args: &[Expr],
-    input_schema: &DFSchema,
-) -> Result<String> {
-    let names: Vec<String> = args
-        .iter()
-        .map(|e| create_name(e, input_schema))
-        .collect::<Result<_>>()?;
-    let distinct_str = match distinct {
-        true => "DISTINCT ",
-        false => "",
-    };
-    Ok(format!("{}({}{})", fun, distinct_str, names.join(",")))
-}
-
-/// Returns a readable name of an expression based on the input schema.
-/// This function recursively transverses the expression for names such as "CAST(a > 2)".
-fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
-    match e {
-        Expr::Alias(_, name) => Ok(name.clone()),
-        Expr::Column(c) => Ok(c.flat_name()),
-        Expr::ScalarVariable(variable_names) => Ok(variable_names.join(".")),
-        Expr::Literal(value) => Ok(format!("{:?}", value)),
-        Expr::BinaryExpr { left, op, right } => {
-            let left = create_name(left, input_schema)?;
-            let right = create_name(right, input_schema)?;
-            Ok(format!("{} {} {}", left, op, right))
-        }
-        Expr::Case {
-            expr,
-            when_then_expr,
-            else_expr,
-        } => {
-            let mut name = "CASE ".to_string();
-            if let Some(e) = expr {
-                let e = create_name(e, input_schema)?;
-                name += &format!("{} ", e);
-            }
-            for (w, t) in when_then_expr {
-                let when = create_name(w, input_schema)?;
-                let then = create_name(t, input_schema)?;
-                name += &format!("WHEN {} THEN {} ", when, then);
-            }
-            if let Some(e) = else_expr {
-                let e = create_name(e, input_schema)?;
-                name += &format!("ELSE {} ", e);
-            }
-            name += "END";
-            Ok(name)
-        }
-        Expr::Cast { expr, data_type } => {
-            let expr = create_name(expr, input_schema)?;
-            Ok(format!("CAST({} AS {:?})", expr, data_type))
-        }
-        Expr::TryCast { expr, data_type } => {
-            let expr = create_name(expr, input_schema)?;
-            Ok(format!("TRY_CAST({} AS {:?})", expr, data_type))
-        }
-        Expr::Not(expr) => {
-            let expr = create_name(expr, input_schema)?;
-            Ok(format!("NOT {}", expr))
-        }
-        Expr::Negative(expr) => {
-            let expr = create_name(expr, input_schema)?;
-            Ok(format!("(- {})", expr))
-        }
-        Expr::IsNull(expr) => {
-            let expr = create_name(expr, input_schema)?;
-            Ok(format!("{} IS NULL", expr))
-        }
-        Expr::IsNotNull(expr) => {
-            let expr = create_name(expr, input_schema)?;
-            Ok(format!("{} IS NOT NULL", expr))
-        }
-        Expr::GetIndexedField { expr, key } => {
-            let expr = create_name(expr, input_schema)?;
-            Ok(format!("{}[{}]", expr, key))
-        }
-        Expr::ScalarFunction { fun, args, .. } => {
-            create_function_name(&fun.to_string(), false, args, input_schema)
-        }
-        Expr::ScalarUDF { fun, args, .. } => {
-            create_function_name(&fun.name, false, args, input_schema)
-        }
-        Expr::WindowFunction {
-            fun,
-            args,
-            window_frame,
-            partition_by,
-            order_by,
-        } => {
-            let mut parts: Vec<String> = vec![create_function_name(
-                &fun.to_string(),
-                false,
-                args,
-                input_schema,
-            )?];
-            if !partition_by.is_empty() {
-                parts.push(format!("PARTITION BY {:?}", partition_by));
-            }
-            if !order_by.is_empty() {
-                parts.push(format!("ORDER BY {:?}", order_by));
-            }
-            if let Some(window_frame) = window_frame {
-                parts.push(format!("{}", window_frame));
-            }
-            Ok(parts.join(" "))
-        }
-        Expr::AggregateFunction {
-            fun,
-            distinct,
-            args,
-            ..
-        } => create_function_name(&fun.to_string(), *distinct, args, input_schema),
-        Expr::AggregateUDF { fun, args } => {
-            let mut names = Vec::with_capacity(args.len());
-            for e in args {
-                names.push(create_name(e, input_schema)?);
-            }
-            Ok(format!("{}({})", fun.name, names.join(",")))
-        }
-        Expr::InList {
-            expr,
-            list,
-            negated,
-        } => {
-            let expr = create_name(expr, input_schema)?;
-            let list = list.iter().map(|expr| create_name(expr, input_schema));
-            if *negated {
-                Ok(format!("{} NOT IN ({:?})", expr, list))
-            } else {
-                Ok(format!("{} IN ({:?})", expr, list))
-            }
-        }
-        Expr::Between {
-            expr,
-            negated,
-            low,
-            high,
-        } => {
-            let expr = create_name(expr, input_schema)?;
-            let low = create_name(low, input_schema)?;
-            let high = create_name(high, input_schema)?;
-            if *negated {
-                Ok(format!("{} NOT BETWEEN {} AND {}", expr, low, high))
-            } else {
-                Ok(format!("{} BETWEEN {} AND {}", expr, low, high))
-            }
-        }
-        Expr::Sort { .. } => Err(DataFusionError::Internal(
-            "Create name does not support sort expression".to_string(),
-        )),
-        Expr::Wildcard => Err(DataFusionError::Internal(
-            "Create name does not support wildcard".to_string(),
-        )),
-    }
-}
-
 /// Create field meta-data from an expression, for use in a result set schema
 pub fn exprlist_to_fields<'a>(
     expr: impl IntoIterator<Item = &'a Expr>,
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index f2ecb0f..57e34c8 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -37,11 +37,12 @@ pub mod window_frames;
 pub use builder::{
     build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
 };
+pub use datafusion_expr::expr::binary_expr;
 pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
 pub use display::display_schema;
 pub use expr::{
     abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
-    avg, binary_expr, bit_length, btrim, call_fn, case, ceil, character_length, chr, col,
+    avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, col,
     columnize_expr, combine_filters, concat, concat_ws, cos, count, count_distinct,
     create_udaf, create_udf, date_part, date_trunc, digest, exp, exprlist_to_fields,
     floor, in_list, initcap, left, length, lit, lit_timestamp_nano, ln, log10, log2,
diff --git a/datafusion/src/logical_plan/operators.rs b/datafusion/src/logical_plan/operators.rs
index 813f7e0..132f8a8 100644
--- a/datafusion/src/logical_plan/operators.rs
+++ b/datafusion/src/logical_plan/operators.rs
@@ -15,75 +15,4 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::{binary_expr, Expr};
 pub use datafusion_expr::Operator;
-use std::ops;
-
-impl ops::Add for Expr {
-    type Output = Self;
-
-    fn add(self, rhs: Self) -> Self {
-        binary_expr(self, Operator::Plus, rhs)
-    }
-}
-
-impl ops::Sub for Expr {
-    type Output = Self;
-
-    fn sub(self, rhs: Self) -> Self {
-        binary_expr(self, Operator::Minus, rhs)
-    }
-}
-
-impl ops::Mul for Expr {
-    type Output = Self;
-
-    fn mul(self, rhs: Self) -> Self {
-        binary_expr(self, Operator::Multiply, rhs)
-    }
-}
-
-impl ops::Div for Expr {
-    type Output = Self;
-
-    fn div(self, rhs: Self) -> Self {
-        binary_expr(self, Operator::Divide, rhs)
-    }
-}
-
-impl ops::Rem for Expr {
-    type Output = Self;
-
-    fn rem(self, rhs: Self) -> Self {
-        binary_expr(self, Operator::Modulo, rhs)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use crate::prelude::lit;
-
-    #[test]
-    fn test_operators() {
-        assert_eq!(
-            format!("{:?}", lit(1u32) + lit(2u32)),
-            "UInt32(1) + UInt32(2)"
-        );
-        assert_eq!(
-            format!("{:?}", lit(1u32) - lit(2u32)),
-            "UInt32(1) - UInt32(2)"
-        );
-        assert_eq!(
-            format!("{:?}", lit(1u32) * lit(2u32)),
-            "UInt32(1) * UInt32(2)"
-        );
-        assert_eq!(
-            format!("{:?}", lit(1u32) / lit(2u32)),
-            "UInt32(1) / UInt32(2)"
-        );
-        assert_eq!(
-            format!("{:?}", lit(1u32) % lit(2u32)),
-            "UInt32(1) % UInt32(2)"
-        );
-    }
-}
diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs
index a1531d4..656b9c8 100644
--- a/datafusion/src/physical_plan/aggregates.rs
+++ b/datafusion/src/physical_plan/aggregates.rs
@@ -40,15 +40,6 @@ use expressions::{
 };
 use std::sync::Arc;
 
-/// the implementation of an aggregate function
-pub type AccumulatorFunctionImplementation =
-    Arc<dyn Fn() -> Result<Box<dyn Accumulator>> + Send + Sync>;
-
-/// This signature corresponds to which types an aggregator serializes
-/// its state, given its return datatype.
-pub type StateTypeFunction =
-    Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>;
-
 pub use datafusion_expr::AggregateFunction;
 
 /// Returns the datatype of the aggregate function.
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index bf0aee9..f054e5c 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -53,25 +53,12 @@ use arrow::{
     record_batch::RecordBatch,
 };
 pub use datafusion_expr::NullColumnarValue;
+use datafusion_expr::ScalarFunctionImplementation;
 pub use datafusion_expr::{BuiltinScalarFunction, Signature, TypeSignature, Volatility};
 use fmt::{Debug, Formatter};
+use std::convert::From;
 use std::{any::Any, fmt, sync::Arc};
 
-/// Scalar function
-///
-/// The Fn param is the wrapped function but be aware that the function will
-/// be passed with the slice / vec of columnar values (either scalar or array)
-/// with the exception of zero param function, where a singular element vec
-/// will be passed. In that case the single element is a null array to indicate
-/// the batch's row count (so that the generative zero-argument function can know
-/// the result array size).
-pub type ScalarFunctionImplementation =
-    Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
-
-/// A function's return type
-pub type ReturnTypeFunction =
-    Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
-
 macro_rules! make_utf8_to_return_type {
     ($FUNC:ident, $largeUtf8Type:expr, $utf8Type:expr) => {
         fn $FUNC(arg_type: &DataType, name: &str) -> Result<DataType> {
diff --git a/datafusion/src/physical_plan/udaf.rs b/datafusion/src/physical_plan/udaf.rs
index 0de696d..e7b4058 100644
--- a/datafusion/src/physical_plan/udaf.rs
+++ b/datafusion/src/physical_plan/udaf.rs
@@ -28,83 +28,14 @@ use arrow::{
 
 use crate::physical_plan::PhysicalExpr;
 use crate::{error::Result, logical_plan::Expr};
-
+use datafusion_expr::{StateTypeFunction, ReturnTypeFunction, Signature, AccumulatorFunctionImplementation};
 use super::{
-    aggregates::AccumulatorFunctionImplementation,
-    aggregates::StateTypeFunction,
     expressions::format_state_name,
-    functions::{ReturnTypeFunction, Signature},
     type_coercion::coerce,
     Accumulator, AggregateExpr,
 };
 use std::sync::Arc;
-
-/// Logical representation of a user-defined aggregate function (UDAF)
-/// A UDAF is different from a UDF in that it is stateful across batches.
-#[derive(Clone)]
-pub struct AggregateUDF {
-    /// name
-    pub name: String,
-    /// signature
-    pub signature: Signature,
-    /// Return type
-    pub return_type: ReturnTypeFunction,
-    /// actual implementation
-    pub accumulator: AccumulatorFunctionImplementation,
-    /// the accumulator's state's description as a function of the return type
-    pub state_type: StateTypeFunction,
-}
-
-impl Debug for AggregateUDF {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("AggregateUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("fun", &"<FUNC>")
-            .finish()
-    }
-}
-
-impl PartialEq for AggregateUDF {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.signature == other.signature
-    }
-}
-
-impl std::hash::Hash for AggregateUDF {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-        self.signature.hash(state);
-    }
-}
-
-impl AggregateUDF {
-    /// Create a new AggregateUDF
-    pub fn new(
-        name: &str,
-        signature: &Signature,
-        return_type: &ReturnTypeFunction,
-        accumulator: &AccumulatorFunctionImplementation,
-        state_type: &StateTypeFunction,
-    ) -> Self {
-        Self {
-            name: name.to_owned(),
-            signature: signature.clone(),
-            return_type: return_type.clone(),
-            accumulator: accumulator.clone(),
-            state_type: state_type.clone(),
-        }
-    }
-
-    /// creates a logical expression with a call of the UDAF
-    /// This utility allows using the UDAF without requiring access to the registry.
-    pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::AggregateUDF {
-            fun: Arc::new(self.clone()),
-            args,
-        }
-    }
-}
+pub use datafusion_expr::AggregateUDF;
 
 /// Creates a physical expression of the UDAF, that includes all necessary type coercion.
 /// This function errors when `args`' can't be coerced to a valid argument type of the UDAF.
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion/src/physical_plan/udf.rs
index 7355746..85e6b02 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion/src/physical_plan/udf.rs
@@ -33,74 +33,7 @@ use super::{
 };
 use std::sync::Arc;
 
-/// Logical representation of a UDF.
-#[derive(Clone)]
-pub struct ScalarUDF {
-    /// name
-    pub name: String,
-    /// signature
-    pub signature: Signature,
-    /// Return type
-    pub return_type: ReturnTypeFunction,
-    /// actual implementation
-    ///
-    /// The fn param is the wrapped function but be aware that the function will
-    /// be passed with the slice / vec of columnar values (either scalar or array)
-    /// with the exception of zero param function, where a singular element vec
-    /// will be passed. In that case the single element is a null array to indicate
-    /// the batch's row count (so that the generative zero-argument function can know
-    /// the result array size).
-    pub fun: ScalarFunctionImplementation,
-}
-
-impl Debug for ScalarUDF {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("ScalarUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("fun", &"<FUNC>")
-            .finish()
-    }
-}
-
-impl PartialEq for ScalarUDF {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.signature == other.signature
-    }
-}
-
-impl std::hash::Hash for ScalarUDF {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-        self.signature.hash(state);
-    }
-}
-
-impl ScalarUDF {
-    /// Create a new ScalarUDF
-    pub fn new(
-        name: &str,
-        signature: &Signature,
-        return_type: &ReturnTypeFunction,
-        fun: &ScalarFunctionImplementation,
-    ) -> Self {
-        Self {
-            name: name.to_owned(),
-            signature: signature.clone(),
-            return_type: return_type.clone(),
-            fun: fun.clone(),
-        }
-    }
-
-    /// creates a logical expression with a call of the UDF
-    /// This utility allows using the UDF without requiring access to the registry.
-    pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::ScalarUDF {
-            fun: Arc::new(self.clone()),
-            args,
-        }
-    }
-}
+pub use datafusion_expr::ScalarUDF;
 
 /// Create a physical expression of the UDF.
 /// This function errors when `args`' can't be coerced to a valid argument type of the UDF.