You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/01/03 06:53:32 UTC

[arrow-datafusion] branch master updated: refactor: split expression pf planner into one part. (#4783)

This is an automated email from the ASF dual-hosted git repository.

liukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c81c101e refactor: split expression pf planner into one part. (#4783)
6c81c101e is described below

commit 6c81c101ea0189a702220d9ba7f89dd64432e35d
Author: jakevin <ja...@gmail.com>
AuthorDate: Tue Jan 3 14:53:27 2023 +0800

    refactor: split expression pf planner into one part. (#4783)
---
 datafusion/sql/src/expr/binary_op.rs    |   67 ++
 datafusion/sql/src/expr/function.rs     |  225 ++++++
 datafusion/sql/src/expr/grouping_set.rs |   84 ++
 datafusion/sql/src/expr/identifier.rs   |  159 ++++
 datafusion/sql/src/expr/mod.rs          |  492 ++++++++++++
 datafusion/sql/src/expr/order_by.rs     |   71 ++
 datafusion/sql/src/expr/subquery.rs     |   80 ++
 datafusion/sql/src/expr/substring.rs    |   75 ++
 datafusion/sql/src/expr/unary_op.rs     |   60 ++
 datafusion/sql/src/expr/value.rs        |  213 +++++
 datafusion/sql/src/lib.rs               |    1 +
 datafusion/sql/src/planner.rs           | 1306 +------------------------------
 12 files changed, 1547 insertions(+), 1286 deletions(-)

diff --git a/datafusion/sql/src/expr/binary_op.rs b/datafusion/sql/src/expr/binary_op.rs
new file mode 100644
index 000000000..af545c737
--- /dev/null
+++ b/datafusion/sql/src/expr/binary_op.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use datafusion_common::{DFSchema, DataFusionError, Result};
+use datafusion_expr::{BinaryExpr, Expr, Operator};
+use sqlparser::ast::{BinaryOperator, Expr as SQLExpr};
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plans a SQL binary operation as a logical [`Expr::BinaryExpr`].
+    ///
+    /// The SQL operator is translated first, so an unsupported operator is
+    /// reported as `DataFusionError::NotImplemented` before either operand is
+    /// planned. The operands are then planned, left before right, through
+    /// `sql_expr_to_logical_expr`.
+    pub(crate) fn parse_sql_binary_op(
+        &self,
+        left: SQLExpr,
+        op: BinaryOperator,
+        right: SQLExpr,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let operator = match op {
+            // comparison
+            BinaryOperator::Gt => Operator::Gt,
+            BinaryOperator::GtEq => Operator::GtEq,
+            BinaryOperator::Lt => Operator::Lt,
+            BinaryOperator::LtEq => Operator::LtEq,
+            BinaryOperator::Eq => Operator::Eq,
+            BinaryOperator::NotEq => Operator::NotEq,
+            // arithmetic
+            BinaryOperator::Plus => Operator::Plus,
+            BinaryOperator::Minus => Operator::Minus,
+            BinaryOperator::Multiply => Operator::Multiply,
+            BinaryOperator::Divide => Operator::Divide,
+            BinaryOperator::Modulo => Operator::Modulo,
+            // boolean
+            BinaryOperator::And => Operator::And,
+            BinaryOperator::Or => Operator::Or,
+            // regular expression matching
+            BinaryOperator::PGRegexMatch => Operator::RegexMatch,
+            BinaryOperator::PGRegexIMatch => Operator::RegexIMatch,
+            BinaryOperator::PGRegexNotMatch => Operator::RegexNotMatch,
+            BinaryOperator::PGRegexNotIMatch => Operator::RegexNotIMatch,
+            // bitwise
+            BinaryOperator::BitwiseAnd => Operator::BitwiseAnd,
+            BinaryOperator::BitwiseOr => Operator::BitwiseOr,
+            BinaryOperator::BitwiseXor => Operator::BitwiseXor,
+            BinaryOperator::PGBitwiseShiftRight => Operator::BitwiseShiftRight,
+            BinaryOperator::PGBitwiseShiftLeft => Operator::BitwiseShiftLeft,
+            // string
+            BinaryOperator::StringConcat => Operator::StringConcat,
+            _ => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Unsupported SQL binary operator {op:?}"
+                )))
+            }
+        };
+
+        Ok(Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(self.sql_expr_to_logical_expr(left, schema, planner_context)?),
+            operator,
+            Box::new(self.sql_expr_to_logical_expr(right, schema, planner_context)?),
+        )))
+    }
+}
diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs
new file mode 100644
index 000000000..1845d5947
--- /dev/null
+++ b/datafusion/sql/src/expr/function.rs
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use crate::utils::normalize_ident;
+use datafusion_common::{DFSchema, DataFusionError, Result};
+use datafusion_expr::utils::COUNT_STAR_EXPANSION;
+use datafusion_expr::{
+    expr, window_function, AggregateFunction, BuiltinScalarFunction, Expr, WindowFrame,
+    WindowFrameUnits, WindowFunction,
+};
+use sqlparser::ast::{
+    Expr as SQLExpr, Function as SQLFunction, FunctionArg, FunctionArgExpr,
+};
+use std::str::FromStr;
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plans a SQL function call as a logical expression.
+    ///
+    /// The function name is resolved in this order, and the first match wins:
+    ///
+    /// 1. built-in scalar function (`BuiltinScalarFunction::from_str`)
+    /// 2. window function, when the call carries an `OVER` clause
+    /// 3. built-in aggregate function (`AggregateFunction::from_str`)
+    /// 4. user-defined scalar function, then user-defined aggregate function,
+    ///    both looked up through `self.schema_provider`
+    ///
+    /// # Errors
+    ///
+    /// Returns `DataFusionError::Plan` when the name matches none of the above
+    /// ("Invalid function '...'"), or when a `RANGE` window frame is used with
+    /// an `ORDER BY` list whose length is not exactly 1. Errors from planning
+    /// arguments, `PARTITION BY` / `ORDER BY` expressions or from converting
+    /// the window frame are propagated.
+    ///
+    /// NOTE(review): in this function only the `PARTITION BY` expressions are
+    /// planned with `planner_context`. Function arguments go through
+    /// `function_args_to_expr` / `aggregate_fn_to_expr`, which do not receive
+    /// it -- confirm whether arguments are meant to see the caller's context.
+    pub(super) fn sql_function_to_expr(
+        &self,
+        mut function: SQLFunction,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        // A multi-part name is kept as its rendered string; a single-part name
+        // goes through `normalize_ident`.
+        let name = if function.name.0.len() > 1 {
+            // DF doesn't handle compound identifiers
+            // (e.g. "foo.bar") for function names yet
+            function.name.to_string()
+        } else {
+            normalize_ident(function.name.0[0].clone())
+        };
+
+        // first, scalar built-in
+        // NOTE(review): this check runs before the `OVER` clause is looked at,
+        // so a built-in scalar name used with `OVER` returns a plain scalar
+        // function here -- confirm that this is intended.
+        if let Ok(fun) = BuiltinScalarFunction::from_str(&name) {
+            let args = self.function_args_to_expr(function.args, schema)?;
+            return Ok(Expr::ScalarFunction { fun, args });
+        };
+
+        // then, window function
+        if let Some(window) = function.over.take() {
+            let partition_by = window
+                .partition_by
+                .into_iter()
+                .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
+                .collect::<Result<Vec<_>>>()?;
+            let order_by = window
+                .order_by
+                .into_iter()
+                .map(|e| self.order_by_to_sort_expr(e, schema))
+                .collect::<Result<Vec<_>>>()?;
+            // Convert an explicit frame, rejecting RANGE frames unless there is
+            // exactly one ORDER BY expression.
+            let window_frame = window
+                .window_frame
+                .as_ref()
+                .map(|window_frame| {
+                    let window_frame: WindowFrame = window_frame.clone().try_into()?;
+                    if WindowFrameUnits::Range == window_frame.units
+                        && order_by.len() != 1
+                    {
+                        Err(DataFusionError::Plan(format!(
+                            "With window frame of type RANGE, the order by expression must be of length 1, got {}", order_by.len())))
+                    } else {
+                        Ok(window_frame)
+                    }
+                })
+                .transpose()?;
+            // Without an explicit frame, build a default one; the flag passed to
+            // `WindowFrame::new` records whether an ORDER BY is present.
+            let window_frame = if let Some(window_frame) = window_frame {
+                window_frame
+            } else {
+                WindowFrame::new(!order_by.is_empty())
+            };
+            let fun = self.find_window_func(&name)?;
+            let expr = match fun {
+                // Aggregates used as window functions get the aggregate
+                // argument handling (see `aggregate_fn_to_expr`).
+                WindowFunction::AggregateFunction(aggregate_fun) => {
+                    let (aggregate_fun, args) =
+                        self.aggregate_fn_to_expr(aggregate_fun, function.args, schema)?;
+
+                    Expr::WindowFunction(expr::WindowFunction::new(
+                        WindowFunction::AggregateFunction(aggregate_fun),
+                        args,
+                        partition_by,
+                        order_by,
+                        window_frame,
+                    ))
+                }
+                _ => Expr::WindowFunction(expr::WindowFunction::new(
+                    fun,
+                    self.function_args_to_expr(function.args, schema)?,
+                    partition_by,
+                    order_by,
+                    window_frame,
+                )),
+            };
+            return Ok(expr);
+        }
+
+        // next, aggregate built-ins
+        if let Ok(fun) = AggregateFunction::from_str(&name) {
+            let distinct = function.distinct;
+            let (fun, args) = self.aggregate_fn_to_expr(fun, function.args, schema)?;
+            // The trailing `None` means no filter is attached here.
+            return Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
+                fun, args, distinct, None,
+            )));
+        };
+
+        // finally, user-defined functions (UDF) and UDAF
+        match self.schema_provider.get_function_meta(&name) {
+            Some(fm) => {
+                let args = self.function_args_to_expr(function.args, schema)?;
+
+                Ok(Expr::ScalarUDF { fun: fm, args })
+            }
+            None => match self.schema_provider.get_aggregate_meta(&name) {
+                Some(fm) => {
+                    let args = self.function_args_to_expr(function.args, schema)?;
+                    Ok(Expr::AggregateUDF {
+                        fun: fm,
+                        args,
+                        filter: None,
+                    })
+                }
+                _ => Err(DataFusionError::Plan(format!("Invalid function '{name}'"))),
+            },
+        }
+    }
+
+    /// Plans `expr` and wraps the result as the single argument of the
+    /// built-in scalar function `fun`.
+    pub(super) fn sql_named_function_to_expr(
+        &self,
+        expr: SQLExpr,
+        fun: BuiltinScalarFunction,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let only_arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
+        Ok(Expr::ScalarFunction {
+            fun,
+            args: vec![only_arg],
+        })
+    }
+
+    /// Resolves `name` to a window function.
+    ///
+    /// `window_function::find_df_window_func` is consulted first; only when it
+    /// finds nothing is `name` looked up as a user-defined aggregate through
+    /// `self.schema_provider`. A `DataFusionError::Plan` is returned when
+    /// neither lookup succeeds.
+    pub(super) fn find_window_func(&self, name: &str) -> Result<WindowFunction> {
+        let resolved = match window_function::find_df_window_func(name) {
+            found @ Some(_) => found,
+            None => self
+                .schema_provider
+                .get_aggregate_meta(name)
+                .map(WindowFunction::AggregateUDF),
+        };
+
+        match resolved {
+            Some(fun) => Ok(fun),
+            None => Err(DataFusionError::Plan(format!(
+                "There is no window function named {name}"
+            ))),
+        }
+    }
+
+    /// Plans a single function-call argument.
+    ///
+    /// Named and unnamed arguments are treated alike: the argument name is
+    /// ignored. An expression argument is planned with
+    /// `sql_expr_to_logical_expr`, a bare wildcard becomes `Expr::Wildcard`,
+    /// and any other argument form yields `DataFusionError::NotImplemented`.
+    fn sql_fn_arg_to_logical_expr(
+        &self,
+        sql: FunctionArg,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        match sql {
+            FunctionArg::Named {
+                arg: FunctionArgExpr::Expr(inner),
+                ..
+            }
+            | FunctionArg::Unnamed(FunctionArgExpr::Expr(inner)) => {
+                self.sql_expr_to_logical_expr(inner, schema, planner_context)
+            }
+            FunctionArg::Named {
+                arg: FunctionArgExpr::Wildcard,
+                ..
+            }
+            | FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => Ok(Expr::Wildcard),
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "Unsupported qualified wildcard argument: {sql:?}"
+            ))),
+        }
+    }
+
+    /// Plans every argument of a function call, in order, stopping at the
+    /// first argument that fails to plan.
+    ///
+    /// NOTE(review): each argument is planned against a brand-new
+    /// `PlannerContext`, so nothing held by the caller's context is visible
+    /// while planning arguments -- confirm whether that is intended.
+    pub(super) fn function_args_to_expr(
+        &self,
+        args: Vec<FunctionArg>,
+        schema: &DFSchema,
+    ) -> Result<Vec<Expr>> {
+        let mut planned = Vec::with_capacity(args.len());
+        for arg in args {
+            let mut arg_context = PlannerContext::new();
+            planned.push(self.sql_fn_arg_to_logical_expr(arg, schema, &mut arg_context)?);
+        }
+        Ok(planned)
+    }
+
+    /// Plans the arguments of an aggregate function call and returns them
+    /// together with the (unchanged) function.
+    ///
+    /// For `COUNT`, an unnamed wildcard argument (`COUNT(*)`) is rewritten to
+    /// the literal `COUNT_STAR_EXPANSION`; every other argument, and every
+    /// argument of any other aggregate, is planned like a regular function
+    /// argument.
+    pub(super) fn aggregate_fn_to_expr(
+        &self,
+        fun: AggregateFunction,
+        args: Vec<FunctionArg>,
+        schema: &DFSchema,
+    ) -> Result<(AggregateFunction, Vec<Expr>)> {
+        let args = if matches!(fun, AggregateFunction::Count) {
+            let mut planned = Vec::with_capacity(args.len());
+            for arg in args {
+                let expr = match arg {
+                    // Special case rewrite COUNT(*) to COUNT(constant)
+                    FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
+                        Expr::Literal(COUNT_STAR_EXPANSION.clone())
+                    }
+                    other => self.sql_fn_arg_to_logical_expr(
+                        other,
+                        schema,
+                        &mut PlannerContext::new(),
+                    )?,
+                };
+                planned.push(expr);
+            }
+            planned
+        } else {
+            self.function_args_to_expr(args, schema)?
+        };
+
+        Ok((fun, args))
+    }
+}
diff --git a/datafusion/sql/src/expr/grouping_set.rs b/datafusion/sql/src/expr/grouping_set.rs
new file mode 100644
index 000000000..c5a0b6da7
--- /dev/null
+++ b/datafusion/sql/src/expr/grouping_set.rs
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use datafusion_common::{DFSchema, DataFusionError, Result};
+use datafusion_expr::{Expr, GroupingSet};
+use sqlparser::ast::Expr as SQLExpr;
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plans `GROUPING SETS (...)`: every inner list of SQL expressions
+    /// becomes one grouping set of planned expressions, in the original order.
+    /// Planning stops at the first expression that fails.
+    pub(super) fn sql_grouping_sets_to_expr(
+        &self,
+        exprs: Vec<Vec<SQLExpr>>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let mut sets = Vec::with_capacity(exprs.len());
+        for group in exprs {
+            let mut planned = Vec::with_capacity(group.len());
+            for sql_expr in group {
+                planned.push(self.sql_expr_to_logical_expr(
+                    sql_expr,
+                    schema,
+                    planner_context,
+                )?);
+            }
+            sets.push(planned);
+        }
+        Ok(Expr::GroupingSet(GroupingSet::GroupingSets(sets)))
+    }
+
+    /// Plans `ROLLUP (...)` as [`GroupingSet::Rollup`].
+    ///
+    /// Every entry of `exprs` must contain exactly one expression; tuple
+    /// entries (any other length) are rejected with
+    /// `DataFusionError::Internal`. Planning stops at the first failing entry.
+    pub(super) fn sql_rollup_to_expr(
+        &self,
+        exprs: Vec<Vec<SQLExpr>>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let args: Result<Vec<_>> = exprs
+            .into_iter()
+            .map(|v| {
+                let mut elems = v.into_iter();
+                match (elems.next(), elems.next()) {
+                    // Exactly one element: take ownership of it rather than
+                    // cloning it out of the vector.
+                    (Some(e), None) => {
+                        self.sql_expr_to_logical_expr(e, schema, planner_context)
+                    }
+                    _ => Err(DataFusionError::Internal(
+                        "Tuple expressions are not supported for Rollup expressions"
+                            .to_string(),
+                    )),
+                }
+            })
+            .collect();
+        Ok(Expr::GroupingSet(GroupingSet::Rollup(args?)))
+    }
+
+    /// Plans `CUBE (...)` as [`GroupingSet::Cube`].
+    ///
+    /// Every entry of `exprs` must contain exactly one expression; tuple
+    /// entries (any other length) are rejected with
+    /// `DataFusionError::Internal`. Planning stops at the first failing entry.
+    pub(super) fn sql_cube_to_expr(
+        &self,
+        exprs: Vec<Vec<SQLExpr>>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let args: Result<Vec<_>> = exprs
+            .into_iter()
+            .map(|v| {
+                let mut elems = v.into_iter();
+                match (elems.next(), elems.next()) {
+                    // Exactly one element: take ownership of it rather than
+                    // cloning it out of the vector.
+                    (Some(e), None) => {
+                        self.sql_expr_to_logical_expr(e, schema, planner_context)
+                    }
+                    _ => Err(DataFusionError::Internal(
+                        "Tuple expressions are not supported for Cube expressions"
+                            .to_string(),
+                    )),
+                }
+            })
+            .collect();
+        Ok(Expr::GroupingSet(GroupingSet::Cube(args?)))
+    }
+}
diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs
new file mode 100644
index 000000000..5a5c4004e
--- /dev/null
+++ b/datafusion/sql/src/expr/identifier.rs
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{
+    idents_to_table_reference, ContextProvider, PlannerContext, SqlToRel,
+};
+use crate::utils::normalize_ident;
+use datafusion_common::{
+    Column, DFSchema, DataFusionError, OwnedTableReference, Result, ScalarValue,
+};
+use datafusion_expr::{Case, Expr, GetIndexedField};
+use sqlparser::ast::{Expr as SQLExpr, Ident};
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plans a single (non-compound) identifier.
+    ///
+    /// An identifier whose value starts with `@` is resolved as a scalar
+    /// variable through `self.schema_provider`; a missing variable type is a
+    /// `DataFusionError::Execution`. Anything else becomes an unqualified
+    /// column whose name is the normalized identifier.
+    pub(super) fn sql_identifier_to_expr(&self, id: Ident) -> Result<Expr> {
+        if !id.value.starts_with('@') {
+            // `col()` is deliberately avoided: it would treat a name that
+            // contains '.' as a compound identifier, but this is a single
+            // identifier (e.g. it is "foo.bar" not foo.bar).
+            return Ok(Expr::Column(Column {
+                relation: None,
+                name: normalize_ident(id),
+            }));
+        }
+
+        // TODO: figure out if ScalarVariables should be insensitive.
+        let var_names = vec![id.value];
+        match self.schema_provider.get_variable_type(&var_names) {
+            Some(ty) => Ok(Expr::ScalarVariable(ty, var_names)),
+            None => Err(DataFusionError::Execution(format!(
+                "variable {var_names:?} has no type information"
+            ))),
+        }
+    }
+
+    /// Plans a compound (dotted) identifier.
+    ///
+    /// * When the first part starts with `@`, all parts are normalized and the
+    ///   list is resolved as a scalar variable through `self.schema_provider`;
+    ///   a missing variable type is a `DataFusionError::Execution`.
+    /// * Otherwise the parts must form a two-part reference
+    ///   (`OwnedTableReference::Partial`); bare and full references are
+    ///   rejected with `DataFusionError::Plan`. The result is a qualified
+    ///   column, except when the qualified lookup in `schema` fails and
+    ///   `schema` has a field named like the qualifier: then it is an indexed
+    ///   field access on that field.
+    pub(super) fn sql_compound_identifier_to_expr(
+        &self,
+        ids: Vec<Ident>,
+        schema: &DFSchema,
+    ) -> Result<Expr> {
+        if ids[0].value.starts_with('@') {
+            let var_names: Vec<_> = ids.into_iter().map(normalize_ident).collect();
+            return match self.schema_provider.get_variable_type(&var_names) {
+                Some(ty) => Ok(Expr::ScalarVariable(ty, var_names)),
+                None => Err(DataFusionError::Execution(format!(
+                    "variable {var_names:?} has no type information"
+                ))),
+            };
+        }
+
+        // only support "schema.table" type identifiers here
+        let (name, relation) = match idents_to_table_reference(ids)? {
+            OwnedTableReference::Partial { schema, table } => (table, schema),
+            r @ OwnedTableReference::Bare { .. }
+            | r @ OwnedTableReference::Full { .. } => {
+                return Err(DataFusionError::Plan(format!(
+                    "Unsupported compound identifier '{r:?}'",
+                )));
+            }
+        };
+
+        // An exact match on the qualified name means table.column. Only when
+        // that lookup fails is the qualifier tried as the name of a column,
+        // which means access to a field of a structure, example:
+        // SELECT my_struct.key
+        let struct_column = match schema.field_with_qualified_name(&relation, &name) {
+            Ok(_) => None,
+            Err(_) => schema.fields().iter().find(|f| f.name().eq(&relation)),
+        };
+
+        match struct_column {
+            Some(field) => Ok(Expr::GetIndexedField(GetIndexedField::new(
+                Box::new(Expr::Column(field.qualified_column())),
+                ScalarValue::Utf8(Some(name)),
+            ))),
+            // table.column identifier
+            None => Ok(Expr::Column(Column {
+                relation: Some(relation),
+                name,
+            })),
+        }
+    }
+
+    /// Plans a SQL `CASE` expression as [`Expr::Case`].
+    ///
+    /// Sub-expressions are planned in this order: the optional operand, all
+    /// `WHEN` conditions, all `THEN` results, then the optional `ELSE` result.
+    /// Planning stops at the first failure. Conditions and results are paired
+    /// positionally with `zip`, so pairing ends at the shorter of the two
+    /// lists.
+    pub(super) fn sql_case_identifier_to_expr(
+        &self,
+        operand: Option<Box<SQLExpr>>,
+        conditions: Vec<SQLExpr>,
+        results: Vec<SQLExpr>,
+        else_result: Option<Box<SQLExpr>>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let expr = operand
+            .map(|e| self.sql_expr_to_logical_expr(*e, schema, planner_context))
+            .transpose()?
+            .map(Box::new);
+        let when_expr = conditions
+            .into_iter()
+            .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
+            .collect::<Result<Vec<_>>>()?;
+        let then_expr = results
+            .into_iter()
+            .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
+            .collect::<Result<Vec<_>>>()?;
+        let else_expr = else_result
+            .map(|e| self.sql_expr_to_logical_expr(*e, schema, planner_context))
+            .transpose()?
+            .map(Box::new);
+
+        Ok(Expr::Case(Case::new(
+            expr,
+            // Both vectors are owned, so move the planned expressions into
+            // their boxes instead of cloning each one.
+            when_expr
+                .into_iter()
+                .zip(then_expr)
+                .map(|(w, t)| (Box::new(w), Box::new(t)))
+                .collect(),
+            else_expr,
+        )))
+    }
+}
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
new file mode 100644
index 000000000..4accb451f
--- /dev/null
+++ b/datafusion/sql/src/expr/mod.rs
@@ -0,0 +1,492 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod binary_op;
+mod function;
+mod grouping_set;
+mod identifier;
+mod order_by;
+mod subquery;
+mod substring;
+mod unary_op;
+mod value;
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use crate::utils::normalize_ident;
+use arrow_schema::DataType;
+use datafusion_common::{DFSchema, DataFusionError, Result, ScalarValue};
+use datafusion_expr::{
+    col, expr, lit, AggregateFunction, Between, BinaryExpr, BuiltinScalarFunction, Cast,
+    Expr, ExprSchemable, GetIndexedField, Like, Operator, TryCast,
+};
+use sqlparser::ast::{ArrayAgg, Expr as SQLExpr, TrimWhereField, Value};
+use sqlparser::parser::ParserError::ParserError;
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plans a SQL expression as a logical [`Expr`].
+    ///
+    /// This is a thin dispatcher: `SQLExpr::BinaryOp` goes straight to
+    /// `parse_sql_binary_op`, and every other expression kind is handled by
+    /// `sql_expr_to_logical_expr_internal`. The result of whichever function
+    /// is called is returned as is.
+    pub(crate) fn sql_expr_to_logical_expr(
+        &self,
+        sql: SQLExpr,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        // Workaround for https://github.com/apache/arrow-datafusion/issues/4065
+        //
+        // Minimize stack space required in debug builds to plan
+        // deeply nested binary operators by keeping the stack space
+        // needed for sql_expr_to_logical_expr minimal for BinaryOp
+        //
+        // The reason this reduces stack size in debug builds is
+        // explained in the "Technical Backstory" heading of
+        // https://github.com/apache/arrow-datafusion/pull/1047
+        //
+        // A likely better way to support deeply nested expressions
+        // would be to avoid recursion altogether and use an
+        // iterative algorithm.
+        match sql {
+            SQLExpr::BinaryOp { left, op, right } => {
+                self.parse_sql_binary_op(*left, op, *right, schema, planner_context)
+            }
+            // since this function requires more space per frame
+            // avoid calling it for binary ops
+            _ => self.sql_expr_to_logical_expr_internal(sql, schema, planner_context),
+        }
+    }
+
+    /// Internal implementation. Use
+    /// [`Self::sql_expr_to_logical_expr`] to plan exprs.
+    fn sql_expr_to_logical_expr_internal(
+        &self,
+        sql: SQLExpr,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        match sql {
+            SQLExpr::Value(value) => {
+                self.parse_value(value, &planner_context.prepare_param_data_types)
+            }
+            SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction {
+                fun: BuiltinScalarFunction::DatePart,
+                args: vec![
+                    Expr::Literal(ScalarValue::Utf8(Some(format!("{field}")))),
+                    self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+                ],
+            }),
+
+            SQLExpr::Array(arr) => self.sql_array_literal(arr.elem, schema),
+            SQLExpr::Interval {
+                value,
+                leading_field,
+                leading_precision,
+                last_field,
+                fractional_seconds_precision,
+            } => self.sql_interval_to_expr(
+                *value,
+                leading_field,
+                leading_precision,
+                last_field,
+                fractional_seconds_precision,
+            ),
+            SQLExpr::Identifier(id) => self.sql_identifier_to_expr(id),
+
+            SQLExpr::MapAccess { column, keys } => {
+                if let SQLExpr::Identifier(id) = *column {
+                    plan_indexed(col(normalize_ident(id)), keys)
+                } else {
+                    Err(DataFusionError::NotImplemented(format!(
+                        "map access requires an identifier, found column {column} instead"
+                    )))
+                }
+            }
+
+            SQLExpr::ArrayIndex { obj, indexes } => {
+                let expr = self.sql_expr_to_logical_expr(*obj, schema, planner_context)?;
+                plan_indexed(expr, indexes)
+            }
+
+            SQLExpr::CompoundIdentifier(ids) => self.sql_compound_identifier_to_expr(ids, schema),
+
+            SQLExpr::Case {
+                operand,
+                conditions,
+                results,
+                else_result,
+            } => self.sql_case_identifier_to_expr(operand, conditions, results, else_result, schema, planner_context),
+
+            SQLExpr::Cast {
+                expr,
+                data_type,
+            } => Ok(Expr::Cast(Cast::new(
+                Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?),
+                self.convert_data_type(&data_type)?,
+            ))),
+
+            SQLExpr::TryCast {
+                expr,
+                data_type,
+            } => Ok(Expr::TryCast(TryCast::new(
+                Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?),
+                self.convert_data_type(&data_type)?,
+            ))),
+
+            SQLExpr::TypedString {
+                data_type,
+                value,
+            } => Ok(Expr::Cast(Cast::new(
+                Box::new(lit(value)),
+                self.convert_data_type(&data_type)?,
+            ))),
+
+            SQLExpr::IsNull(expr) => Ok(Expr::IsNull(Box::new(
+                self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+            ))),
+
+            SQLExpr::IsNotNull(expr) => Ok(Expr::IsNotNull(Box::new(
+                self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+            ))),
+
+            SQLExpr::IsDistinctFrom(left, right) => Ok(Expr::BinaryExpr(BinaryExpr::new(
+                Box::new(self.sql_expr_to_logical_expr(*left, schema, planner_context)?),
+                Operator::IsDistinctFrom,
+                Box::new(self.sql_expr_to_logical_expr(*right, schema, planner_context)?),
+            ))),
+
+            SQLExpr::IsNotDistinctFrom(left, right) => Ok(Expr::BinaryExpr(BinaryExpr::new(
+                Box::new(self.sql_expr_to_logical_expr(*left, schema, planner_context)?),
+                Operator::IsNotDistinctFrom,
+                Box::new(self.sql_expr_to_logical_expr(*right, schema, planner_context)?),
+            ))),
+
+            SQLExpr::IsTrue(expr) => Ok(Expr::IsTrue(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
+
+            SQLExpr::IsFalse(expr) => Ok(Expr::IsFalse(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
+
+            SQLExpr::IsNotTrue(expr) => Ok(Expr::IsNotTrue(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
+
+            SQLExpr::IsNotFalse(expr) => Ok(Expr::IsNotFalse(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
+
+            SQLExpr::IsUnknown(expr) => Ok(Expr::IsUnknown(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
+
+            SQLExpr::IsNotUnknown(expr) => Ok(Expr::IsNotUnknown(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
+
+            SQLExpr::UnaryOp { op, expr } => self.parse_sql_unary_op(op, *expr, schema, planner_context),
+
+            SQLExpr::Between {
+                expr,
+                negated,
+                low,
+                high,
+            } => Ok(Expr::Between(Between::new(
+                Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?),
+                negated,
+                Box::new(self.sql_expr_to_logical_expr(*low, schema, planner_context)?),
+                Box::new(self.sql_expr_to_logical_expr(*high, schema, planner_context)?),
+            ))),
+
+            SQLExpr::InList {
+                expr,
+                list,
+                negated,
+            } => self.sql_in_list_to_expr(*expr, list, negated, schema, planner_context),
+
+            SQLExpr::Like { negated, expr, pattern, escape_char } => self.sql_like_to_expr(negated, *expr, *pattern, escape_char, schema, planner_context),
+
+            SQLExpr::ILike { negated, expr, pattern, escape_char } =>  self.sql_ilike_to_expr(negated, *expr, *pattern, escape_char, schema, planner_context),
+
+            SQLExpr::SimilarTo { negated, expr, pattern, escape_char } => self.sql_similarto_to_expr(negated, *expr, *pattern, escape_char, schema, planner_context),
+
+            SQLExpr::BinaryOp {
+                ..
+            } => {
+                Err(DataFusionError::Internal(
+                    "binary_op should be handled by sql_expr_to_logical_expr.".to_string()
+                ))
+            }
+
+            #[cfg(feature = "unicode_expressions")]
+            SQLExpr::Substring {
+                expr,
+                substring_from,
+                substring_for,
+            } => self.sql_substring_to_expr(expr, substring_from, substring_for, schema, planner_context),
+
+            #[cfg(not(feature = "unicode_expressions"))]
+            SQLExpr::Substring {
+                ..
+            } => {
+                Err(DataFusionError::Internal(
+                    "statement substring requires compilation with feature flag: unicode_expressions.".to_string()
+                ))
+            }
+
+            SQLExpr::Trim { expr, trim_where, trim_what } => self.sql_trim_to_expr(*expr, trim_where, trim_what, schema, planner_context),
+
+            SQLExpr::AggregateExpressionWithFilter { expr, filter } => self.sql_agg_with_filter_to_expr(*expr, *filter, schema, planner_context),
+
+            SQLExpr::Function(function) => self.sql_function_to_expr(function, schema, planner_context),
+
+            SQLExpr::Rollup(exprs) => self.sql_rollup_to_expr(exprs, schema, planner_context),
+            SQLExpr::Cube(exprs) => self.sql_cube_to_expr(exprs,schema, planner_context),
+            SQLExpr::GroupingSets(exprs) => self.sql_grouping_sets_to_expr(exprs, schema, planner_context),
+
+            SQLExpr::Floor { expr, field: _field } => self.sql_named_function_to_expr(*expr, BuiltinScalarFunction::Floor, schema, planner_context),
+            SQLExpr::Ceil { expr, field: _field } => self.sql_named_function_to_expr(*expr, BuiltinScalarFunction::Ceil, schema, planner_context),
+
+            SQLExpr::Nested(e) => self.sql_expr_to_logical_expr(*e, schema, planner_context),
+
+            SQLExpr::Exists { subquery, negated } => self.parse_exists_subquery(*subquery, negated, schema, planner_context),
+            SQLExpr::InSubquery { expr, subquery, negated } => self.parse_in_subquery(*expr, *subquery, negated, schema, planner_context),
+            SQLExpr::Subquery(subquery) => self.parse_scalar_subquery(*subquery, schema, planner_context),
+
+            SQLExpr::ArrayAgg(array_agg) => self.parse_array_agg(array_agg, schema, planner_context),
+
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "Unsupported ast node in sqltorel: {sql:?}"
+            ))),
+        }
+    }
+
+    /// Plans the dedicated `ARRAY_AGG` AST node as a call to the `ArrayAgg`
+    /// aggregate function.
+    ///
+    /// Some dialects have special syntax for array_agg; only the plain function
+    /// form is planned here. An `ORDER BY`, `LIMIT` or `WITHIN GROUP` clause is
+    /// rejected with `DataFusionError::NotImplemented`.
+    fn parse_array_agg(
+        &self,
+        array_agg: ArrayAgg,
+        input_schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        // Clauses are checked in a fixed priority: ORDER BY, LIMIT, WITHIN GROUP.
+        let unsupported = match (&array_agg.order_by, &array_agg.limit) {
+            (Some(order_by), _) => {
+                Some(format!("ORDER BY not supported in ARRAY_AGG: {order_by}"))
+            }
+            (None, Some(limit)) => {
+                Some(format!("LIMIT not supported in ARRAY_AGG: {limit}"))
+            }
+            (None, None) if array_agg.within_group => {
+                Some("WITHIN GROUP not supported in ARRAY_AGG".to_string())
+            }
+            (None, None) => None,
+        };
+        if let Some(message) = unsupported {
+            return Err(DataFusionError::NotImplemented(message));
+        }
+
+        // The aggregate takes exactly one argument and carries no filter.
+        let argument = self.sql_expr_to_logical_expr(
+            *array_agg.expr,
+            input_schema,
+            planner_context,
+        )?;
+
+        Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
+            AggregateFunction::ArrayAgg,
+            vec![argument],
+            array_agg.distinct,
+            None,
+        )))
+    }
+
+    /// Plans `expr [NOT] IN (list...)` into `Expr::InList`.
+    ///
+    /// The list members are planned first, left to right, and the tested
+    /// expression afterwards; the first planning error is returned.
+    fn sql_in_list_to_expr(
+        &self,
+        expr: SQLExpr,
+        list: Vec<SQLExpr>,
+        negated: bool,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let mut candidates = Vec::with_capacity(list.len());
+        for member in list {
+            candidates.push(self.sql_expr_to_logical_expr(
+                member,
+                schema,
+                planner_context,
+            )?);
+        }
+
+        let tested = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
+
+        Ok(Expr::InList {
+            expr: Box::new(tested),
+            list: candidates,
+            negated,
+        })
+    }
+
+    /// Plans `expr [NOT] LIKE pattern [ESCAPE c]` into `Expr::Like`.
+    ///
+    /// The pattern is planned before the matched expression, and its type must
+    /// be `Utf8` or `Null`; any other type is a `DataFusionError::Plan`.
+    fn sql_like_to_expr(
+        &self,
+        negated: bool,
+        expr: SQLExpr,
+        pattern: SQLExpr,
+        escape_char: Option<char>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let like_pattern =
+            self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
+
+        match like_pattern.get_type(schema)? {
+            DataType::Utf8 | DataType::Null => {}
+            _ => {
+                return Err(DataFusionError::Plan(
+                    "Invalid pattern in LIKE expression".to_string(),
+                ));
+            }
+        }
+
+        let subject = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
+        let like = Like::new(
+            negated,
+            Box::new(subject),
+            Box::new(like_pattern),
+            escape_char,
+        );
+        Ok(Expr::Like(like))
+    }
+
+    /// Plans `expr [NOT] ILIKE pattern [ESCAPE c]` into `Expr::ILike`.
+    ///
+    /// The pattern is planned before the matched expression, and its type must
+    /// be `Utf8` or `Null`; any other type is a `DataFusionError::Plan`.
+    fn sql_ilike_to_expr(
+        &self,
+        negated: bool,
+        expr: SQLExpr,
+        pattern: SQLExpr,
+        escape_char: Option<char>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let ilike_pattern =
+            self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
+
+        let is_string_like = matches!(
+            ilike_pattern.get_type(schema)?,
+            DataType::Utf8 | DataType::Null
+        );
+        if !is_string_like {
+            return Err(DataFusionError::Plan(
+                "Invalid pattern in ILIKE expression".to_string(),
+            ));
+        }
+
+        let subject = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
+        Ok(Expr::ILike(Like::new(
+            negated,
+            Box::new(subject),
+            Box::new(ilike_pattern),
+            escape_char,
+        )))
+    }
+
+    /// Plans `expr [NOT] SIMILAR TO pattern [ESCAPE c]` into `Expr::SimilarTo`.
+    ///
+    /// The pattern is planned before the matched expression, and its type must
+    /// be `Utf8` or `Null`; any other type is a `DataFusionError::Plan`.
+    fn sql_similarto_to_expr(
+        &self,
+        negated: bool,
+        expr: SQLExpr,
+        pattern: SQLExpr,
+        escape_char: Option<char>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let planned_pattern =
+            self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
+
+        // Guard clause: reject anything that is neither a string nor NULL.
+        let accepted = match planned_pattern.get_type(schema)? {
+            DataType::Utf8 => true,
+            DataType::Null => true,
+            _ => false,
+        };
+        if !accepted {
+            return Err(DataFusionError::Plan(
+                "Invalid pattern in SIMILAR TO expression".to_string(),
+            ));
+        }
+
+        let subject = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
+        let similar_to = Like::new(
+            negated,
+            Box::new(subject),
+            Box::new(planned_pattern),
+            escape_char,
+        );
+        Ok(Expr::SimilarTo(similar_to))
+    }
+
+    /// Plans `TRIM([LEADING | TRAILING | BOTH] [what FROM] expr)` into a scalar
+    /// function call.
+    ///
+    /// `LEADING`, `TRAILING` and `BOTH` select `Ltrim`, `Rtrim` and `Btrim`;
+    /// without a qualifier `Trim` is used. The optional `trim_what` expression
+    /// becomes the second argument.
+    fn sql_trim_to_expr(
+        &self,
+        expr: SQLExpr,
+        trim_where: Option<TrimWhereField>,
+        trim_what: Option<Box<SQLExpr>>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let fun = match trim_where {
+            None => BuiltinScalarFunction::Trim,
+            Some(TrimWhereField::Both) => BuiltinScalarFunction::Btrim,
+            Some(TrimWhereField::Leading) => BuiltinScalarFunction::Ltrim,
+            Some(TrimWhereField::Trailing) => BuiltinScalarFunction::Rtrim,
+        };
+
+        // The trimmed expression always comes first; the characters to strip,
+        // when given, follow it.
+        let mut args =
+            vec![self.sql_expr_to_logical_expr(expr, schema, planner_context)?];
+        if let Some(to_trim) = trim_what {
+            args.push(self.sql_expr_to_logical_expr(
+                *to_trim,
+                schema,
+                planner_context,
+            )?);
+        }
+
+        Ok(Expr::ScalarFunction { fun, args })
+    }
+
+    /// Plans `aggregate(...) FILTER (WHERE filter)`.
+    ///
+    /// `expr` is planned first and must come out as `Expr::AggregateFunction`;
+    /// that aggregate is rebuilt with the planned `filter` attached. Any other
+    /// expression yields a `DataFusionError::Internal`, in which case `filter`
+    /// is never planned.
+    fn sql_agg_with_filter_to_expr(
+        &self,
+        expr: SQLExpr,
+        filter: SQLExpr,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let aggregate = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
+
+        if let Expr::AggregateFunction(expr::AggregateFunction {
+            fun,
+            args,
+            distinct,
+            ..
+        }) = aggregate
+        {
+            let predicate =
+                self.sql_expr_to_logical_expr(filter, schema, planner_context)?;
+            return Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
+                fun,
+                args,
+                distinct,
+                Some(Box::new(predicate)),
+            )));
+        }
+
+        Err(DataFusionError::Internal(
+            "AggregateExpressionWithFilter expression was not an AggregateFunction"
+                .to_string(),
+        ))
+    }
+}
+
+/// Converts an index key expression (the `k` in `expr[k]`) into a scalar.
+///
+/// A numeric literal becomes `ScalarValue::Int64` and a single- or
+/// double-quoted string becomes `ScalarValue::Utf8`. Every other expression,
+/// and a number that does not parse as `i64`, is reported as a SQL parser error.
+fn plan_key(key: SQLExpr) -> Result<ScalarValue> {
+    match key {
+        SQLExpr::Value(Value::Number(s, _)) => {
+            let index = s
+                .parse::<i64>()
+                .map_err(|_| ParserError(format!("Cannot parse {s} as i64.")))?;
+            Ok(ScalarValue::Int64(Some(index)))
+        }
+        SQLExpr::Value(Value::SingleQuotedString(s) | Value::DoubleQuotedString(s)) => {
+            Ok(ScalarValue::Utf8(Some(s)))
+        }
+        _ => Err(DataFusionError::SQL(ParserError(format!(
+            "Unsuported index key expression: {key:?}"
+        )))),
+    }
+}
+
+/// Wraps `expr` in one `Expr::GetIndexedField` per key.
+///
+/// Keys are applied in order, so the first key ends up innermost and the last
+/// key outermost. An empty key list is reported as a parser error.
+fn plan_indexed(expr: Expr, keys: Vec<SQLExpr>) -> Result<Expr> {
+    if keys.is_empty() {
+        return Err(ParserError(
+            "Internal error: Missing index key expression".to_string(),
+        )
+        .into());
+    }
+
+    // Fold left to right: each step indexes into what has been built so far.
+    keys.into_iter().try_fold(expr, |indexed, key| {
+        Ok(Expr::GetIndexedField(GetIndexedField::new(
+            Box::new(indexed),
+            plan_key(key)?,
+        )))
+    })
+}
diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs
new file mode 100644
index 000000000..f3a4f2b04
--- /dev/null
+++ b/datafusion/sql/src/expr/order_by.rs
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use datafusion_common::{DFSchema, DataFusionError, Result};
+use datafusion_expr::expr::Sort;
+use datafusion_expr::Expr;
+use sqlparser::ast::{Expr as SQLExpr, OrderByExpr, Value};
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Convert a SQL `OrderByExpr` into an `Expr::Sort`.
+    ///
+    /// A numeric literal is read as a 1-based position into `schema` and
+    /// resolved to that field's qualified column; position `0`, or one past the
+    /// last field, is a `DataFusionError::Plan`. Any other expression is planned
+    /// with a fresh `PlannerContext`.
+    ///
+    /// The direction defaults to ascending.
+    pub(crate) fn order_by_to_sort_expr(
+        &self,
+        e: OrderByExpr,
+        schema: &DFSchema,
+    ) -> Result<Expr> {
+        let ascending = e.asc.unwrap_or(true);
+        // when asc is true, by default nulls last to be consistent with postgres
+        // postgres rule: https://www.postgresql.org/docs/current/queries-order.html
+        let nulls_first = e.nulls_first.unwrap_or(!ascending);
+
+        let sort_key = match e.expr {
+            SQLExpr::Value(Value::Number(position, _)) => {
+                let ordinal = position
+                    .parse::<usize>()
+                    .map_err(|err| DataFusionError::Plan(err.to_string()))?;
+                let column_count = schema.fields().len();
+
+                if ordinal == 0 {
+                    return Err(DataFusionError::Plan(
+                        "Order by index starts at 1 for column indexes".to_string(),
+                    ));
+                }
+                if ordinal > column_count {
+                    return Err(DataFusionError::Plan(format!(
+                        "Order by column out of bounds, specified: {}, max: {}",
+                        ordinal, column_count
+                    )));
+                }
+
+                Expr::Column(schema.field(ordinal - 1).qualified_column())
+            }
+            other => {
+                self.sql_expr_to_logical_expr(other, schema, &mut PlannerContext::new())?
+            }
+        };
+
+        Ok(Expr::Sort(Sort::new(
+            Box::new(sort_key),
+            ascending,
+            nulls_first,
+        )))
+    }
+}
diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs
new file mode 100644
index 000000000..2d2213c86
--- /dev/null
+++ b/datafusion/sql/src/expr/subquery.rs
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use datafusion_common::{DFSchema, Result};
+use datafusion_expr::{Expr, Subquery};
+use sqlparser::ast::Expr as SQLExpr;
+use sqlparser::ast::Query;
+use std::sync::Arc;
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plans `[NOT] EXISTS (subquery)` into `Expr::Exists`.
+    ///
+    /// The subquery is planned through `subquery_to_plan`, which receives
+    /// `input_schema` alongside the planner context.
+    pub(super) fn parse_exists_subquery(
+        &self,
+        subquery: Query,
+        negated: bool,
+        input_schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let plan = self.subquery_to_plan(subquery, planner_context, input_schema)?;
+        let subquery = Subquery {
+            subquery: Arc::new(plan),
+        };
+
+        Ok(Expr::Exists { subquery, negated })
+    }
+
+    /// Plans `expr [NOT] IN (subquery)` into `Expr::InSubquery`.
+    ///
+    /// The tested expression is planned first (via `sql_to_rex`), then the
+    /// subquery through `subquery_to_plan`.
+    pub(super) fn parse_in_subquery(
+        &self,
+        expr: SQLExpr,
+        subquery: Query,
+        negated: bool,
+        input_schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let tested = self.sql_to_rex(expr, input_schema, planner_context)?;
+        let plan = self.subquery_to_plan(subquery, planner_context, input_schema)?;
+
+        Ok(Expr::InSubquery {
+            expr: Box::new(tested),
+            subquery: Subquery {
+                subquery: Arc::new(plan),
+            },
+            negated,
+        })
+    }
+
+    /// Plans a parenthesised subquery used as a value into
+    /// `Expr::ScalarSubquery`.
+    ///
+    /// The subquery is planned through `subquery_to_plan`, which receives
+    /// `input_schema` alongside the planner context.
+    pub(super) fn parse_scalar_subquery(
+        &self,
+        subquery: Query,
+        input_schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let plan = self.subquery_to_plan(subquery, planner_context, input_schema)?;
+
+        Ok(Expr::ScalarSubquery(Subquery {
+            subquery: Arc::new(plan),
+        }))
+    }
+}
diff --git a/datafusion/sql/src/expr/substring.rs b/datafusion/sql/src/expr/substring.rs
new file mode 100644
index 000000000..991f82a67
--- /dev/null
+++ b/datafusion/sql/src/expr/substring.rs
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use datafusion_common::{DFSchema, DataFusionError, Result, ScalarValue};
+use datafusion_expr::{BuiltinScalarFunction, Expr};
+use sqlparser::ast::Expr as SQLExpr;
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plans `SUBSTRING(expr [FROM from] [FOR for])` into a call to the `Substr`
+    /// scalar function.
+    ///
+    /// Arguments are planned in the order `expr`, `from`, `for`. When only
+    /// `FOR` is given, the start position is the literal `1`. A substring with
+    /// neither `FROM` nor `FOR` is a `DataFusionError::Plan`, raised before
+    /// `expr` is planned.
+    pub(super) fn sql_substring_to_expr(
+        &self,
+        expr: Box<SQLExpr>,
+        substring_from: Option<Box<SQLExpr>>,
+        substring_for: Option<Box<SQLExpr>>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        if substring_from.is_none() && substring_for.is_none() {
+            // Rebuild the AST node only to show it in the error message.
+            let orig_sql = SQLExpr::Substring {
+                expr,
+                substring_from: None,
+                substring_for: None,
+            };
+
+            return Err(DataFusionError::Plan(format!(
+                "Substring without for/from is not valid {orig_sql:?}"
+            )));
+        }
+
+        let mut args = Vec::with_capacity(3);
+        args.push(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?);
+
+        match substring_from {
+            Some(from_expr) => args.push(self.sql_expr_to_logical_expr(
+                *from_expr,
+                schema,
+                planner_context,
+            )?),
+            // Reaching this arm means FOR is present: start at position 1.
+            None => args.push(Expr::Literal(ScalarValue::Int64(Some(1)))),
+        }
+
+        if let Some(for_expr) = substring_for {
+            args.push(self.sql_expr_to_logical_expr(
+                *for_expr,
+                schema,
+                planner_context,
+            )?);
+        }
+
+        Ok(Expr::ScalarFunction {
+            fun: BuiltinScalarFunction::Substr,
+            args,
+        })
+    }
+}
diff --git a/datafusion/sql/src/expr/unary_op.rs b/datafusion/sql/src/expr/unary_op.rs
new file mode 100644
index 000000000..d24fc7154
--- /dev/null
+++ b/datafusion/sql/src/expr/unary_op.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use datafusion_common::{DFSchema, DataFusionError, Result};
+use datafusion_expr::{lit, Expr};
+use sqlparser::ast::{Expr as SQLExpr, UnaryOperator, Value};
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plans a SQL unary operator applied to `expr`.
+    ///
+    /// * `NOT` wraps the planned operand in `Expr::Not`.
+    /// * `+` returns the planned operand unchanged.
+    /// * `-` on a numeric literal is folded into a negated literal (tried as
+    ///   `i64` first, then `f64`); on anything else it wraps the planned operand
+    ///   in `Expr::Negative`.
+    ///
+    /// Every other operator is a `DataFusionError::NotImplemented`.
+    pub(crate) fn parse_sql_unary_op(
+        &self,
+        op: UnaryOperator,
+        expr: SQLExpr,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        match op {
+            UnaryOperator::Plus => {
+                self.sql_expr_to_logical_expr(expr, schema, planner_context)
+            }
+            UnaryOperator::Not => {
+                let operand =
+                    self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
+                Ok(Expr::Not(Box::new(operand)))
+            }
+            UnaryOperator::Minus => match expr {
+                // optimization: if it's a number literal, we apply the negative operator
+                // here directly to calculate the new literal.
+                SQLExpr::Value(Value::Number(n, _)) => {
+                    if let Ok(integer) = n.parse::<i64>() {
+                        return Ok(lit(-integer));
+                    }
+                    let float = n.parse::<f64>().map_err(|_e| {
+                        DataFusionError::Internal(format!(
+                            "negative operator can be only applied to integer and float operands, got: {n}"))
+                    })?;
+                    Ok(lit(-float))
+                }
+                // not a literal, apply negative operator on expression
+                operand => {
+                    let operand =
+                        self.sql_expr_to_logical_expr(operand, schema, planner_context)?;
+                    Ok(Expr::Negative(Box::new(operand)))
+                }
+            },
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "Unsupported SQL unary operator {op:?}"
+            ))),
+        }
+    }
+}
diff --git a/datafusion/sql/src/expr/value.rs b/datafusion/sql/src/expr/value.rs
new file mode 100644
index 000000000..86ca28d4c
--- /dev/null
+++ b/datafusion/sql/src/expr/value.rs
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use arrow_schema::DataType;
+use datafusion_common::{parse_interval, DFSchema, DataFusionError, Result, ScalarValue};
+use datafusion_expr::{lit, Expr};
+use log::debug;
+use sqlparser::ast::{DateTimeField, Expr as SQLExpr, Value};
+use sqlparser::parser::ParserError::ParserError;
+use std::collections::HashSet;
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Converts a SQL literal `Value` into a logical `Expr`.
+    ///
+    /// Numbers go through `parse_sql_number`, quoted strings and booleans
+    /// become literals, `NULL` becomes `ScalarValue::Null`, and placeholders
+    /// are resolved against `param_data_types`. Every other kind of value is a
+    /// `DataFusionError::Plan`.
+    pub(crate) fn parse_value(
+        &self,
+        value: Value,
+        param_data_types: &[DataType],
+    ) -> Result<Expr> {
+        match value {
+            Value::Placeholder(param) => {
+                Self::create_placeholder_expr(param, param_data_types)
+            }
+            Value::Number(digits, _) => self.parse_sql_number(&digits),
+            Value::Boolean(flag) => Ok(lit(flag)),
+            Value::Null => Ok(Expr::Literal(ScalarValue::Null)),
+            Value::SingleQuotedString(text) | Value::DoubleQuotedString(text) => {
+                Ok(lit(text))
+            }
+            _ => {
+                let message = format!("Unsupported Value '{:?}'", value,);
+                Err(DataFusionError::Plan(message))
+            }
+        }
+    }
+
+    /// Parse number in sql string, convert to Expr::Literal
+    ///
+    /// The literal is tried, in order, as:
+    /// * an `i64`, producing an integer literal;
+    /// * a `Decimal128`, when `options.parse_float_as_decimal` is set;
+    /// * an `f64` otherwise.
+    ///
+    /// For decimals the leading zeroes are dropped; the precision is the number
+    /// of remaining digits and the scale is the number of digits after the
+    /// decimal point.
+    ///
+    /// # Errors
+    /// * `NotImplemented` when the literal contains an `E` (scientific notation).
+    /// * A parser error when the text does not parse as the selected type, or
+    ///   when a decimal would need more than 38 digits of precision.
+    fn parse_sql_number(&self, n: &str) -> Result<Expr> {
+        // Largest precision used for Decimal128 here; the point-less branch
+        // below assigns exactly this precision.
+        const MAX_DECIMAL128_PRECISION: usize = 38;
+
+        if n.contains('E') {
+            // not implemented yet
+            // https://github.com/apache/arrow-datafusion/issues/3448
+            return Err(DataFusionError::NotImplemented(
+                "sql numeric literals in scientific notation are not supported"
+                    .to_string(),
+            ));
+        }
+
+        if let Ok(integer) = n.parse::<i64>() {
+            return Ok(lit(integer));
+        }
+
+        if !self.options.parse_float_as_decimal {
+            return n.parse::<f64>().map(lit).map_err(|_| {
+                DataFusionError::from(ParserError(format!("Cannot parse {n} as f64")))
+            });
+        }
+
+        // remove leading zeroes
+        let trimmed = n.trim_start_matches('0');
+        let point = match trimmed.find('.') {
+            Some(point) => point,
+            None => {
+                // An integer too large for i64: keep it as a scale-0 decimal.
+                let number = n.parse::<i128>().map_err(|_| {
+                    DataFusionError::from(ParserError(format!(
+                        "Cannot parse {n} as i128 when building decimal"
+                    )))
+                })?;
+                return Ok(Expr::Literal(ScalarValue::Decimal128(
+                    Some(number),
+                    MAX_DECIMAL128_PRECISION as u8,
+                    0,
+                )));
+            }
+        };
+
+        let digits = trimmed.replace('.', "");
+        if digits.is_empty() && n.starts_with('0') {
+            // A literal such as `0.`: every digit was a leading zero, so no
+            // digit is left to parse. The value is zero; a precision of 0 would
+            // not describe a usable decimal, so use 1.
+            return Ok(Expr::Literal(ScalarValue::Decimal128(Some(0), 1, 0)));
+        }
+
+        let unscaled = digits.parse::<i128>().map_err(|_| {
+            DataFusionError::from(ParserError(format!(
+                "Cannot parse {digits} as i128 when building decimal"
+            )))
+        })?;
+
+        let precision = trimmed.len() - 1;
+        let scale = trimmed.len() - point - 1;
+        // Many fractional zeroes (e.g. `0.000…01`) still parse as a small
+        // i128 but need a precision/scale that does not fit; casting such a
+        // value to u8/i8 would silently wrap, so reject it instead.
+        if precision > MAX_DECIMAL128_PRECISION {
+            return Err(DataFusionError::from(ParserError(format!(
+                "Cannot parse {n} as i128 when building decimal: precision overflow"
+            ))));
+        }
+
+        // scale <= precision <= 38, so both casts are lossless.
+        Ok(Expr::Literal(ScalarValue::Decimal128(
+            Some(unscaled),
+            precision as u8,
+            scale as i8,
+        )))
+    }
+
+    /// Create a placeholder expression
+    /// This is the same as Postgres's prepare statement syntax in which a placeholder starts with `$` sign and then
+    /// number 1, 2, ... etc. For example, `$1` is the first placeholder; $2 is the second one and so on.
+    ///
+    /// `param` is the placeholder text including its one-character prefix, and
+    /// `param_data_types` holds the declared type of each parameter, so `$1`
+    /// maps to `param_data_types[0]`.
+    ///
+    /// # Errors
+    /// Returns `DataFusionError::Internal` when the text after the prefix is
+    /// not a number, when the number is `0` (placeholders are 1-based), or when
+    /// it is larger than the number of declared parameters.
+    fn create_placeholder_expr(
+        param: String,
+        param_data_types: &[DataType],
+    ) -> Result<Expr> {
+        // Parse the placeholder as a number because it is the only support from sqlparser and postgres.
+        // `get(1..)` instead of `[1..]`: an empty or oddly-encoded placeholder
+        // is reported as an error rather than panicking on the slice.
+        let index = param
+            .get(1..)
+            .and_then(|digits| digits.parse::<usize>().ok())
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "Invalid placeholder, not a number: {param}"
+                ))
+            })?;
+
+        // Convert to a 0-based slot. `$0` has no slot; a plain `index - 1`
+        // would underflow here.
+        let idx = index.checked_sub(1).ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "Invalid placeholder, zero is not a valid index: {param}"
+            ))
+        })?;
+
+        // Check if the placeholder is in the parameter list and fetch its data type
+        let param_type = param_data_types.get(idx).cloned().ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "Placehoder {param} does not exist in the parameter list: {param_data_types:?}"
+            ))
+        })?;
+        debug!(
+            "type of param {} param_data_types[idx]: {:?}",
+            param, param_type
+        );
+
+        Ok(Expr::Placeholder {
+            id: param,
+            data_type: param_type,
+        })
+    }
+
+    /// Plans an array literal such as `[1, 2, 3]` into a list-valued literal.
+    ///
+    /// Each element is planned with its own fresh `PlannerContext` and must
+    /// come out as `Expr::Literal`. All elements must share one data type; an
+    /// empty array becomes a null list with `Utf8` as its element type.
+    /// Non-literal elements and mixed types are `DataFusionError::NotImplemented`.
+    pub(super) fn sql_array_literal(
+        &self,
+        elements: Vec<SQLExpr>,
+        schema: &DFSchema,
+    ) -> Result<Expr> {
+        let values = elements
+            .into_iter()
+            .map(|element| {
+                let planned = self.sql_expr_to_logical_expr(
+                    element,
+                    schema,
+                    &mut PlannerContext::new(),
+                )?;
+                match planned {
+                    Expr::Literal(scalar) => Ok(scalar),
+                    value => Err(DataFusionError::NotImplemented(format!(
+                        "Arrays with elements other than literal are not supported: {value}"
+                    ))),
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Distinct element types decide which of the three outcomes applies.
+        let data_types: HashSet<DataType> =
+            values.iter().map(|e| e.get_datatype()).collect();
+
+        match data_types.len() {
+            0 => Ok(lit(ScalarValue::new_list(None, DataType::Utf8))),
+            1 => {
+                let data_type = values[0].get_datatype();
+                Ok(lit(ScalarValue::new_list(Some(values), data_type)))
+            }
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "Arrays with different types are not supported: {data_types:?}",
+            ))),
+        }
+    }
+
+    /// Plans `INTERVAL '<value>' [leading_field]` into an interval literal.
+    ///
+    /// Only a quoted string value is accepted. A leading precision, a last
+    /// field or a fractional seconds precision is rejected with
+    /// `DataFusionError::NotImplemented`, checked in that order. When no
+    /// leading field is given, `"second"` is passed to `parse_interval`.
+    pub(super) fn sql_interval_to_expr(
+        &self,
+        value: SQLExpr,
+        leading_field: Option<DateTimeField>,
+        leading_precision: Option<u64>,
+        last_field: Option<DateTimeField>,
+        fractional_seconds_precision: Option<u64>,
+    ) -> Result<Expr> {
+        let unsupported = if leading_precision.is_some() {
+            Some(format!(
+                "Unsupported Interval Expression with leading_precision {leading_precision:?}"
+            ))
+        } else if last_field.is_some() {
+            Some(format!(
+                "Unsupported Interval Expression with last_field {last_field:?}"
+            ))
+        } else if fractional_seconds_precision.is_some() {
+            Some(format!(
+                "Unsupported Interval Expression with fractional_seconds_precision {fractional_seconds_precision:?}"
+            ))
+        } else {
+            None
+        };
+        if let Some(message) = unsupported {
+            return Err(DataFusionError::NotImplemented(message));
+        }
+
+        // Only handle string exprs for now
+        let text = match value {
+            SQLExpr::Value(
+                Value::SingleQuotedString(s) | Value::DoubleQuotedString(s),
+            ) => s,
+            value => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Unsupported interval argument. Expected string literal, got: {value:?}"
+                )));
+            }
+        };
+
+        let unit = match &leading_field {
+            Some(field) => field.to_string(),
+            None => "second".to_string(),
+        };
+
+        let interval = parse_interval(&unit, &text)?;
+        Ok(lit(interval))
+    }
+}
diff --git a/datafusion/sql/src/lib.rs b/datafusion/sql/src/lib.rs
index 3b6526da8..1f18c0e3b 100644
--- a/datafusion/sql/src/lib.rs
+++ b/datafusion/sql/src/lib.rs
@@ -18,6 +18,7 @@
 //! This module provides a SQL parser that translates SQL queries into an abstract syntax
 //! tree (AST), and a SQL query planner that creates a logical plan from the AST.
 
+mod expr;
 pub mod parser;
 pub mod planner;
 pub mod utils;
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 4d1b2c310..39ce2106b 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -16,35 +16,30 @@
 // under the License.
 
 //! SQL Query Planner (produces logical plan from SQL AST)
-use log::debug;
 use std::collections::{HashMap, HashSet};
-use std::str::FromStr;
 use std::sync::Arc;
-use std::{convert::TryInto, vec};
+use std::vec;
 
 use arrow_schema::*;
-use sqlparser::ast::{ArrayAgg, ExactNumberInfo, SetQuantifier};
+use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
 use sqlparser::ast::{
-    BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr,
-    Function as SQLFunction, FunctionArg, FunctionArgExpr, Ident, Join, JoinConstraint,
-    JoinOperator, ObjectName, Offset as SQLOffset, Query, Select, SelectItem, SetExpr,
-    SetOperator, ShowCreateObject, ShowStatementFilter, TableAlias, TableFactor,
-    TableWithJoins, TrimWhereField, UnaryOperator, Value, Values as SQLValues,
+    DataType as SQLDataType, Expr as SQLExpr, Ident, Join, JoinConstraint, JoinOperator,
+    ObjectName, Offset as SQLOffset, Query, Select, SelectItem, SetExpr, SetOperator,
+    ShowCreateObject, ShowStatementFilter, TableAlias, TableFactor, TableWithJoins,
+    UnaryOperator, Value, Values as SQLValues,
 };
-use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
+use sqlparser::ast::{ExactNumberInfo, SetQuantifier};
 use sqlparser::ast::{ObjectType, OrderByExpr, Statement};
 use sqlparser::ast::{TimezoneInfo, WildcardAdditionalOptions};
 use sqlparser::parser::ParserError::ParserError;
 
-use datafusion_common::parsers::{parse_interval, CompressionTypeVariant};
+use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::ToDFSchema;
 use datafusion_common::{
     field_not_found, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
 };
 use datafusion_common::{OwnedTableReference, TableReference};
-use datafusion_expr::expr::{
-    self, Between, BinaryExpr, Case, Cast, GroupingSet, Like, Sort, TryCast,
-};
+use datafusion_expr::expr::{Cast, GroupingSet};
 use datafusion_expr::expr_rewriter::normalize_col;
 use datafusion_expr::expr_rewriter::normalize_col_with_schemas;
 use datafusion_expr::logical_plan::builder::project;
@@ -56,20 +51,14 @@ use datafusion_expr::logical_plan::{
     DropTable, DropView, Explain, JoinType, LogicalPlan, LogicalPlanBuilder,
     Partitioning, PlanType, SetVariable, ToStringifiedPlan,
 };
-use datafusion_expr::logical_plan::{Filter, Prepare, Subquery};
+use datafusion_expr::logical_plan::{Filter, Prepare};
 use datafusion_expr::utils::{
     expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns,
-    find_aggregate_exprs, find_column_exprs, find_window_exprs, COUNT_STAR_EXPANSION,
+    find_aggregate_exprs, find_column_exprs, find_window_exprs,
 };
 use datafusion_expr::Expr::Alias;
-use datafusion_expr::{
-    cast, col, lit, AggregateFunction, AggregateUDF, Expr, ExprSchemable,
-    GetIndexedField, Operator, ScalarUDF, SubqueryAlias, WindowFrame, WindowFrameUnits,
-};
-use datafusion_expr::{
-    window_function::{self, WindowFunction},
-    BuiltinScalarFunction, TableSource,
-};
+use datafusion_expr::TableSource;
+use datafusion_expr::{cast, col, lit, AggregateUDF, Expr, ScalarUDF, SubqueryAlias};
 
 use crate::parser::{CreateExternalTable, DescribeTable, Statement as DFStatement};
 use crate::utils::{make_decimal_type, normalize_ident, resolve_columns};
@@ -100,7 +89,7 @@ pub trait ContextProvider {
 /// SQL parser options
 #[derive(Debug, Default)]
 pub struct ParserOptions {
-    parse_float_as_decimal: bool,
+    pub(crate) parse_float_as_decimal: bool,
 }
 
 #[derive(Debug, Clone)]
@@ -141,44 +130,8 @@ impl PlannerContext {
 
 /// SQL query planner
 pub struct SqlToRel<'a, S: ContextProvider> {
-    schema_provider: &'a S,
-    options: ParserOptions,
-}
-
-fn plan_key(key: SQLExpr) -> Result<ScalarValue> {
-    let scalar = match key {
-        SQLExpr::Value(Value::Number(s, _)) => ScalarValue::Int64(Some(
-            s.parse()
-                .map_err(|_| ParserError(format!("Cannot parse {s} as i64.")))?,
-        )),
-        SQLExpr::Value(Value::SingleQuotedString(s) | Value::DoubleQuotedString(s)) => {
-            ScalarValue::Utf8(Some(s))
-        }
-        _ => {
-            return Err(DataFusionError::SQL(ParserError(format!(
-                "Unsuported index key expression: {key:?}"
-            ))));
-        }
-    };
-
-    Ok(scalar)
-}
-
-fn plan_indexed(expr: Expr, mut keys: Vec<SQLExpr>) -> Result<Expr> {
-    let key = keys.pop().ok_or_else(|| {
-        ParserError("Internal error: Missing index key expression".to_string())
-    })?;
-
-    let expr = if !keys.is_empty() {
-        plan_indexed(expr, keys)?
-    } else {
-        expr
-    };
-
-    Ok(Expr::GetIndexedField(GetIndexedField::new(
-        Box::new(expr),
-        plan_key(key)?,
-    )))
+    pub(crate) schema_provider: &'a S,
+    pub(crate) options: ParserOptions,
 }
 
 impl<'a, S: ContextProvider> SqlToRel<'a, S> {
@@ -1413,49 +1366,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         LogicalPlanBuilder::from(plan).sort(order_by_rex)?.build()
     }
 
-    /// convert sql OrderByExpr to Expr::Sort
-    fn order_by_to_sort_expr(&self, e: OrderByExpr, schema: &DFSchema) -> Result<Expr> {
-        let OrderByExpr {
-            asc,
-            expr,
-            nulls_first,
-        } = e;
-
-        let expr = match expr {
-            SQLExpr::Value(Value::Number(v, _)) => {
-                let field_index = v
-                    .parse::<usize>()
-                    .map_err(|err| DataFusionError::Plan(err.to_string()))?;
-
-                if field_index == 0 {
-                    return Err(DataFusionError::Plan(
-                        "Order by index starts at 1 for column indexes".to_string(),
-                    ));
-                } else if schema.fields().len() < field_index {
-                    return Err(DataFusionError::Plan(format!(
-                        "Order by column out of bounds, specified: {}, max: {}",
-                        field_index,
-                        schema.fields().len()
-                    )));
-                }
-
-                let field = schema.field(field_index - 1);
-                Expr::Column(field.qualified_column())
-            }
-            e => self.sql_expr_to_logical_expr(e, schema, &mut PlannerContext::new())?,
-        };
-        Ok({
-            let asc = asc.unwrap_or(true);
-            Expr::Sort(Sort::new(
-                Box::new(expr),
-                asc,
-                // when asc is true, by default nulls last to be consistent with postgres
-                // postgres rule: https://www.postgresql.org/docs/current/queries-order.html
-                nulls_first.unwrap_or(!asc),
-            ))
-        })
-    }
-
     /// Validate the schema provides all of the columns referenced in the expressions.
     fn validate_schema_satisfies_exprs(
         &self,
@@ -1618,128 +1528,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         }
     }
 
-    fn sql_fn_arg_to_logical_expr(
-        &self,
-        sql: FunctionArg,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        match sql {
-            FunctionArg::Named {
-                name: _,
-                arg: FunctionArgExpr::Expr(arg),
-            } => self.sql_expr_to_logical_expr(arg, schema, planner_context),
-            FunctionArg::Named {
-                name: _,
-                arg: FunctionArgExpr::Wildcard,
-            } => Ok(Expr::Wildcard),
-            FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
-                self.sql_expr_to_logical_expr(arg, schema, planner_context)
-            }
-            FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => Ok(Expr::Wildcard),
-            _ => Err(DataFusionError::NotImplemented(format!(
-                "Unsupported qualified wildcard argument: {sql:?}"
-            ))),
-        }
-    }
-
-    fn parse_sql_binary_op(
-        &self,
-        left: SQLExpr,
-        op: BinaryOperator,
-        right: SQLExpr,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        let operator = match op {
-            BinaryOperator::Gt => Ok(Operator::Gt),
-            BinaryOperator::GtEq => Ok(Operator::GtEq),
-            BinaryOperator::Lt => Ok(Operator::Lt),
-            BinaryOperator::LtEq => Ok(Operator::LtEq),
-            BinaryOperator::Eq => Ok(Operator::Eq),
-            BinaryOperator::NotEq => Ok(Operator::NotEq),
-            BinaryOperator::Plus => Ok(Operator::Plus),
-            BinaryOperator::Minus => Ok(Operator::Minus),
-            BinaryOperator::Multiply => Ok(Operator::Multiply),
-            BinaryOperator::Divide => Ok(Operator::Divide),
-            BinaryOperator::Modulo => Ok(Operator::Modulo),
-            BinaryOperator::And => Ok(Operator::And),
-            BinaryOperator::Or => Ok(Operator::Or),
-            BinaryOperator::PGRegexMatch => Ok(Operator::RegexMatch),
-            BinaryOperator::PGRegexIMatch => Ok(Operator::RegexIMatch),
-            BinaryOperator::PGRegexNotMatch => Ok(Operator::RegexNotMatch),
-            BinaryOperator::PGRegexNotIMatch => Ok(Operator::RegexNotIMatch),
-            BinaryOperator::BitwiseAnd => Ok(Operator::BitwiseAnd),
-            BinaryOperator::BitwiseOr => Ok(Operator::BitwiseOr),
-            BinaryOperator::BitwiseXor => Ok(Operator::BitwiseXor),
-            BinaryOperator::PGBitwiseShiftRight => Ok(Operator::BitwiseShiftRight),
-            BinaryOperator::PGBitwiseShiftLeft => Ok(Operator::BitwiseShiftLeft),
-            BinaryOperator::StringConcat => Ok(Operator::StringConcat),
-            _ => Err(DataFusionError::NotImplemented(format!(
-                "Unsupported SQL binary operator {op:?}"
-            ))),
-        }?;
-
-        Ok(Expr::BinaryExpr(BinaryExpr::new(
-            Box::new(self.sql_expr_to_logical_expr(left, schema, planner_context)?),
-            operator,
-            Box::new(self.sql_expr_to_logical_expr(right, schema, planner_context)?),
-        )))
-    }
-
-    fn parse_sql_unary_op(
-        &self,
-        op: UnaryOperator,
-        expr: SQLExpr,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        match op {
-            UnaryOperator::Not => Ok(Expr::Not(Box::new(
-                self.sql_expr_to_logical_expr(expr, schema, planner_context)?,
-            ))),
-            UnaryOperator::Plus => {
-                Ok(self.sql_expr_to_logical_expr(expr, schema, planner_context)?)
-            }
-            UnaryOperator::Minus => {
-                match expr {
-                    // optimization: if it's a number literal, we apply the negative operator
-                    // here directly to calculate the new literal.
-                    SQLExpr::Value(Value::Number(n, _)) => match n.parse::<i64>() {
-                        Ok(n) => Ok(lit(-n)),
-                        Err(_) => Ok(lit(-n
-                            .parse::<f64>()
-                            .map_err(|_e| {
-                                DataFusionError::Internal(format!(
-                                    "negative operator can be only applied to integer and float operands, got: {n}"))
-                            })?)),
-                    },
-                    // not a literal, apply negative operator on expression
-                    _ => Ok(Expr::Negative(Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?))),
-                }
-            }
-            _ => Err(DataFusionError::NotImplemented(format!(
-                "Unsupported SQL unary operator {op:?}"
-            ))),
-        }
-    }
-
-    fn parse_value(&self, value: Value, param_data_types: &[DataType]) -> Result<Expr> {
-        match value {
-            Value::Number(n, _) => self.parse_sql_number(&n),
-            Value::SingleQuotedString(s) | Value::DoubleQuotedString(s) => Ok(lit(s)),
-            Value::Null => Ok(Expr::Literal(ScalarValue::Null)),
-            Value::Boolean(n) => Ok(lit(n)),
-            Value::Placeholder(param) => {
-                Self::create_placeholder_expr(param, param_data_types)
-            }
-            _ => Err(DataFusionError::Plan(format!(
-                "Unsupported Value '{:?}'",
-                value,
-            ))),
-        }
-    }
-
     fn sql_values_to_plan(
         &self,
         values: SQLValues,
@@ -1798,978 +1586,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         LogicalPlanBuilder::values(values)?.build()
     }
 
-    /// Create a placeholder expression
-    /// This is the same as Postgres's prepare statement syntax in which a placeholder starts with `$` sign and then
-    /// number 1, 2, ... etc. For example, `$1` is the first placeholder; $2 is the second one and so on.
-    fn create_placeholder_expr(
-        param: String,
-        param_data_types: &[DataType],
-    ) -> Result<Expr> {
-        // Parse the placeholder as a number because it is the only support from sqlparser and postgres
-        let index = param[1..].parse::<usize>();
-        let idx = match index {
-            Ok(index) => index - 1,
-            Err(_) => {
-                return Err(DataFusionError::Internal(format!(
-                    "Invalid placeholder, not a number: {param}"
-                )));
-            }
-        };
-        // Check if the placeholder is in the parameter list
-        if param_data_types.len() <= idx {
-            return Err(DataFusionError::Internal(format!(
-                "Placehoder {param} does not exist in the parameter list: {param_data_types:?}"
-            )));
-        }
-        // Data type of the parameter
-        let param_type = param_data_types[idx].clone();
-        debug!(
-            "type of param {} param_data_types[idx]: {:?}",
-            param, param_type
-        );
-
-        Ok(Expr::Placeholder {
-            id: param,
-            data_type: param_type,
-        })
-    }
-
-    fn sql_expr_to_logical_expr(
-        &self,
-        sql: SQLExpr,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        // Workaround for https://github.com/apache/arrow-datafusion/issues/4065
-        //
-        // Minimize stack space required in debug builds to plan
-        // deeply nested binary operators by keeping the stack space
-        // needed for sql_expr_to_logical_expr minimal for BinaryOp
-        //
-        // The reason this reduces stack size in debug builds is
-        // explained in the "Technical Backstory" heading of
-        // https://github.com/apache/arrow-datafusion/pull/1047
-        //
-        // A likely better way to support deeply nested expressions
-        // would be to avoid recursion all together and use an
-        // iterative algorithm.
-        match sql {
-            SQLExpr::BinaryOp { left, op, right } => {
-                self.parse_sql_binary_op(*left, op, *right, schema, planner_context)
-            }
-            // since this function requires more space per frame
-            // avoid calling it for binary ops
-            _ => self.sql_expr_to_logical_expr_internal(sql, schema, planner_context),
-        }
-    }
-
-    /// Internal implementation. Use
-    /// [`Self::sql_expr_to_logical_expr`] to plan exprs.
-    fn sql_expr_to_logical_expr_internal(
-        &self,
-        sql: SQLExpr,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        match sql {
-            SQLExpr::Value(value) => {
-                self.parse_value(value, &planner_context.prepare_param_data_types)
-            }
-            SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction {
-                fun: BuiltinScalarFunction::DatePart,
-                args: vec![
-                    Expr::Literal(ScalarValue::Utf8(Some(format!("{field}")))),
-                    self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
-                ],
-            }),
-
-            SQLExpr::Array(arr) => self.sql_array_literal(arr.elem, schema),
-            SQLExpr::Interval {
-                value,
-                leading_field,
-                leading_precision,
-                last_field,
-                fractional_seconds_precision,
-            } => self.sql_interval_to_expr(
-                *value,
-                leading_field,
-                leading_precision,
-                last_field,
-                fractional_seconds_precision,
-            ),
-            SQLExpr::Identifier(id) => self.sql_identifier_to_expr(id),
-
-            SQLExpr::MapAccess { column, keys } => {
-                if let SQLExpr::Identifier(id) = *column {
-                    plan_indexed(col(normalize_ident(id)), keys)
-                } else {
-                    Err(DataFusionError::NotImplemented(format!(
-                        "map access requires an identifier, found column {column} instead"
-                    )))
-                }
-            }
-
-            SQLExpr::ArrayIndex { obj, indexes } => {
-                let expr = self.sql_expr_to_logical_expr(*obj, schema, planner_context)?;
-                plan_indexed(expr, indexes)
-            }
-
-            SQLExpr::CompoundIdentifier(ids) => self.sql_compound_identifier_to_expr(ids, schema),
-
-            SQLExpr::Case {
-                operand,
-                conditions,
-                results,
-                else_result,
-            } => self.sql_case_identifier_to_expr(operand, conditions, results, else_result, schema, planner_context),
-
-            SQLExpr::Cast {
-                expr,
-                data_type,
-            } => Ok(Expr::Cast(Cast::new(
-                Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?),
-                self.convert_data_type(&data_type)?,
-            ))),
-
-            SQLExpr::TryCast {
-                expr,
-                data_type,
-            } => Ok(Expr::TryCast(TryCast::new(
-                Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?),
-                self.convert_data_type(&data_type)?,
-            ))),
-
-            SQLExpr::TypedString {
-                data_type,
-                value,
-            } => Ok(Expr::Cast(Cast::new(
-                Box::new(lit(value)),
-                self.convert_data_type(&data_type)?,
-            ))),
-
-            SQLExpr::IsNull(expr) => Ok(Expr::IsNull(Box::new(
-                self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
-            ))),
-
-            SQLExpr::IsNotNull(expr) => Ok(Expr::IsNotNull(Box::new(
-                self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
-            ))),
-
-            SQLExpr::IsDistinctFrom(left, right) => Ok(Expr::BinaryExpr(BinaryExpr::new(
-                Box::new(self.sql_expr_to_logical_expr(*left, schema, planner_context)?),
-                Operator::IsDistinctFrom,
-                Box::new(self.sql_expr_to_logical_expr(*right, schema, planner_context)?),
-            ))),
-
-            SQLExpr::IsNotDistinctFrom(left, right) => Ok(Expr::BinaryExpr(BinaryExpr::new(
-                Box::new(self.sql_expr_to_logical_expr(*left, schema, planner_context)?),
-                Operator::IsNotDistinctFrom,
-                Box::new(self.sql_expr_to_logical_expr(*right, schema, planner_context)?),
-            ))),
-
-            SQLExpr::IsTrue(expr) => Ok(Expr::IsTrue(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
-
-            SQLExpr::IsFalse(expr) => Ok(Expr::IsFalse(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
-
-            SQLExpr::IsNotTrue(expr) => Ok(Expr::IsNotTrue(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
-
-            SQLExpr::IsNotFalse(expr) => Ok(Expr::IsNotFalse(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
-
-            SQLExpr::IsUnknown(expr) => Ok(Expr::IsUnknown(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
-
-            SQLExpr::IsNotUnknown(expr) => Ok(Expr::IsNotUnknown(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
-
-            SQLExpr::UnaryOp { op, expr } => self.parse_sql_unary_op(op, *expr, schema, planner_context),
-
-            SQLExpr::Between {
-                expr,
-                negated,
-                low,
-                high,
-            } => Ok(Expr::Between(Between::new(
-                Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?),
-                negated,
-                Box::new(self.sql_expr_to_logical_expr(*low, schema, planner_context)?),
-                Box::new(self.sql_expr_to_logical_expr(*high, schema, planner_context)?),
-            ))),
-
-            SQLExpr::InList {
-                expr,
-                list,
-                negated,
-            } => self.sql_in_list_to_expr(*expr, list, negated, schema, planner_context),
-
-            SQLExpr::Like { negated, expr, pattern, escape_char } => self.sql_like_to_expr(negated, *expr, *pattern, escape_char, schema, planner_context),
-
-            SQLExpr::ILike { negated, expr, pattern, escape_char } =>  self.sql_ilike_to_expr(negated, *expr, *pattern, escape_char, schema, planner_context),
-
-            SQLExpr::SimilarTo { negated, expr, pattern, escape_char } => self.sql_similarto_to_expr(negated, *expr, *pattern, escape_char, schema, planner_context),
-
-
-            SQLExpr::BinaryOp {
-                ..
-            } => {
-                Err(DataFusionError::Internal(
-                    "binary_op should be handled by sql_expr_to_logical_expr.".to_string()
-                ))
-            }
-
-
-            #[cfg(feature = "unicode_expressions")]
-            SQLExpr::Substring {
-                expr,
-                substring_from,
-                substring_for,
-            } => self.sql_substring_to_expr(expr, substring_from, substring_for, schema, planner_context),
-
-            #[cfg(not(feature = "unicode_expressions"))]
-            SQLExpr::Substring {
-                ..
-            } => {
-                Err(DataFusionError::Internal(
-                    "statement substring requires compilation with feature flag: unicode_expressions.".to_string()
-                ))
-            }
-
-            SQLExpr::Trim { expr, trim_where, trim_what } => self.sql_trim_to_expr(*expr, trim_where, trim_what, schema, planner_context),
-
-            SQLExpr::AggregateExpressionWithFilter { expr, filter } => self.sql_agg_with_filter_to_expr(*expr, *filter, schema, planner_context),
-
-            SQLExpr::Function(function) => self.sql_function_to_expr(function, schema, planner_context),
-
-            SQLExpr::Rollup(exprs) => self.sql_rollup_to_expr(exprs, schema, planner_context),
-            SQLExpr::Cube(exprs) => self.sql_cube_to_expr(exprs,schema, planner_context),
-            SQLExpr::GroupingSets(exprs) => self.sql_grouping_sets_to_expr(exprs, schema, planner_context),
-
-            SQLExpr::Floor { expr, field: _field } => self.sql_named_function_to_expr(*expr, BuiltinScalarFunction::Floor, schema, planner_context),
-
-            SQLExpr::Ceil { expr, field: _field } => self.sql_named_function_to_expr(*expr, BuiltinScalarFunction::Ceil, schema, planner_context),
-
-            SQLExpr::Nested(e) => self.sql_expr_to_logical_expr(*e, schema, planner_context),
-
-            SQLExpr::Exists { subquery, negated } => self.parse_exists_subquery(*subquery, negated, schema, planner_context),
-
-            SQLExpr::InSubquery { expr, subquery, negated } => self.parse_in_subquery(*expr, *subquery, negated, schema, planner_context),
-
-            SQLExpr::Subquery(subquery) => self.parse_scalar_subquery(*subquery, schema, planner_context),
-
-            SQLExpr::ArrayAgg(array_agg) => self.parse_array_agg(array_agg, schema, planner_context),
-
-            _ => Err(DataFusionError::NotImplemented(format!(
-                "Unsupported ast node in sqltorel: {sql:?}"
-            ))),
-        }
-    }
-
-    fn sql_function_to_expr(
-        &self,
-        mut function: SQLFunction,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        let name = if function.name.0.len() > 1 {
-            // DF doesn't handle compound identifiers
-            // (e.g. "foo.bar") for function names yet
-            function.name.to_string()
-        } else {
-            normalize_ident(function.name.0[0].clone())
-        };
-
-        // next, scalar built-in
-        if let Ok(fun) = BuiltinScalarFunction::from_str(&name) {
-            let args = self.function_args_to_expr(function.args, schema)?;
-            return Ok(Expr::ScalarFunction { fun, args });
-        };
-
-        // then, window function
-        if let Some(window) = function.over.take() {
-            let partition_by = window
-                .partition_by
-                .into_iter()
-                .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
-                .collect::<Result<Vec<_>>>()?;
-            let order_by = window
-                .order_by
-                .into_iter()
-                .map(|e| self.order_by_to_sort_expr(e, schema))
-                .collect::<Result<Vec<_>>>()?;
-            let window_frame = window
-                .window_frame
-                .as_ref()
-                .map(|window_frame| {
-                    let window_frame: WindowFrame = window_frame.clone().try_into()?;
-                    if WindowFrameUnits::Range == window_frame.units
-                        && order_by.len() != 1
-                    {
-                        Err(DataFusionError::Plan(format!(
-                            "With window frame of type RANGE, the order by expression must be of length 1, got {}", order_by.len())))
-                    } else {
-                        Ok(window_frame)
-                    }
-                })
-                .transpose()?;
-            let window_frame = if let Some(window_frame) = window_frame {
-                window_frame
-            } else {
-                WindowFrame::new(!order_by.is_empty())
-            };
-            let fun = self.find_window_func(&name)?;
-            let expr = match fun {
-                WindowFunction::AggregateFunction(aggregate_fun) => {
-                    let (aggregate_fun, args) =
-                        self.aggregate_fn_to_expr(aggregate_fun, function.args, schema)?;
-
-                    Expr::WindowFunction(expr::WindowFunction::new(
-                        WindowFunction::AggregateFunction(aggregate_fun),
-                        args,
-                        partition_by,
-                        order_by,
-                        window_frame,
-                    ))
-                }
-                _ => Expr::WindowFunction(expr::WindowFunction::new(
-                    fun,
-                    self.function_args_to_expr(function.args, schema)?,
-                    partition_by,
-                    order_by,
-                    window_frame,
-                )),
-            };
-            return Ok(expr);
-        }
-
-        // next, aggregate built-ins
-        if let Ok(fun) = AggregateFunction::from_str(&name) {
-            let distinct = function.distinct;
-            let (fun, args) = self.aggregate_fn_to_expr(fun, function.args, schema)?;
-            return Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
-                fun, args, distinct, None,
-            )));
-        };
-
-        // finally, user-defined functions (UDF) and UDAF
-        match self.schema_provider.get_function_meta(&name) {
-            Some(fm) => {
-                let args = self.function_args_to_expr(function.args, schema)?;
-
-                Ok(Expr::ScalarUDF { fun: fm, args })
-            }
-            None => match self.schema_provider.get_aggregate_meta(&name) {
-                Some(fm) => {
-                    let args = self.function_args_to_expr(function.args, schema)?;
-                    Ok(Expr::AggregateUDF {
-                        fun: fm,
-                        args,
-                        filter: None,
-                    })
-                }
-                _ => Err(DataFusionError::Plan(format!("Invalid function '{name}'"))),
-            },
-        }
-    }
-
-    fn sql_named_function_to_expr(
-        &self,
-        expr: SQLExpr,
-        fun: BuiltinScalarFunction,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        let args = vec![self.sql_expr_to_logical_expr(expr, schema, planner_context)?];
-        Ok(Expr::ScalarFunction { fun, args })
-    }
-
-    fn sql_substring_to_expr(
-        &self,
-        expr: Box<SQLExpr>,
-        substring_from: Option<Box<SQLExpr>>,
-        substring_for: Option<Box<SQLExpr>>,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        let args = match (substring_from, substring_for) {
-            (Some(from_expr), Some(for_expr)) => {
-                let arg =
-                    self.sql_expr_to_logical_expr(*expr, schema, planner_context)?;
-                let from_logic =
-                    self.sql_expr_to_logical_expr(*from_expr, schema, planner_context)?;
-                let for_logic =
-                    self.sql_expr_to_logical_expr(*for_expr, schema, planner_context)?;
-                vec![arg, from_logic, for_logic]
-            }
-            (Some(from_expr), None) => {
-                let arg =
-                    self.sql_expr_to_logical_expr(*expr, schema, planner_context)?;
-                let from_logic =
-                    self.sql_expr_to_logical_expr(*from_expr, schema, planner_context)?;
-                vec![arg, from_logic]
-            }
-            (None, Some(for_expr)) => {
-                let arg =
-                    self.sql_expr_to_logical_expr(*expr, schema, planner_context)?;
-                let from_logic = Expr::Literal(ScalarValue::Int64(Some(1)));
-                let for_logic =
-                    self.sql_expr_to_logical_expr(*for_expr, schema, planner_context)?;
-                vec![arg, from_logic, for_logic]
-            }
-            (None, None) => {
-                let orig_sql = SQLExpr::Substring {
-                    expr,
-                    substring_from: None,
-                    substring_for: None,
-                };
-
-                return Err(DataFusionError::Plan(format!(
-                    "Substring without for/from is not valid {orig_sql:?}"
-                )));
-            }
-        };
-
-        Ok(Expr::ScalarFunction {
-            fun: BuiltinScalarFunction::Substr,
-            args,
-        })
-    }
-
-    fn sql_in_list_to_expr(
-        &self,
-        expr: SQLExpr,
-        list: Vec<SQLExpr>,
-        negated: bool,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        let list_expr = list
-            .into_iter()
-            .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
-            .collect::<Result<Vec<_>>>()?;
-
-        Ok(Expr::InList {
-            expr: Box::new(self.sql_expr_to_logical_expr(
-                expr,
-                schema,
-                planner_context,
-            )?),
-            list: list_expr,
-            negated,
-        })
-    }
-
-    fn sql_like_to_expr(
-        &self,
-        negated: bool,
-        expr: SQLExpr,
-        pattern: SQLExpr,
-        escape_char: Option<char>,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
-        let pattern_type = pattern.get_type(schema)?;
-        if pattern_type != DataType::Utf8 && pattern_type != DataType::Null {
-            return Err(DataFusionError::Plan(
-                "Invalid pattern in LIKE expression".to_string(),
-            ));
-        }
-        Ok(Expr::Like(Like::new(
-            negated,
-            Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
-            Box::new(pattern),
-            escape_char,
-        )))
-    }
-
-    fn sql_ilike_to_expr(
-        &self,
-        negated: bool,
-        expr: SQLExpr,
-        pattern: SQLExpr,
-        escape_char: Option<char>,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
-        let pattern_type = pattern.get_type(schema)?;
-        if pattern_type != DataType::Utf8 && pattern_type != DataType::Null {
-            return Err(DataFusionError::Plan(
-                "Invalid pattern in ILIKE expression".to_string(),
-            ));
-        }
-        Ok(Expr::ILike(Like::new(
-            negated,
-            Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
-            Box::new(pattern),
-            escape_char,
-        )))
-    }
-
-    fn sql_similarto_to_expr(
-        &self,
-        negated: bool,
-        expr: SQLExpr,
-        pattern: SQLExpr,
-        escape_char: Option<char>,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
-        let pattern_type = pattern.get_type(schema)?;
-        if pattern_type != DataType::Utf8 && pattern_type != DataType::Null {
-            return Err(DataFusionError::Plan(
-                "Invalid pattern in SIMILAR TO expression".to_string(),
-            ));
-        }
-        Ok(Expr::SimilarTo(Like::new(
-            negated,
-            Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
-            Box::new(pattern),
-            escape_char,
-        )))
-    }
-
-    fn sql_trim_to_expr(
-        &self,
-        expr: SQLExpr,
-        trim_where: Option<TrimWhereField>,
-        trim_what: Option<Box<SQLExpr>>,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        let fun = match trim_where {
-            Some(TrimWhereField::Leading) => BuiltinScalarFunction::Ltrim,
-            Some(TrimWhereField::Trailing) => BuiltinScalarFunction::Rtrim,
-            Some(TrimWhereField::Both) => BuiltinScalarFunction::Btrim,
-            None => BuiltinScalarFunction::Trim,
-        };
-        let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
-        let args = match trim_what {
-            Some(to_trim) => {
-                let to_trim =
-                    self.sql_expr_to_logical_expr(*to_trim, schema, planner_context)?;
-                vec![arg, to_trim]
-            }
-            None => vec![arg],
-        };
-        Ok(Expr::ScalarFunction { fun, args })
-    }
-
-    fn sql_agg_with_filter_to_expr(
-        &self,
-        expr: SQLExpr,
-        filter: SQLExpr,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        match self.sql_expr_to_logical_expr(expr, schema, planner_context)? {
-            Expr::AggregateFunction(expr::AggregateFunction {
-                fun,
-                args,
-                distinct,
-                ..
-            }) => Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
-                fun,
-                args,
-                distinct,
-                Some(Box::new(self.sql_expr_to_logical_expr(
-                    filter,
-                    schema,
-                    planner_context,
-                )?)),
-            ))),
-            _ => Err(DataFusionError::Internal(
-                "AggregateExpressionWithFilter expression was not an AggregateFunction"
-                    .to_string(),
-            )),
-        }
-    }
-
-    fn sql_identifier_to_expr(&self, id: Ident) -> Result<Expr> {
-        if id.value.starts_with('@') {
-            // TODO: figure out if ScalarVariables should be insensitive.
-            let var_names = vec![id.value];
-            let ty = self
-                .schema_provider
-                .get_variable_type(&var_names)
-                .ok_or_else(|| {
-                    DataFusionError::Execution(format!(
-                        "variable {var_names:?} has no type information"
-                    ))
-                })?;
-            Ok(Expr::ScalarVariable(ty, var_names))
-        } else {
-            // Don't use `col()` here because it will try to
-            // interpret names with '.' as if they were
-            // compound identifiers, but this is not a compound
-            // identifier. (e.g. it is "foo.bar" not foo.bar)
-
-            Ok(Expr::Column(Column {
-                relation: None,
-                name: normalize_ident(id),
-            }))
-        }
-    }
-
-    fn sql_compound_identifier_to_expr(
-        &self,
-        ids: Vec<Ident>,
-        schema: &DFSchema,
-    ) -> Result<Expr> {
-        if ids[0].value.starts_with('@') {
-            let var_names: Vec<_> = ids.into_iter().map(normalize_ident).collect();
-            let ty = self
-                .schema_provider
-                .get_variable_type(&var_names)
-                .ok_or_else(|| {
-                    DataFusionError::Execution(format!(
-                        "variable {var_names:?} has no type information"
-                    ))
-                })?;
-            Ok(Expr::ScalarVariable(ty, var_names))
-        } else {
-            // only support "schema.table" type identifiers here
-            let (name, relation) = match idents_to_table_reference(ids)? {
-                OwnedTableReference::Partial { schema, table } => (table, schema),
-                r @ OwnedTableReference::Bare { .. }
-                | r @ OwnedTableReference::Full { .. } => {
-                    return Err(DataFusionError::Plan(format!(
-                        "Unsupported compound identifier '{r:?}'",
-                    )));
-                }
-            };
-
-            // Try and find the reference in schema
-            match schema.field_with_qualified_name(&relation, &name) {
-                Ok(_) => {
-                    // found an exact match on a qualified name so this is a table.column identifier
-                    Ok(Expr::Column(Column {
-                        relation: Some(relation),
-                        name,
-                    }))
-                }
-                Err(_) => {
-                    if let Some(field) =
-                        schema.fields().iter().find(|f| f.name().eq(&relation))
-                    {
-                        // Access to a field of a column which is a structure, example: SELECT my_struct.key
-                        Ok(Expr::GetIndexedField(GetIndexedField::new(
-                            Box::new(Expr::Column(field.qualified_column())),
-                            ScalarValue::Utf8(Some(name)),
-                        )))
-                    } else {
-                        // table.column identifier
-                        Ok(Expr::Column(Column {
-                            relation: Some(relation),
-                            name,
-                        }))
-                    }
-                }
-            }
-        }
-    }
-
-    fn sql_case_identifier_to_expr(
-        &self,
-        operand: Option<Box<SQLExpr>>,
-        conditions: Vec<SQLExpr>,
-        results: Vec<SQLExpr>,
-        else_result: Option<Box<SQLExpr>>,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        let expr = if let Some(e) = operand {
-            Some(Box::new(self.sql_expr_to_logical_expr(
-                *e,
-                schema,
-                planner_context,
-            )?))
-        } else {
-            None
-        };
-        let when_expr = conditions
-            .into_iter()
-            .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
-            .collect::<Result<Vec<_>>>()?;
-        let then_expr = results
-            .into_iter()
-            .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
-            .collect::<Result<Vec<_>>>()?;
-        let else_expr = if let Some(e) = else_result {
-            Some(Box::new(self.sql_expr_to_logical_expr(
-                *e,
-                schema,
-                planner_context,
-            )?))
-        } else {
-            None
-        };
-
-        Ok(Expr::Case(Case::new(
-            expr,
-            when_expr
-                .iter()
-                .zip(then_expr.iter())
-                .map(|(w, t)| (Box::new(w.to_owned()), Box::new(t.to_owned())))
-                .collect(),
-            else_expr,
-        )))
-    }
-
-    fn find_window_func(&self, name: &str) -> Result<WindowFunction> {
-        window_function::find_df_window_func(name)
-            .or_else(|| {
-                self.schema_provider
-                    .get_aggregate_meta(name)
-                    .map(WindowFunction::AggregateUDF)
-            })
-            .ok_or_else(|| {
-                DataFusionError::Plan(format!("There is no window function named {name}"))
-            })
-    }
-
-    fn sql_rollup_to_expr(
-        &self,
-        exprs: Vec<Vec<SQLExpr>>,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        let args: Result<Vec<_>> = exprs
-            .into_iter()
-            .map(|v| {
-                if v.len() != 1 {
-                    Err(DataFusionError::Internal(
-                        "Tuple expressions are not supported for Rollup expressions"
-                            .to_string(),
-                    ))
-                } else {
-                    self.sql_expr_to_logical_expr(v[0].clone(), schema, planner_context)
-                }
-            })
-            .collect();
-        Ok(Expr::GroupingSet(GroupingSet::Rollup(args?)))
-    }
-
-    fn sql_cube_to_expr(
-        &self,
-        exprs: Vec<Vec<SQLExpr>>,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        let args: Result<Vec<_>> = exprs
-            .into_iter()
-            .map(|v| {
-                if v.len() != 1 {
-                    Err(DataFusionError::Internal(
-                        "Tuple expressions not are supported for Cube expressions"
-                            .to_string(),
-                    ))
-                } else {
-                    self.sql_expr_to_logical_expr(v[0].clone(), schema, planner_context)
-                }
-            })
-            .collect();
-        Ok(Expr::GroupingSet(GroupingSet::Cube(args?)))
-    }
-
-    fn sql_grouping_sets_to_expr(
-        &self,
-        exprs: Vec<Vec<SQLExpr>>,
-        schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        let args: Result<Vec<Vec<_>>> = exprs
-            .into_iter()
-            .map(|v| {
-                v.into_iter()
-                    .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
-                    .collect()
-            })
-            .collect();
-        Ok(Expr::GroupingSet(GroupingSet::GroupingSets(args?)))
-    }
-
-    fn parse_exists_subquery(
-        &self,
-        subquery: Query,
-        negated: bool,
-        input_schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        Ok(Expr::Exists {
-            subquery: Subquery {
-                subquery: Arc::new(self.subquery_to_plan(
-                    subquery,
-                    planner_context,
-                    input_schema,
-                )?),
-            },
-            negated,
-        })
-    }
-
-    fn parse_in_subquery(
-        &self,
-        expr: SQLExpr,
-        subquery: Query,
-        negated: bool,
-        input_schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        Ok(Expr::InSubquery {
-            expr: Box::new(self.sql_to_rex(expr, input_schema, planner_context)?),
-            subquery: Subquery {
-                subquery: Arc::new(self.subquery_to_plan(
-                    subquery,
-                    planner_context,
-                    input_schema,
-                )?),
-            },
-            negated,
-        })
-    }
-
-    fn parse_scalar_subquery(
-        &self,
-        subquery: Query,
-        input_schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        Ok(Expr::ScalarSubquery(Subquery {
-            subquery: Arc::new(self.subquery_to_plan(
-                subquery,
-                planner_context,
-                input_schema,
-            )?),
-        }))
-    }
-
-    fn parse_array_agg(
-        &self,
-        array_agg: ArrayAgg,
-        input_schema: &DFSchema,
-        planner_context: &mut PlannerContext,
-    ) -> Result<Expr> {
-        // Some dialects have special syntax for array_agg. DataFusion only supports it like a function.
-        let ArrayAgg {
-            distinct,
-            expr,
-            order_by,
-            limit,
-            within_group,
-        } = array_agg;
-
-        if let Some(order_by) = order_by {
-            return Err(DataFusionError::NotImplemented(format!(
-                "ORDER BY not supported in ARRAY_AGG: {order_by}"
-            )));
-        }
-
-        if let Some(limit) = limit {
-            return Err(DataFusionError::NotImplemented(format!(
-                "LIMIT not supported in ARRAY_AGG: {limit}"
-            )));
-        }
-
-        if within_group {
-            return Err(DataFusionError::NotImplemented(
-                "WITHIN GROUP not supported in ARRAY_AGG".to_string(),
-            ));
-        }
-
-        let args =
-            vec![self.sql_expr_to_logical_expr(*expr, input_schema, planner_context)?];
-        // next, aggregate built-ins
-        let fun = AggregateFunction::ArrayAgg;
-
-        Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
-            fun, args, distinct, None,
-        )))
-    }
-
-    fn function_args_to_expr(
-        &self,
-        args: Vec<FunctionArg>,
-        schema: &DFSchema,
-    ) -> Result<Vec<Expr>> {
-        args.into_iter()
-            .map(|a| {
-                self.sql_fn_arg_to_logical_expr(a, schema, &mut PlannerContext::new())
-            })
-            .collect::<Result<Vec<Expr>>>()
-    }
-
-    fn aggregate_fn_to_expr(
-        &self,
-        fun: AggregateFunction,
-        args: Vec<FunctionArg>,
-        schema: &DFSchema,
-    ) -> Result<(AggregateFunction, Vec<Expr>)> {
-        let args = match fun {
-            // Special case rewrite COUNT(*) to COUNT(constant)
-            AggregateFunction::Count => args
-                .into_iter()
-                .map(|a| match a {
-                    FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
-                        Ok(Expr::Literal(COUNT_STAR_EXPANSION.clone()))
-                    }
-                    _ => self.sql_fn_arg_to_logical_expr(
-                        a,
-                        schema,
-                        &mut PlannerContext::new(),
-                    ),
-                })
-                .collect::<Result<Vec<Expr>>>()?,
-            _ => self.function_args_to_expr(args, schema)?,
-        };
-
-        Ok((fun, args))
-    }
-
-    fn sql_interval_to_expr(
-        &self,
-        value: SQLExpr,
-        leading_field: Option<DateTimeField>,
-        leading_precision: Option<u64>,
-        last_field: Option<DateTimeField>,
-        fractional_seconds_precision: Option<u64>,
-    ) -> Result<Expr> {
-        if leading_precision.is_some() {
-            return Err(DataFusionError::NotImplemented(format!(
-                "Unsupported Interval Expression with leading_precision {leading_precision:?}"
-            )));
-        }
-
-        if last_field.is_some() {
-            return Err(DataFusionError::NotImplemented(format!(
-                "Unsupported Interval Expression with last_field {last_field:?}"
-            )));
-        }
-
-        if fractional_seconds_precision.is_some() {
-            return Err(DataFusionError::NotImplemented(format!(
-                "Unsupported Interval Expression with fractional_seconds_precision {fractional_seconds_precision:?}"
-            )));
-        }
-
-        // Only handle string exprs for now
-        let value = match value {
-            SQLExpr::Value(
-                Value::SingleQuotedString(s) | Value::DoubleQuotedString(s),
-            ) => s,
-            _ => {
-                return Err(DataFusionError::NotImplemented(format!(
-                    "Unsupported interval argument. Expected string literal, got: {value:?}"
-                )));
-            }
-        };
-
-        let leading_field = leading_field
-            .as_ref()
-            .map(|dt| dt.to_string())
-            .unwrap_or_else(|| "second".to_string());
-
-        Ok(lit(parse_interval(&leading_field, &value)?))
-    }
-
     fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
         let variable = ObjectName(variable.to_vec()).to_string();
 
@@ -2955,91 +1771,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .is_ok()
     }
 
-    fn sql_array_literal(
-        &self,
-        elements: Vec<SQLExpr>,
-        schema: &DFSchema,
-    ) -> Result<Expr> {
-        let mut values = Vec::with_capacity(elements.len());
-
-        for element in elements {
-            let value = self.sql_expr_to_logical_expr(
-                element,
-                schema,
-                &mut PlannerContext::new(),
-            )?;
-            match value {
-                Expr::Literal(scalar) => {
-                    values.push(scalar);
-                }
-                _ => {
-                    return Err(DataFusionError::NotImplemented(format!(
-                        "Arrays with elements other than literal are not supported: {value}"
-                    )));
-                }
-            }
-        }
-
-        let data_types: HashSet<DataType> =
-            values.iter().map(|e| e.get_datatype()).collect();
-
-        if data_types.is_empty() {
-            Ok(lit(ScalarValue::new_list(None, DataType::Utf8)))
-        } else if data_types.len() > 1 {
-            Err(DataFusionError::NotImplemented(format!(
-                "Arrays with different types are not supported: {data_types:?}",
-            )))
-        } else {
-            let data_type = values[0].get_datatype();
-
-            Ok(lit(ScalarValue::new_list(Some(values), data_type)))
-        }
-    }
-
-    /// Parse number in sql string, convert to Expr::Literal
-    fn parse_sql_number(&self, n: &str) -> Result<Expr> {
-        if n.find('E').is_some() {
-            // not implemented yet
-            // https://github.com/apache/arrow-datafusion/issues/3448
-            Err(DataFusionError::NotImplemented(
-                "sql numeric literals in scientific notation are not supported"
-                    .to_string(),
-            ))
-        } else if let Ok(n) = n.parse::<i64>() {
-            Ok(lit(n))
-        } else if self.options.parse_float_as_decimal {
-            // remove leading zeroes
-            let str = n.trim_start_matches('0');
-            if let Some(i) = str.find('.') {
-                let p = str.len() - 1;
-                let s = str.len() - i - 1;
-                let str = str.replace('.', "");
-                let n = str.parse::<i128>().map_err(|_| {
-                    DataFusionError::from(ParserError(format!(
-                        "Cannot parse {str} as i128 when building decimal"
-                    )))
-                })?;
-                Ok(Expr::Literal(ScalarValue::Decimal128(
-                    Some(n),
-                    p as u8,
-                    s as i8,
-                )))
-            } else {
-                let number = n.parse::<i128>().map_err(|_| {
-                    DataFusionError::from(ParserError(format!(
-                        "Cannot parse {n} as i128 when building decimal"
-                    )))
-                })?;
-                Ok(Expr::Literal(ScalarValue::Decimal128(Some(number), 38, 0)))
-            }
-        } else {
-            n.parse::<f64>().map(lit).map_err(|_| {
-                DataFusionError::from(ParserError(format!("Cannot parse {n} as f64")))
-            })
-        }
-    }
-
-    fn convert_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
+    pub(crate) fn convert_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
         match sql_type {
             SQLDataType::Array(Some(inner_sql_type)) => {
                 let data_type = self.convert_simple_data_type(inner_sql_type)?;
@@ -3181,7 +1913,9 @@ pub fn object_name_to_table_reference(
 }
 
 /// Create a [`OwnedTableReference`] after normalizing the specified identifier
-fn idents_to_table_reference(idents: Vec<Ident>) -> Result<OwnedTableReference> {
+pub(crate) fn idents_to_table_reference(
+    idents: Vec<Ident>,
+) -> Result<OwnedTableReference> {
     struct IdentTaker(Vec<Ident>);
     /// take the next identifier from the back of idents, panic'ing if
     /// there are none left