You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/01/03 06:53:32 UTC
[arrow-datafusion] branch master updated: refactor: split expression of planner into one part. (#4783)
This is an automated email from the ASF dual-hosted git repository.
liukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 6c81c101e refactor: split expression of planner into one part. (#4783)
6c81c101e is described below
commit 6c81c101ea0189a702220d9ba7f89dd64432e35d
Author: jakevin <ja...@gmail.com>
AuthorDate: Tue Jan 3 14:53:27 2023 +0800
refactor: split expression of planner into one part. (#4783)
---
datafusion/sql/src/expr/binary_op.rs | 67 ++
datafusion/sql/src/expr/function.rs | 225 ++++++
datafusion/sql/src/expr/grouping_set.rs | 84 ++
datafusion/sql/src/expr/identifier.rs | 159 ++++
datafusion/sql/src/expr/mod.rs | 492 ++++++++++++
datafusion/sql/src/expr/order_by.rs | 71 ++
datafusion/sql/src/expr/subquery.rs | 80 ++
datafusion/sql/src/expr/substring.rs | 75 ++
datafusion/sql/src/expr/unary_op.rs | 60 ++
datafusion/sql/src/expr/value.rs | 213 +++++
datafusion/sql/src/lib.rs | 1 +
datafusion/sql/src/planner.rs | 1306 +------------------------------
12 files changed, 1547 insertions(+), 1286 deletions(-)
diff --git a/datafusion/sql/src/expr/binary_op.rs b/datafusion/sql/src/expr/binary_op.rs
new file mode 100644
index 000000000..af545c737
--- /dev/null
+++ b/datafusion/sql/src/expr/binary_op.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use datafusion_common::{DFSchema, DataFusionError, Result};
+use datafusion_expr::{BinaryExpr, Expr, Operator};
+use sqlparser::ast::{BinaryOperator, Expr as SQLExpr};
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plans a SQL binary operation `left <op> right` into an [`Expr::BinaryExpr`].
+    ///
+    /// The SQL operator is translated first, so an unsupported operator is
+    /// reported as a `NotImplemented` error before either operand is planned.
+    /// The left operand is planned before the right one.
+    pub(crate) fn parse_sql_binary_op(
+        &self,
+        left: SQLExpr,
+        op: BinaryOperator,
+        right: SQLExpr,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let operator = match op {
+            // comparison
+            BinaryOperator::Gt => Operator::Gt,
+            BinaryOperator::GtEq => Operator::GtEq,
+            BinaryOperator::Lt => Operator::Lt,
+            BinaryOperator::LtEq => Operator::LtEq,
+            BinaryOperator::Eq => Operator::Eq,
+            BinaryOperator::NotEq => Operator::NotEq,
+            // arithmetic
+            BinaryOperator::Plus => Operator::Plus,
+            BinaryOperator::Minus => Operator::Minus,
+            BinaryOperator::Multiply => Operator::Multiply,
+            BinaryOperator::Divide => Operator::Divide,
+            BinaryOperator::Modulo => Operator::Modulo,
+            // logical
+            BinaryOperator::And => Operator::And,
+            BinaryOperator::Or => Operator::Or,
+            // PostgreSQL-style regex matching
+            BinaryOperator::PGRegexMatch => Operator::RegexMatch,
+            BinaryOperator::PGRegexIMatch => Operator::RegexIMatch,
+            BinaryOperator::PGRegexNotMatch => Operator::RegexNotMatch,
+            BinaryOperator::PGRegexNotIMatch => Operator::RegexNotIMatch,
+            // bitwise
+            BinaryOperator::BitwiseAnd => Operator::BitwiseAnd,
+            BinaryOperator::BitwiseOr => Operator::BitwiseOr,
+            BinaryOperator::BitwiseXor => Operator::BitwiseXor,
+            BinaryOperator::PGBitwiseShiftRight => Operator::BitwiseShiftRight,
+            BinaryOperator::PGBitwiseShiftLeft => Operator::BitwiseShiftLeft,
+            // string
+            BinaryOperator::StringConcat => Operator::StringConcat,
+            _ => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Unsupported SQL binary operator {op:?}"
+                )));
+            }
+        };
+
+        let lhs = Box::new(self.sql_expr_to_logical_expr(left, schema, planner_context)?);
+        let rhs = Box::new(self.sql_expr_to_logical_expr(right, schema, planner_context)?);
+        Ok(Expr::BinaryExpr(BinaryExpr::new(lhs, operator, rhs)))
+    }
+}
diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs
new file mode 100644
index 000000000..1845d5947
--- /dev/null
+++ b/datafusion/sql/src/expr/function.rs
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use crate::utils::normalize_ident;
+use datafusion_common::{DFSchema, DataFusionError, Result};
+use datafusion_expr::utils::COUNT_STAR_EXPANSION;
+use datafusion_expr::{
+ expr, window_function, AggregateFunction, BuiltinScalarFunction, Expr, WindowFrame,
+ WindowFrameUnits, WindowFunction,
+};
+use sqlparser::ast::{
+ Expr as SQLExpr, Function as SQLFunction, FunctionArg, FunctionArgExpr,
+};
+use std::str::FromStr;
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plans a SQL function call into a logical [`Expr`].
+    ///
+    /// The function name is resolved in this order:
+    /// 1. built-in scalar functions,
+    /// 2. window functions, when the call carries an `OVER` clause,
+    /// 3. built-in aggregate functions,
+    /// 4. user-defined scalar functions (UDFs), then user-defined aggregates (UDAFs).
+    ///
+    /// Arguments and `PARTITION BY` expressions are planned with the caller's
+    /// `planner_context` (not a fresh one), so context state such as the
+    /// prepared-statement parameter types is visible while planning them.
+    ///
+    /// Returns a `Plan` error if no function with the resolved name exists.
+    pub(super) fn sql_function_to_expr(
+        &self,
+        mut function: SQLFunction,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let name = if function.name.0.len() > 1 {
+            // DF doesn't handle compound identifiers
+            // (e.g. "foo.bar") for function names yet
+            function.name.to_string()
+        } else {
+            normalize_ident(function.name.0[0].clone())
+        };
+
+        // first, scalar built-ins
+        if let Ok(fun) = BuiltinScalarFunction::from_str(&name) {
+            let args = self.function_args_to_expr_in_context(
+                function.args,
+                schema,
+                planner_context,
+            )?;
+            return Ok(Expr::ScalarFunction { fun, args });
+        };
+
+        // then, window functions
+        if let Some(window) = function.over.take() {
+            let partition_by = window
+                .partition_by
+                .into_iter()
+                .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
+                .collect::<Result<Vec<_>>>()?;
+            // NOTE: `order_by_to_sort_expr` takes no planner context, so ORDER BY
+            // expressions are still planned with a fresh one.
+            let order_by = window
+                .order_by
+                .into_iter()
+                .map(|e| self.order_by_to_sort_expr(e, schema))
+                .collect::<Result<Vec<_>>>()?;
+            let window_frame = window
+                .window_frame
+                .map(|window_frame| {
+                    let window_frame: WindowFrame = window_frame.try_into()?;
+                    if WindowFrameUnits::Range == window_frame.units
+                        && order_by.len() != 1
+                    {
+                        Err(DataFusionError::Plan(format!(
+                            "With window frame of type RANGE, the order by expression must be of length 1, got {}", order_by.len())))
+                    } else {
+                        Ok(window_frame)
+                    }
+                })
+                .transpose()?
+                // no explicit frame: the default frame depends on whether ORDER BY is present
+                .unwrap_or_else(|| WindowFrame::new(!order_by.is_empty()));
+            let fun = self.find_window_func(&name)?;
+            let expr = match fun {
+                WindowFunction::AggregateFunction(aggregate_fun) => {
+                    // aggregate window functions get the COUNT(*) rewrite
+                    let (aggregate_fun, args) = self.aggregate_fn_to_expr_in_context(
+                        aggregate_fun,
+                        function.args,
+                        schema,
+                        planner_context,
+                    )?;
+
+                    Expr::WindowFunction(expr::WindowFunction::new(
+                        WindowFunction::AggregateFunction(aggregate_fun),
+                        args,
+                        partition_by,
+                        order_by,
+                        window_frame,
+                    ))
+                }
+                _ => Expr::WindowFunction(expr::WindowFunction::new(
+                    fun,
+                    self.function_args_to_expr_in_context(
+                        function.args,
+                        schema,
+                        planner_context,
+                    )?,
+                    partition_by,
+                    order_by,
+                    window_frame,
+                )),
+            };
+            return Ok(expr);
+        }
+
+        // next, aggregate built-ins
+        if let Ok(fun) = AggregateFunction::from_str(&name) {
+            let distinct = function.distinct;
+            let (fun, args) = self.aggregate_fn_to_expr_in_context(
+                fun,
+                function.args,
+                schema,
+                planner_context,
+            )?;
+            return Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
+                fun, args, distinct, None,
+            )));
+        };
+
+        // finally, user-defined functions (UDF) and UDAF
+        if let Some(fm) = self.schema_provider.get_function_meta(&name) {
+            let args = self.function_args_to_expr_in_context(
+                function.args,
+                schema,
+                planner_context,
+            )?;
+            return Ok(Expr::ScalarUDF { fun: fm, args });
+        }
+        if let Some(fm) = self.schema_provider.get_aggregate_meta(&name) {
+            let args = self.function_args_to_expr_in_context(
+                function.args,
+                schema,
+                planner_context,
+            )?;
+            return Ok(Expr::AggregateUDF {
+                fun: fm,
+                args,
+                filter: None,
+            });
+        }
+        Err(DataFusionError::Plan(format!("Invalid function '{name}'")))
+    }
+
+    /// Plans a single-argument built-in scalar function call `fun(expr)`.
+    pub(super) fn sql_named_function_to_expr(
+        &self,
+        expr: SQLExpr,
+        fun: BuiltinScalarFunction,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let args = vec![self.sql_expr_to_logical_expr(expr, schema, planner_context)?];
+        Ok(Expr::ScalarFunction { fun, args })
+    }
+
+    /// Looks up a window function by name: built-in window functions first,
+    /// then user-defined aggregates registered with the schema provider.
+    ///
+    /// Returns a `Plan` error if neither lookup finds `name`.
+    pub(super) fn find_window_func(&self, name: &str) -> Result<WindowFunction> {
+        window_function::find_df_window_func(name)
+            .or_else(|| {
+                self.schema_provider
+                    .get_aggregate_meta(name)
+                    .map(WindowFunction::AggregateUDF)
+            })
+            .ok_or_else(|| {
+                DataFusionError::Plan(format!("There is no window function named {name}"))
+            })
+    }
+
+    /// Plans one function argument.
+    ///
+    /// Argument names are ignored; a bare `*` becomes [`Expr::Wildcard`] and
+    /// qualified wildcards (`t.*`) are rejected as `NotImplemented`.
+    fn sql_fn_arg_to_logical_expr(
+        &self,
+        sql: FunctionArg,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        match sql {
+            FunctionArg::Named {
+                name: _,
+                arg: FunctionArgExpr::Expr(arg),
+            } => self.sql_expr_to_logical_expr(arg, schema, planner_context),
+            FunctionArg::Named {
+                name: _,
+                arg: FunctionArgExpr::Wildcard,
+            } => Ok(Expr::Wildcard),
+            FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
+                self.sql_expr_to_logical_expr(arg, schema, planner_context)
+            }
+            FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => Ok(Expr::Wildcard),
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "Unsupported qualified wildcard argument: {sql:?}"
+            ))),
+        }
+    }
+
+    /// Plans function arguments, each with a fresh [`PlannerContext`].
+    ///
+    /// Kept with its original signature for callers that have no planner
+    /// context at hand; code that has one should use
+    /// `function_args_to_expr_in_context` instead.
+    // May be unused inside this crate; kept so existing pub(super) callers keep compiling.
+    #[allow(dead_code)]
+    pub(super) fn function_args_to_expr(
+        &self,
+        args: Vec<FunctionArg>,
+        schema: &DFSchema,
+    ) -> Result<Vec<Expr>> {
+        args.into_iter()
+            .map(|a| {
+                self.sql_fn_arg_to_logical_expr(a, schema, &mut PlannerContext::new())
+            })
+            .collect::<Result<Vec<Expr>>>()
+    }
+
+    /// Plans function arguments with the caller's `planner_context`.
+    fn function_args_to_expr_in_context(
+        &self,
+        args: Vec<FunctionArg>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Vec<Expr>> {
+        args.into_iter()
+            .map(|a| self.sql_fn_arg_to_logical_expr(a, schema, planner_context))
+            .collect::<Result<Vec<Expr>>>()
+    }
+
+    /// Plans the arguments of a built-in aggregate, rewriting `COUNT(*)` to
+    /// `COUNT(<constant>)`. Each argument uses a fresh [`PlannerContext`].
+    ///
+    /// Kept with its original signature for existing callers; code that has a
+    /// planner context should use `aggregate_fn_to_expr_in_context`.
+    // May be unused inside this crate; kept so existing pub(super) callers keep compiling.
+    #[allow(dead_code)]
+    pub(super) fn aggregate_fn_to_expr(
+        &self,
+        fun: AggregateFunction,
+        args: Vec<FunctionArg>,
+        schema: &DFSchema,
+    ) -> Result<(AggregateFunction, Vec<Expr>)> {
+        let args = match fun {
+            // Special case rewrite COUNT(*) to COUNT(constant)
+            AggregateFunction::Count => args
+                .into_iter()
+                .map(|a| match a {
+                    FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
+                        Ok(Expr::Literal(COUNT_STAR_EXPANSION.clone()))
+                    }
+                    _ => self.sql_fn_arg_to_logical_expr(
+                        a,
+                        schema,
+                        &mut PlannerContext::new(),
+                    ),
+                })
+                .collect::<Result<Vec<Expr>>>()?,
+            _ => self.function_args_to_expr(args, schema)?,
+        };
+
+        Ok((fun, args))
+    }
+
+    /// Same as `aggregate_fn_to_expr`, but plans every argument with the
+    /// caller's `planner_context`.
+    fn aggregate_fn_to_expr_in_context(
+        &self,
+        fun: AggregateFunction,
+        args: Vec<FunctionArg>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<(AggregateFunction, Vec<Expr>)> {
+        let args = match fun {
+            // Special case rewrite COUNT(*) to COUNT(constant)
+            AggregateFunction::Count => args
+                .into_iter()
+                .map(|a| match a {
+                    FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
+                        Ok(Expr::Literal(COUNT_STAR_EXPANSION.clone()))
+                    }
+                    _ => self.sql_fn_arg_to_logical_expr(a, schema, planner_context),
+                })
+                .collect::<Result<Vec<Expr>>>()?,
+            _ => self.function_args_to_expr_in_context(args, schema, planner_context)?,
+        };
+
+        Ok((fun, args))
+    }
+}
diff --git a/datafusion/sql/src/expr/grouping_set.rs b/datafusion/sql/src/expr/grouping_set.rs
new file mode 100644
index 000000000..c5a0b6da7
--- /dev/null
+++ b/datafusion/sql/src/expr/grouping_set.rs
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use datafusion_common::{DFSchema, DataFusionError, Result};
+use datafusion_expr::{Expr, GroupingSet};
+use sqlparser::ast::Expr as SQLExpr;
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plans `GROUPING SETS ((a, b), (c), ...)` into [`GroupingSet::GroupingSets`].
+    ///
+    /// Every expression of every set is planned; the first planning error
+    /// aborts the whole conversion.
+    pub(super) fn sql_grouping_sets_to_expr(
+        &self,
+        exprs: Vec<Vec<SQLExpr>>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let sets = exprs
+            .into_iter()
+            .map(|set| {
+                set.into_iter()
+                    .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
+                    .collect::<Result<Vec<_>>>()
+            })
+            .collect::<Result<Vec<_>>>()?;
+        Ok(Expr::GroupingSet(GroupingSet::GroupingSets(sets)))
+    }
+
+    /// Plans `ROLLUP (a, b, ...)` into [`GroupingSet::Rollup`].
+    ///
+    /// Tuple elements such as `ROLLUP ((a, b))` are rejected with an `Internal` error.
+    pub(super) fn sql_rollup_to_expr(
+        &self,
+        exprs: Vec<Vec<SQLExpr>>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let args =
+            self.sql_single_element_list_to_exprs(exprs, "Rollup", schema, planner_context)?;
+        Ok(Expr::GroupingSet(GroupingSet::Rollup(args)))
+    }
+
+    /// Plans `CUBE (a, b, ...)` into [`GroupingSet::Cube`].
+    ///
+    /// Tuple elements such as `CUBE ((a, b))` are rejected with an `Internal` error.
+    pub(super) fn sql_cube_to_expr(
+        &self,
+        exprs: Vec<Vec<SQLExpr>>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let args =
+            self.sql_single_element_list_to_exprs(exprs, "Cube", schema, planner_context)?;
+        Ok(Expr::GroupingSet(GroupingSet::Cube(args)))
+    }
+
+    /// Plans a ROLLUP/CUBE element list in which every element must hold
+    /// exactly one expression.
+    ///
+    /// `kind` names the clause in the error returned when an element holds
+    /// zero or more than one expression.
+    fn sql_single_element_list_to_exprs(
+        &self,
+        exprs: Vec<Vec<SQLExpr>>,
+        kind: &str,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Vec<Expr>> {
+        exprs
+            .into_iter()
+            .map(|element| match <[SQLExpr; 1]>::try_from(element) {
+                // move the single expression out instead of cloning it
+                Ok([e]) => self.sql_expr_to_logical_expr(e, schema, planner_context),
+                Err(_) => Err(DataFusionError::Internal(format!(
+                    "Tuple expressions are not supported for {kind} expressions"
+                ))),
+            })
+            .collect()
+    }
+}
diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs
new file mode 100644
index 000000000..5a5c4004e
--- /dev/null
+++ b/datafusion/sql/src/expr/identifier.rs
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{
+ idents_to_table_reference, ContextProvider, PlannerContext, SqlToRel,
+};
+use crate::utils::normalize_ident;
+use datafusion_common::{
+ Column, DFSchema, DataFusionError, OwnedTableReference, Result, ScalarValue,
+};
+use datafusion_expr::{Case, Expr, GetIndexedField};
+use sqlparser::ast::{Expr as SQLExpr, Ident};
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plans a single (non-compound) SQL identifier.
+    ///
+    /// Identifiers starting with `@` are scalar variables whose type is looked
+    /// up through the schema provider; an `Execution` error is returned when no
+    /// type is known. Any other identifier becomes an unqualified column.
+    pub(super) fn sql_identifier_to_expr(&self, id: Ident) -> Result<Expr> {
+        if id.value.starts_with('@') {
+            // TODO: figure out if ScalarVariables should be insensitive.
+            let var_names = vec![id.value];
+            let ty = self
+                .schema_provider
+                .get_variable_type(&var_names)
+                .ok_or_else(|| {
+                    DataFusionError::Execution(format!(
+                        "variable {var_names:?} has no type information"
+                    ))
+                })?;
+            Ok(Expr::ScalarVariable(ty, var_names))
+        } else {
+            // Don't use `col()` here because it will try to
+            // interpret names with '.' as if they were
+            // compound identifiers, but this is not a compound
+            // identifier. (e.g. it is "foo.bar" not foo.bar)
+
+            Ok(Expr::Column(Column {
+                relation: None,
+                name: normalize_ident(id),
+            }))
+        }
+    }
+
+    /// Plans a compound identifier such as `t.col` or `my_struct.key`.
+    ///
+    /// * `@`-prefixed names are scalar variables (normalized part by part).
+    /// * Otherwise only two-part `relation.name` identifiers are accepted; an
+    ///   exact qualified match in `schema` yields a qualified column, a field
+    ///   named `relation` yields a struct field access, and anything else is
+    ///   still treated as a qualified column.
+    pub(super) fn sql_compound_identifier_to_expr(
+        &self,
+        ids: Vec<Ident>,
+        schema: &DFSchema,
+    ) -> Result<Expr> {
+        // `first()` rather than `ids[0]`, so an empty list cannot panic here
+        if matches!(ids.first(), Some(id) if id.value.starts_with('@')) {
+            let var_names: Vec<_> = ids.into_iter().map(normalize_ident).collect();
+            let ty = self
+                .schema_provider
+                .get_variable_type(&var_names)
+                .ok_or_else(|| {
+                    DataFusionError::Execution(format!(
+                        "variable {var_names:?} has no type information"
+                    ))
+                })?;
+            Ok(Expr::ScalarVariable(ty, var_names))
+        } else {
+            // only support "schema.table" type identifiers here
+            let (name, relation) = match idents_to_table_reference(ids)? {
+                OwnedTableReference::Partial { schema, table } => (table, schema),
+                r @ OwnedTableReference::Bare { .. }
+                | r @ OwnedTableReference::Full { .. } => {
+                    return Err(DataFusionError::Plan(format!(
+                        "Unsupported compound identifier '{r:?}'",
+                    )));
+                }
+            };
+
+            // Try and find the reference in schema
+            match schema.field_with_qualified_name(&relation, &name) {
+                Ok(_) => {
+                    // found an exact match on a qualified name so this is a table.column identifier
+                    Ok(Expr::Column(Column {
+                        relation: Some(relation),
+                        name,
+                    }))
+                }
+                Err(_) => {
+                    if let Some(field) =
+                        schema.fields().iter().find(|f| f.name().eq(&relation))
+                    {
+                        // Access to a field of a column which is a structure, example: SELECT my_struct.key
+                        Ok(Expr::GetIndexedField(GetIndexedField::new(
+                            Box::new(Expr::Column(field.qualified_column())),
+                            ScalarValue::Utf8(Some(name)),
+                        )))
+                    } else {
+                        // table.column identifier
+                        Ok(Expr::Column(Column {
+                            relation: Some(relation),
+                            name,
+                        }))
+                    }
+                }
+            }
+        }
+    }
+
+    /// Plans a `CASE [operand] WHEN .. THEN .. [ELSE ..] END` expression.
+    ///
+    /// Sub-expressions are planned in source order: operand, all WHEN
+    /// conditions, all THEN results, then ELSE. Conditions and results are
+    /// paired positionally; extra entries on the longer side are dropped by `zip`.
+    pub(super) fn sql_case_identifier_to_expr(
+        &self,
+        operand: Option<Box<SQLExpr>>,
+        conditions: Vec<SQLExpr>,
+        results: Vec<SQLExpr>,
+        else_result: Option<Box<SQLExpr>>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let expr = operand
+            .map(|e| {
+                self.sql_expr_to_logical_expr(*e, schema, planner_context)
+                    .map(Box::new)
+            })
+            .transpose()?;
+        let when_expr = conditions
+            .into_iter()
+            .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
+            .collect::<Result<Vec<_>>>()?;
+        let then_expr = results
+            .into_iter()
+            .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
+            .collect::<Result<Vec<_>>>()?;
+        let else_expr = else_result
+            .map(|e| {
+                self.sql_expr_to_logical_expr(*e, schema, planner_context)
+                    .map(Box::new)
+            })
+            .transpose()?;
+
+        // move the planned expressions into the pairs instead of deep-cloning them
+        let when_then_expr = when_expr
+            .into_iter()
+            .zip(then_expr)
+            .map(|(w, t)| (Box::new(w), Box::new(t)))
+            .collect();
+
+        Ok(Expr::Case(Case::new(expr, when_then_expr, else_expr)))
+    }
+}
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
new file mode 100644
index 000000000..4accb451f
--- /dev/null
+++ b/datafusion/sql/src/expr/mod.rs
@@ -0,0 +1,492 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod binary_op;
+mod function;
+mod grouping_set;
+mod identifier;
+mod order_by;
+mod subquery;
+mod substring;
+mod unary_op;
+mod value;
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use crate::utils::normalize_ident;
+use arrow_schema::DataType;
+use datafusion_common::{DFSchema, DataFusionError, Result, ScalarValue};
+use datafusion_expr::{
+ col, expr, lit, AggregateFunction, Between, BinaryExpr, BuiltinScalarFunction, Cast,
+ Expr, ExprSchemable, GetIndexedField, Like, Operator, TryCast,
+};
+use sqlparser::ast::{ArrayAgg, Expr as SQLExpr, TrimWhereField, Value};
+use sqlparser::parser::ParserError::ParserError;
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plans a SQL AST expression into a logical [`Expr`] against `schema`.
+    ///
+    /// Binary operators are sent straight to `parse_sql_binary_op`; every other
+    /// node goes through `sql_expr_to_logical_expr_internal`.
+    pub(crate) fn sql_expr_to_logical_expr(
+        &self,
+        sql: SQLExpr,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        // Workaround for https://github.com/apache/arrow-datafusion/issues/4065
+        //
+        // Minimize stack space required in debug builds to plan
+        // deeply nested binary operators by keeping the stack space
+        // needed for sql_expr_to_logical_expr minimal for BinaryOp
+        //
+        // The reason this reduces stack size in debug builds is
+        // explained in the "Technical Backstory" heading of
+        // https://github.com/apache/arrow-datafusion/pull/1047
+        //
+        // A likely better way to support deeply nested expressions
+        // would be to avoid recursion all together and use an
+        // iterative algorithm.
+        match sql {
+            SQLExpr::BinaryOp { left, op, right } => {
+                self.parse_sql_binary_op(*left, op, *right, schema, planner_context)
+            }
+            // since this function requires more space per frame
+            // avoid calling it for binary ops
+            _ => self.sql_expr_to_logical_expr_internal(sql, schema, planner_context),
+        }
+    }
+
+    /// Internal implementation. Use
+    /// [`Self::sql_expr_to_logical_expr`] to plan exprs.
+    ///
+    /// Returns a `NotImplemented` error for AST nodes that are not supported.
+    fn sql_expr_to_logical_expr_internal(
+        &self,
+        sql: SQLExpr,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        match sql {
+            SQLExpr::Value(value) => {
+                self.parse_value(value, &planner_context.prepare_param_data_types)
+            }
+            // EXTRACT(field FROM expr) is planned as date_part('<field>', expr)
+            SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction {
+                fun: BuiltinScalarFunction::DatePart,
+                args: vec![
+                    Expr::Literal(ScalarValue::Utf8(Some(format!("{field}")))),
+                    self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+                ],
+            }),
+
+            SQLExpr::Array(arr) => self.sql_array_literal(arr.elem, schema),
+            SQLExpr::Interval {
+                value,
+                leading_field,
+                leading_precision,
+                last_field,
+                fractional_seconds_precision,
+            } => self.sql_interval_to_expr(
+                *value,
+                leading_field,
+                leading_precision,
+                last_field,
+                fractional_seconds_precision,
+            ),
+            SQLExpr::Identifier(id) => self.sql_identifier_to_expr(id),
+
+            SQLExpr::MapAccess { column, keys } => {
+                if let SQLExpr::Identifier(id) = *column {
+                    plan_indexed(col(normalize_ident(id)), keys)
+                } else {
+                    Err(DataFusionError::NotImplemented(format!(
+                        "map access requires an identifier, found column {column} instead"
+                    )))
+                }
+            }
+
+            SQLExpr::ArrayIndex { obj, indexes } => {
+                let expr = self.sql_expr_to_logical_expr(*obj, schema, planner_context)?;
+                plan_indexed(expr, indexes)
+            }
+
+            SQLExpr::CompoundIdentifier(ids) => {
+                self.sql_compound_identifier_to_expr(ids, schema)
+            }
+
+            SQLExpr::Case {
+                operand,
+                conditions,
+                results,
+                else_result,
+            } => self.sql_case_identifier_to_expr(
+                operand,
+                conditions,
+                results,
+                else_result,
+                schema,
+                planner_context,
+            ),
+
+            SQLExpr::Cast { expr, data_type } => Ok(Expr::Cast(Cast::new(
+                Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?),
+                self.convert_data_type(&data_type)?,
+            ))),
+
+            SQLExpr::TryCast { expr, data_type } => Ok(Expr::TryCast(TryCast::new(
+                Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?),
+                self.convert_data_type(&data_type)?,
+            ))),
+
+            // e.g. DATE '2020-01-01' is planned as CAST('2020-01-01' AS DATE)
+            SQLExpr::TypedString { data_type, value } => Ok(Expr::Cast(Cast::new(
+                Box::new(lit(value)),
+                self.convert_data_type(&data_type)?,
+            ))),
+
+            SQLExpr::IsNull(expr) => Ok(Expr::IsNull(Box::new(
+                self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+            ))),
+
+            SQLExpr::IsNotNull(expr) => Ok(Expr::IsNotNull(Box::new(
+                self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+            ))),
+
+            SQLExpr::IsDistinctFrom(left, right) => Ok(Expr::BinaryExpr(BinaryExpr::new(
+                Box::new(self.sql_expr_to_logical_expr(*left, schema, planner_context)?),
+                Operator::IsDistinctFrom,
+                Box::new(self.sql_expr_to_logical_expr(*right, schema, planner_context)?),
+            ))),
+
+            SQLExpr::IsNotDistinctFrom(left, right) => {
+                Ok(Expr::BinaryExpr(BinaryExpr::new(
+                    Box::new(self.sql_expr_to_logical_expr(*left, schema, planner_context)?),
+                    Operator::IsNotDistinctFrom,
+                    Box::new(self.sql_expr_to_logical_expr(*right, schema, planner_context)?),
+                )))
+            }
+
+            SQLExpr::IsTrue(expr) => Ok(Expr::IsTrue(Box::new(
+                self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+            ))),
+
+            SQLExpr::IsFalse(expr) => Ok(Expr::IsFalse(Box::new(
+                self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+            ))),
+
+            SQLExpr::IsNotTrue(expr) => Ok(Expr::IsNotTrue(Box::new(
+                self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+            ))),
+
+            SQLExpr::IsNotFalse(expr) => Ok(Expr::IsNotFalse(Box::new(
+                self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+            ))),
+
+            SQLExpr::IsUnknown(expr) => Ok(Expr::IsUnknown(Box::new(
+                self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+            ))),
+
+            SQLExpr::IsNotUnknown(expr) => Ok(Expr::IsNotUnknown(Box::new(
+                self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
+            ))),
+
+            SQLExpr::UnaryOp { op, expr } => {
+                self.parse_sql_unary_op(op, *expr, schema, planner_context)
+            }
+
+            SQLExpr::Between {
+                expr,
+                negated,
+                low,
+                high,
+            } => Ok(Expr::Between(Between::new(
+                Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?),
+                negated,
+                Box::new(self.sql_expr_to_logical_expr(*low, schema, planner_context)?),
+                Box::new(self.sql_expr_to_logical_expr(*high, schema, planner_context)?),
+            ))),
+
+            SQLExpr::InList {
+                expr,
+                list,
+                negated,
+            } => self.sql_in_list_to_expr(*expr, list, negated, schema, planner_context),
+
+            SQLExpr::Like {
+                negated,
+                expr,
+                pattern,
+                escape_char,
+            } => self.sql_like_to_expr(
+                negated,
+                *expr,
+                *pattern,
+                escape_char,
+                schema,
+                planner_context,
+            ),
+
+            SQLExpr::ILike {
+                negated,
+                expr,
+                pattern,
+                escape_char,
+            } => self.sql_ilike_to_expr(
+                negated,
+                *expr,
+                *pattern,
+                escape_char,
+                schema,
+                planner_context,
+            ),
+
+            SQLExpr::SimilarTo {
+                negated,
+                expr,
+                pattern,
+                escape_char,
+            } => self.sql_similarto_to_expr(
+                negated,
+                *expr,
+                *pattern,
+                escape_char,
+                schema,
+                planner_context,
+            ),
+
+            SQLExpr::BinaryOp { .. } => Err(DataFusionError::Internal(
+                "binary_op should be handled by sql_expr_to_logical_expr.".to_string(),
+            )),
+
+            #[cfg(feature = "unicode_expressions")]
+            SQLExpr::Substring {
+                expr,
+                substring_from,
+                substring_for,
+            } => self.sql_substring_to_expr(
+                expr,
+                substring_from,
+                substring_for,
+                schema,
+                planner_context,
+            ),
+
+            #[cfg(not(feature = "unicode_expressions"))]
+            SQLExpr::Substring { .. } => Err(DataFusionError::Internal(
+                "statement substring requires compilation with feature flag: unicode_expressions.".to_string(),
+            )),
+
+            SQLExpr::Trim {
+                expr,
+                trim_where,
+                trim_what,
+            } => self.sql_trim_to_expr(*expr, trim_where, trim_what, schema, planner_context),
+
+            SQLExpr::AggregateExpressionWithFilter { expr, filter } => {
+                self.sql_agg_with_filter_to_expr(*expr, *filter, schema, planner_context)
+            }
+
+            SQLExpr::Function(function) => {
+                self.sql_function_to_expr(function, schema, planner_context)
+            }
+
+            SQLExpr::Rollup(exprs) => self.sql_rollup_to_expr(exprs, schema, planner_context),
+            SQLExpr::Cube(exprs) => self.sql_cube_to_expr(exprs, schema, planner_context),
+            SQLExpr::GroupingSets(exprs) => {
+                self.sql_grouping_sets_to_expr(exprs, schema, planner_context)
+            }
+
+            // NOTE(review): the optional `TO <field>` part of FLOOR/CEIL is ignored here
+            SQLExpr::Floor { expr, field: _ } => self.sql_named_function_to_expr(
+                *expr,
+                BuiltinScalarFunction::Floor,
+                schema,
+                planner_context,
+            ),
+            SQLExpr::Ceil { expr, field: _ } => self.sql_named_function_to_expr(
+                *expr,
+                BuiltinScalarFunction::Ceil,
+                schema,
+                planner_context,
+            ),
+
+            SQLExpr::Nested(e) => self.sql_expr_to_logical_expr(*e, schema, planner_context),
+
+            SQLExpr::Exists { subquery, negated } => {
+                self.parse_exists_subquery(*subquery, negated, schema, planner_context)
+            }
+            SQLExpr::InSubquery {
+                expr,
+                subquery,
+                negated,
+            } => self.parse_in_subquery(*expr, *subquery, negated, schema, planner_context),
+            SQLExpr::Subquery(subquery) => {
+                self.parse_scalar_subquery(*subquery, schema, planner_context)
+            }
+
+            SQLExpr::ArrayAgg(array_agg) => {
+                self.parse_array_agg(array_agg, schema, planner_context)
+            }
+
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "Unsupported ast node in sqltorel: {sql:?}"
+            ))),
+        }
+    }
+
+    /// Plans the dialect-specific `ARRAY_AGG(...)` syntax as the built-in
+    /// `ArrayAgg` aggregate.
+    ///
+    /// Only `DISTINCT` is supported; `ORDER BY`, `LIMIT` and `WITHIN GROUP`
+    /// are rejected as `NotImplemented`.
+    fn parse_array_agg(
+        &self,
+        array_agg: ArrayAgg,
+        input_schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        // Some dialects have special syntax for array_agg. DataFusion only supports it like a function.
+        let ArrayAgg {
+            distinct,
+            expr,
+            order_by,
+            limit,
+            within_group,
+        } = array_agg;
+
+        if let Some(order_by) = order_by {
+            return Err(DataFusionError::NotImplemented(format!(
+                "ORDER BY not supported in ARRAY_AGG: {order_by}"
+            )));
+        }
+
+        if let Some(limit) = limit {
+            return Err(DataFusionError::NotImplemented(format!(
+                "LIMIT not supported in ARRAY_AGG: {limit}"
+            )));
+        }
+
+        if within_group {
+            return Err(DataFusionError::NotImplemented(
+                "WITHIN GROUP not supported in ARRAY_AGG".to_string(),
+            ));
+        }
+
+        let args =
+            vec![self.sql_expr_to_logical_expr(*expr, input_schema, planner_context)?];
+        // ARRAY_AGG always maps to the built-in ArrayAgg aggregate
+        let fun = AggregateFunction::ArrayAgg;
+
+        Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
+            fun, args, distinct, None,
+        )))
+    }
+
+    /// Plans `expr [NOT] IN (list...)`; list items are planned before `expr`.
+    fn sql_in_list_to_expr(
+        &self,
+        expr: SQLExpr,
+        list: Vec<SQLExpr>,
+        negated: bool,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let list_expr = list
+            .into_iter()
+            .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Expr::InList {
+            expr: Box::new(self.sql_expr_to_logical_expr(
+                expr,
+                schema,
+                planner_context,
+            )?),
+            list: list_expr,
+            negated,
+        })
+    }
+
+    /// Plans the operand and pattern shared by LIKE, ILIKE and SIMILAR TO.
+    ///
+    /// The pattern is planned first and must have type `Utf8` or `Null`;
+    /// otherwise a `Plan` error naming `op_name` is returned. On success the
+    /// boxed `(expr, pattern)` pair is returned, ready for [`Like::new`].
+    fn sql_like_operands(
+        &self,
+        op_name: &str,
+        expr: SQLExpr,
+        pattern: SQLExpr,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<(Box<Expr>, Box<Expr>)> {
+        let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
+        let pattern_type = pattern.get_type(schema)?;
+        if pattern_type != DataType::Utf8 && pattern_type != DataType::Null {
+            return Err(DataFusionError::Plan(format!(
+                "Invalid pattern in {op_name} expression"
+            )));
+        }
+        let expr = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
+        Ok((Box::new(expr), Box::new(pattern)))
+    }
+
+    /// Plans `expr [NOT] LIKE pattern [ESCAPE c]`.
+    fn sql_like_to_expr(
+        &self,
+        negated: bool,
+        expr: SQLExpr,
+        pattern: SQLExpr,
+        escape_char: Option<char>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let (expr, pattern) =
+            self.sql_like_operands("LIKE", expr, pattern, schema, planner_context)?;
+        Ok(Expr::Like(Like::new(negated, expr, pattern, escape_char)))
+    }
+
+    /// Plans `expr [NOT] ILIKE pattern [ESCAPE c]` (case-insensitive LIKE).
+    fn sql_ilike_to_expr(
+        &self,
+        negated: bool,
+        expr: SQLExpr,
+        pattern: SQLExpr,
+        escape_char: Option<char>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let (expr, pattern) =
+            self.sql_like_operands("ILIKE", expr, pattern, schema, planner_context)?;
+        Ok(Expr::ILike(Like::new(negated, expr, pattern, escape_char)))
+    }
+
+    /// Plans `expr [NOT] SIMILAR TO pattern [ESCAPE c]`.
+    fn sql_similarto_to_expr(
+        &self,
+        negated: bool,
+        expr: SQLExpr,
+        pattern: SQLExpr,
+        escape_char: Option<char>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let (expr, pattern) =
+            self.sql_like_operands("SIMILAR TO", expr, pattern, schema, planner_context)?;
+        Ok(Expr::SimilarTo(Like::new(negated, expr, pattern, escape_char)))
+    }
+
+    /// Plans `TRIM([LEADING | TRAILING | BOTH] [what FROM] expr)`.
+    ///
+    /// LEADING/TRAILING/BOTH map to `ltrim`/`rtrim`/`btrim`, and no qualifier
+    /// maps to `trim`. The optional characters to trim become a second argument.
+    fn sql_trim_to_expr(
+        &self,
+        expr: SQLExpr,
+        trim_where: Option<TrimWhereField>,
+        trim_what: Option<Box<SQLExpr>>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let fun = match trim_where {
+            Some(TrimWhereField::Leading) => BuiltinScalarFunction::Ltrim,
+            Some(TrimWhereField::Trailing) => BuiltinScalarFunction::Rtrim,
+            Some(TrimWhereField::Both) => BuiltinScalarFunction::Btrim,
+            None => BuiltinScalarFunction::Trim,
+        };
+        let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
+        let args = match trim_what {
+            Some(to_trim) => {
+                let to_trim =
+                    self.sql_expr_to_logical_expr(*to_trim, schema, planner_context)?;
+                vec![arg, to_trim]
+            }
+            None => vec![arg],
+        };
+        Ok(Expr::ScalarFunction { fun, args })
+    }
+
+    /// Plans `agg(...) FILTER (WHERE filter)` by attaching the planned filter
+    /// to a built-in aggregate.
+    ///
+    /// Returns an `Internal` error if `expr` does not plan to an
+    /// [`Expr::AggregateFunction`].
+    fn sql_agg_with_filter_to_expr(
+        &self,
+        expr: SQLExpr,
+        filter: SQLExpr,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        match self.sql_expr_to_logical_expr(expr, schema, planner_context)? {
+            Expr::AggregateFunction(expr::AggregateFunction {
+                fun,
+                args,
+                distinct,
+                ..
+            }) => Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
+                fun,
+                args,
+                distinct,
+                Some(Box::new(self.sql_expr_to_logical_expr(
+                    filter,
+                    schema,
+                    planner_context,
+                )?)),
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "AggregateExpressionWithFilter expression was not an AggregateFunction"
+                    .to_string(),
+            )),
+        }
+    }
+}
+
+/// Converts a single map/array index key into a [`ScalarValue`].
+///
+/// Numeric keys become `Int64`; a number that does not parse as `i64`
+/// produces a parser error. Single- or double-quoted strings become `Utf8`.
+/// Any other key expression is rejected with a `SQL` error.
+fn plan_key(key: SQLExpr) -> Result<ScalarValue> {
+    let scalar = match key {
+        SQLExpr::Value(Value::Number(s, _)) => ScalarValue::Int64(Some(
+            s.parse()
+                .map_err(|_| ParserError(format!("Cannot parse {s} as i64.")))?,
+        )),
+        SQLExpr::Value(Value::SingleQuotedString(s) | Value::DoubleQuotedString(s)) => {
+            ScalarValue::Utf8(Some(s))
+        }
+        _ => {
+            return Err(DataFusionError::SQL(ParserError(format!(
+                "Unsupported index key expression: {key:?}"
+            ))));
+        }
+    };
+
+    Ok(scalar)
+}
+
+/// Wraps `expr` in one [`GetIndexedField`] per key, e.g. `col['a'][1]`.
+///
+/// Keys are applied left to right: the first key indexes `expr` directly and
+/// the last key ends up outermost. Keys are planned in that same order, so
+/// the first invalid key determines the error. An empty key list is an error.
+fn plan_indexed(expr: Expr, keys: Vec<SQLExpr>) -> Result<Expr> {
+    if keys.is_empty() {
+        return Err(
+            ParserError("Internal error: Missing index key expression".to_string()).into(),
+        );
+    }
+
+    keys.into_iter()
+        .try_fold(expr, |indexed, key| -> Result<Expr> {
+            Ok(Expr::GetIndexedField(GetIndexedField::new(
+                Box::new(indexed),
+                plan_key(key)?,
+            )))
+        })
+}
diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs
new file mode 100644
index 000000000..f3a4f2b04
--- /dev/null
+++ b/datafusion/sql/src/expr/order_by.rs
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use datafusion_common::{DFSchema, DataFusionError, Result};
+use datafusion_expr::expr::Sort;
+use datafusion_expr::Expr;
+use sqlparser::ast::{Expr as SQLExpr, OrderByExpr, Value};
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Convert a SQL `ORDER BY` item into an `Expr::Sort`.
+    ///
+    /// An integer literal (e.g. `ORDER BY 2`) selects a column of `schema` by
+    /// its 1-based position; `0` or a position past the last field is a plan
+    /// error. Any other expression is planned against `schema` with a fresh
+    /// `PlannerContext`.
+    pub(crate) fn order_by_to_sort_expr(
+        &self,
+        e: OrderByExpr,
+        schema: &DFSchema,
+    ) -> Result<Expr> {
+        let OrderByExpr {
+            asc,
+            expr,
+            nulls_first,
+        } = e;
+
+        let sort_key = match expr {
+            SQLExpr::Value(Value::Number(v, _)) => {
+                let position = v
+                    .parse::<usize>()
+                    .map_err(|err| DataFusionError::Plan(err.to_string()))?;
+                let field_count = schema.fields().len();
+
+                if position == 0 {
+                    return Err(DataFusionError::Plan(
+                        "Order by index starts at 1 for column indexes".to_string(),
+                    ));
+                }
+                if position > field_count {
+                    return Err(DataFusionError::Plan(format!(
+                        "Order by column out of bounds, specified: {}, max: {}",
+                        position, field_count
+                    )));
+                }
+
+                Expr::Column(schema.field(position - 1).qualified_column())
+            }
+            other => {
+                self.sql_expr_to_logical_expr(other, schema, &mut PlannerContext::new())?
+            }
+        };
+
+        let asc = asc.unwrap_or(true);
+        // Without an explicit NULLS FIRST/LAST, ascending sorts put nulls last and
+        // descending sorts put them first, matching postgres:
+        // https://www.postgresql.org/docs/current/queries-order.html
+        let nulls_first = nulls_first.unwrap_or(!asc);
+        Ok(Expr::Sort(Sort::new(Box::new(sort_key), asc, nulls_first)))
+    }
+}
diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs
new file mode 100644
index 000000000..2d2213c86
--- /dev/null
+++ b/datafusion/sql/src/expr/subquery.rs
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use datafusion_common::{DFSchema, Result};
+use datafusion_expr::{Expr, Subquery};
+use sqlparser::ast::Expr as SQLExpr;
+use sqlparser::ast::Query;
+use std::sync::Arc;
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plan `[NOT] EXISTS (<subquery>)` into an `Expr::Exists`.
+    ///
+    /// The subquery is planned through `subquery_to_plan`, which receives
+    /// `input_schema` (presumably as the outer schema for correlated column
+    /// references — confirm in `subquery_to_plan`).
+    pub(super) fn parse_exists_subquery(
+        &self,
+        subquery: Query,
+        negated: bool,
+        input_schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let plan = self.subquery_to_plan(subquery, planner_context, input_schema)?;
+        let subquery = Subquery {
+            subquery: Arc::new(plan),
+        };
+        Ok(Expr::Exists { subquery, negated })
+    }
+
+    /// Plan `<expr> [NOT] IN (<subquery>)` into an `Expr::InSubquery`.
+    pub(super) fn parse_in_subquery(
+        &self,
+        expr: SQLExpr,
+        subquery: Query,
+        negated: bool,
+        input_schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        // Both steps take `planner_context` mutably; the left-hand expression is
+        // planned before the subquery, as it always has been.
+        let probe = self.sql_to_rex(expr, input_schema, planner_context)?;
+        let plan = self.subquery_to_plan(subquery, planner_context, input_schema)?;
+        Ok(Expr::InSubquery {
+            expr: Box::new(probe),
+            subquery: Subquery {
+                subquery: Arc::new(plan),
+            },
+            negated,
+        })
+    }
+
+    /// Plan a scalar subquery `(<subquery>)` into an `Expr::ScalarSubquery`.
+    pub(super) fn parse_scalar_subquery(
+        &self,
+        subquery: Query,
+        input_schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let plan = self.subquery_to_plan(subquery, planner_context, input_schema)?;
+        Ok(Expr::ScalarSubquery(Subquery {
+            subquery: Arc::new(plan),
+        }))
+    }
+}
diff --git a/datafusion/sql/src/expr/substring.rs b/datafusion/sql/src/expr/substring.rs
new file mode 100644
index 000000000..991f82a67
--- /dev/null
+++ b/datafusion/sql/src/expr/substring.rs
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use datafusion_common::{DFSchema, DataFusionError, Result, ScalarValue};
+use datafusion_expr::{BuiltinScalarFunction, Expr};
+use sqlparser::ast::Expr as SQLExpr;
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plan `SUBSTRING(expr [FROM start] [FOR length])` as a call to the
+    /// built-in `Substr` scalar function.
+    ///
+    /// At least one of `FROM` / `FOR` must be present, otherwise a plan error is
+    /// returned. When only `FOR` is given, the start position is the literal `1`.
+    /// Sub-expressions are planned in the order: `expr`, `FROM`, `FOR`.
+    pub(super) fn sql_substring_to_expr(
+        &self,
+        expr: Box<SQLExpr>,
+        substring_from: Option<Box<SQLExpr>>,
+        substring_for: Option<Box<SQLExpr>>,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        if substring_from.is_none() && substring_for.is_none() {
+            let orig_sql = SQLExpr::Substring {
+                expr,
+                substring_from: None,
+                substring_for: None,
+            };
+
+            return Err(DataFusionError::Plan(format!(
+                "Substring without for/from is not valid {orig_sql:?}"
+            )));
+        }
+
+        let arg = self.sql_expr_to_logical_expr(*expr, schema, planner_context)?;
+        let start = match substring_from {
+            Some(from_expr) => {
+                self.sql_expr_to_logical_expr(*from_expr, schema, planner_context)?
+            }
+            // Only FOR was supplied: substring from the first character.
+            None => Expr::Literal(ScalarValue::Int64(Some(1))),
+        };
+
+        let mut args = vec![arg, start];
+        if let Some(for_expr) = substring_for {
+            args.push(self.sql_expr_to_logical_expr(*for_expr, schema, planner_context)?);
+        }
+
+        Ok(Expr::ScalarFunction {
+            fun: BuiltinScalarFunction::Substr,
+            args,
+        })
+    }
+}
diff --git a/datafusion/sql/src/expr/unary_op.rs b/datafusion/sql/src/expr/unary_op.rs
new file mode 100644
index 000000000..d24fc7154
--- /dev/null
+++ b/datafusion/sql/src/expr/unary_op.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use datafusion_common::{DFSchema, DataFusionError, Result};
+use datafusion_expr::{lit, Expr};
+use sqlparser::ast::{Expr as SQLExpr, UnaryOperator, Value};
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Plan a unary SQL operator applied to `expr`.
+    ///
+    /// `NOT` becomes `Expr::Not`, unary `+` plans the operand unchanged, and unary
+    /// `-` becomes `Expr::Negative`. The exception is a numeric literal operand,
+    /// which is folded straight into a negative `i64` (or, failing that, `f64`)
+    /// literal. Every other operator is reported as not implemented.
+    pub(crate) fn parse_sql_unary_op(
+        &self,
+        op: UnaryOperator,
+        expr: SQLExpr,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        match (op, expr) {
+            (UnaryOperator::Not, operand) => Ok(Expr::Not(Box::new(
+                self.sql_expr_to_logical_expr(operand, schema, planner_context)?,
+            ))),
+            (UnaryOperator::Plus, operand) => {
+                self.sql_expr_to_logical_expr(operand, schema, planner_context)
+            }
+            // Negating a number literal: compute the negative literal here directly.
+            (UnaryOperator::Minus, SQLExpr::Value(Value::Number(n, _))) => {
+                if let Ok(int) = n.parse::<i64>() {
+                    return Ok(lit(-int));
+                }
+                let float = n.parse::<f64>().map_err(|_e| {
+                    DataFusionError::Internal(format!(
+                        "negative operator can be only applied to integer and float operands, got: {n}"
+                    ))
+                })?;
+                Ok(lit(-float))
+            }
+            // Not a literal: wrap the planned operand in a negation.
+            (UnaryOperator::Minus, operand) => Ok(Expr::Negative(Box::new(
+                self.sql_expr_to_logical_expr(operand, schema, planner_context)?,
+            ))),
+            (other, _) => Err(DataFusionError::NotImplemented(format!(
+                "Unsupported SQL unary operator {other:?}"
+            ))),
+        }
+    }
+}
diff --git a/datafusion/sql/src/expr/value.rs b/datafusion/sql/src/expr/value.rs
new file mode 100644
index 000000000..86ca28d4c
--- /dev/null
+++ b/datafusion/sql/src/expr/value.rs
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
+use arrow_schema::DataType;
+use datafusion_common::{parse_interval, DFSchema, DataFusionError, Result, ScalarValue};
+use datafusion_expr::{lit, Expr};
+use log::debug;
+use sqlparser::ast::{DateTimeField, Expr as SQLExpr, Value};
+use sqlparser::parser::ParserError::ParserError;
+use std::collections::HashSet;
+
+impl<'a, S: ContextProvider> SqlToRel<'a, S> {
+    /// Convert a SQL literal `Value` into a literal (or placeholder) `Expr`.
+    ///
+    /// `param_data_types` is only consulted for `Value::Placeholder`, where it
+    /// supplies the data type of each `$n` parameter. Unhandled `Value` kinds are
+    /// reported as a plan error.
+    pub(crate) fn parse_value(
+        &self,
+        value: Value,
+        param_data_types: &[DataType],
+    ) -> Result<Expr> {
+        match value {
+            Value::Number(n, _) => self.parse_sql_number(&n),
+            Value::SingleQuotedString(s) | Value::DoubleQuotedString(s) => Ok(lit(s)),
+            Value::Null => Ok(Expr::Literal(ScalarValue::Null)),
+            Value::Boolean(n) => Ok(lit(n)),
+            Value::Placeholder(param) => {
+                Self::create_placeholder_expr(param, param_data_types)
+            }
+            _ => Err(DataFusionError::Plan(format!(
+                "Unsupported Value '{:?}'",
+                value,
+            ))),
+        }
+    }
+
+    /// Parse a numeric literal from SQL text into an `Expr::Literal`.
+    ///
+    /// * A literal containing an uppercase `E` (scientific notation) is rejected
+    ///   as not implemented.
+    /// * Anything that parses as `i64` becomes an `Int64` literal.
+    /// * Otherwise, if `parse_float_as_decimal` is enabled, the literal becomes a
+    ///   `Decimal128`; if it is disabled, the literal is parsed as `f64`.
+    fn parse_sql_number(&self, n: &str) -> Result<Expr> {
+        if n.contains('E') {
+            // not implemented yet
+            // https://github.com/apache/arrow-datafusion/issues/3448
+            Err(DataFusionError::NotImplemented(
+                "sql numeric literals in scientific notation are not supported"
+                    .to_string(),
+            ))
+        } else if let Ok(n) = n.parse::<i64>() {
+            Ok(lit(n))
+        } else if self.options.parse_float_as_decimal {
+            // Strip leading zeroes so they do not count towards the precision.
+            let trimmed = n.trim_start_matches('0');
+            if let Some(i) = trimmed.find('.') {
+                // precision = every digit (length minus the '.'),
+                // scale = the digits after the '.'
+                let p = trimmed.len() - 1;
+                let s = trimmed.len() - i - 1;
+                let digits = trimmed.replace('.', "");
+                let n = digits.parse::<i128>().map_err(|_| {
+                    DataFusionError::from(ParserError(format!(
+                        "Cannot parse {digits} as i128 when building decimal"
+                    )))
+                })?;
+                Ok(Expr::Literal(ScalarValue::Decimal128(
+                    Some(n),
+                    p as u8,
+                    s as i8,
+                )))
+            } else {
+                // No fractional part but too large for i64: use the maximum precision.
+                let number = n.parse::<i128>().map_err(|_| {
+                    DataFusionError::from(ParserError(format!(
+                        "Cannot parse {n} as i128 when building decimal"
+                    )))
+                })?;
+                Ok(Expr::Literal(ScalarValue::Decimal128(Some(number), 38, 0)))
+            }
+        } else {
+            n.parse::<f64>().map(lit).map_err(|_| {
+                DataFusionError::from(ParserError(format!("Cannot parse {n} as f64")))
+            })
+        }
+    }
+
+    /// Create a placeholder expression.
+    ///
+    /// This follows Postgres's prepared-statement syntax: a placeholder is a `$`
+    /// sign followed by a 1-based number, so `$1` is the first parameter, `$2`
+    /// the second, and so on. Its data type is taken from `param_data_types`.
+    ///
+    /// Returns an internal error if the placeholder is not a number, is `$0`, or
+    /// refers to a parameter beyond the end of `param_data_types`.
+    fn create_placeholder_expr(
+        param: String,
+        param_data_types: &[DataType],
+    ) -> Result<Expr> {
+        // Parse the placeholder as a number because it is the only support from
+        // sqlparser and postgres. `get(1..)` (unlike `param[1..]`) does not panic
+        // on an empty placeholder or one whose first character is multi-byte.
+        let digits = param.get(1..).unwrap_or_default();
+        let idx = match digits.parse::<usize>() {
+            // Placeholders are 1-based; `$0` would underflow `index - 1`.
+            Ok(0) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Invalid placeholder, zero is not a valid index: {param}"
+                )));
+            }
+            Ok(index) => index - 1,
+            Err(_) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Invalid placeholder, not a number: {param}"
+                )));
+            }
+        };
+        // Check if the placeholder is in the parameter list
+        let param_type = match param_data_types.get(idx) {
+            Some(data_type) => data_type.clone(),
+            None => {
+                return Err(DataFusionError::Internal(format!(
+                    "Placehoder {param} does not exist in the parameter list: {param_data_types:?}"
+                )));
+            }
+        };
+        debug!(
+            "type of param {} param_data_types[idx]: {:?}",
+            param, param_type
+        );
+
+        Ok(Expr::Placeholder {
+            id: param,
+            data_type: param_type,
+        })
+    }
+
+    /// Plan an array literal (`[e1, e2, ...]`) into a list literal.
+    ///
+    /// Each element is planned with a fresh `PlannerContext` and must plan to an
+    /// `Expr::Literal`, and all elements must share one data type; otherwise a
+    /// not-implemented error is returned. An empty array is built with
+    /// `ScalarValue::new_list(None, DataType::Utf8)`.
+    /// NOTE(review): `None` here looks like a NULL list rather than an empty one — confirm.
+    pub(super) fn sql_array_literal(
+        &self,
+        elements: Vec<SQLExpr>,
+        schema: &DFSchema,
+    ) -> Result<Expr> {
+        let mut values = Vec::with_capacity(elements.len());
+
+        for element in elements {
+            let value = self.sql_expr_to_logical_expr(
+                element,
+                schema,
+                &mut PlannerContext::new(),
+            )?;
+            match value {
+                Expr::Literal(scalar) => {
+                    values.push(scalar);
+                }
+                _ => {
+                    return Err(DataFusionError::NotImplemented(format!(
+                        "Arrays with elements other than literal are not supported: {value}"
+                    )));
+                }
+            }
+        }
+
+        let data_types: HashSet<DataType> =
+            values.iter().map(|e| e.get_datatype()).collect();
+
+        if data_types.is_empty() {
+            Ok(lit(ScalarValue::new_list(None, DataType::Utf8)))
+        } else if data_types.len() > 1 {
+            Err(DataFusionError::NotImplemented(format!(
+                "Arrays with different types are not supported: {data_types:?}",
+            )))
+        } else {
+            let data_type = values[0].get_datatype();
+
+            Ok(lit(ScalarValue::new_list(Some(values), data_type)))
+        }
+    }
+
+    /// Plan `INTERVAL '<value>' [<leading_field>]` into an interval literal.
+    ///
+    /// Only a quoted string value is accepted. Leading precision, a last field,
+    /// and fractional-seconds precision are rejected as not implemented. Without
+    /// a leading field, the value is interpreted with the unit `"second"`.
+    pub(super) fn sql_interval_to_expr(
+        &self,
+        value: SQLExpr,
+        leading_field: Option<DateTimeField>,
+        leading_precision: Option<u64>,
+        last_field: Option<DateTimeField>,
+        fractional_seconds_precision: Option<u64>,
+    ) -> Result<Expr> {
+        if leading_precision.is_some() {
+            return Err(DataFusionError::NotImplemented(format!(
+                "Unsupported Interval Expression with leading_precision {leading_precision:?}"
+            )));
+        }
+
+        if last_field.is_some() {
+            return Err(DataFusionError::NotImplemented(format!(
+                "Unsupported Interval Expression with last_field {last_field:?}"
+            )));
+        }
+
+        if fractional_seconds_precision.is_some() {
+            return Err(DataFusionError::NotImplemented(format!(
+                "Unsupported Interval Expression with fractional_seconds_precision {fractional_seconds_precision:?}"
+            )));
+        }
+
+        // Only handle string exprs for now
+        let value = match value {
+            SQLExpr::Value(
+                Value::SingleQuotedString(s) | Value::DoubleQuotedString(s),
+            ) => s,
+            _ => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Unsupported interval argument. Expected string literal, got: {value:?}"
+                )));
+            }
+        };
+
+        let leading_field = leading_field
+            .as_ref()
+            .map(|dt| dt.to_string())
+            .unwrap_or_else(|| "second".to_string());
+
+        Ok(lit(parse_interval(&leading_field, &value)?))
+    }
+}
diff --git a/datafusion/sql/src/lib.rs b/datafusion/sql/src/lib.rs
index 3b6526da8..1f18c0e3b 100644
--- a/datafusion/sql/src/lib.rs
+++ b/datafusion/sql/src/lib.rs
@@ -18,6 +18,7 @@
//! This module provides a SQL parser that translates SQL queries into an abstract syntax
//! tree (AST), and a SQL query planner that creates a logical plan from the AST.
+mod expr;
pub mod parser;
pub mod planner;
pub mod utils;
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 4d1b2c310..39ce2106b 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -16,35 +16,30 @@
// under the License.
//! SQL Query Planner (produces logical plan from SQL AST)
-use log::debug;
use std::collections::{HashMap, HashSet};
-use std::str::FromStr;
use std::sync::Arc;
-use std::{convert::TryInto, vec};
+use std::vec;
use arrow_schema::*;
-use sqlparser::ast::{ArrayAgg, ExactNumberInfo, SetQuantifier};
+use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{
- BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr,
- Function as SQLFunction, FunctionArg, FunctionArgExpr, Ident, Join, JoinConstraint,
- JoinOperator, ObjectName, Offset as SQLOffset, Query, Select, SelectItem, SetExpr,
- SetOperator, ShowCreateObject, ShowStatementFilter, TableAlias, TableFactor,
- TableWithJoins, TrimWhereField, UnaryOperator, Value, Values as SQLValues,
+ DataType as SQLDataType, Expr as SQLExpr, Ident, Join, JoinConstraint, JoinOperator,
+ ObjectName, Offset as SQLOffset, Query, Select, SelectItem, SetExpr, SetOperator,
+ ShowCreateObject, ShowStatementFilter, TableAlias, TableFactor, TableWithJoins,
+ UnaryOperator, Value, Values as SQLValues,
};
-use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
+use sqlparser::ast::{ExactNumberInfo, SetQuantifier};
use sqlparser::ast::{ObjectType, OrderByExpr, Statement};
use sqlparser::ast::{TimezoneInfo, WildcardAdditionalOptions};
use sqlparser::parser::ParserError::ParserError;
-use datafusion_common::parsers::{parse_interval, CompressionTypeVariant};
+use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::ToDFSchema;
use datafusion_common::{
field_not_found, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
};
use datafusion_common::{OwnedTableReference, TableReference};
-use datafusion_expr::expr::{
- self, Between, BinaryExpr, Case, Cast, GroupingSet, Like, Sort, TryCast,
-};
+use datafusion_expr::expr::{Cast, GroupingSet};
use datafusion_expr::expr_rewriter::normalize_col;
use datafusion_expr::expr_rewriter::normalize_col_with_schemas;
use datafusion_expr::logical_plan::builder::project;
@@ -56,20 +51,14 @@ use datafusion_expr::logical_plan::{
DropTable, DropView, Explain, JoinType, LogicalPlan, LogicalPlanBuilder,
Partitioning, PlanType, SetVariable, ToStringifiedPlan,
};
-use datafusion_expr::logical_plan::{Filter, Prepare, Subquery};
+use datafusion_expr::logical_plan::{Filter, Prepare};
use datafusion_expr::utils::{
expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns,
- find_aggregate_exprs, find_column_exprs, find_window_exprs, COUNT_STAR_EXPANSION,
+ find_aggregate_exprs, find_column_exprs, find_window_exprs,
};
use datafusion_expr::Expr::Alias;
-use datafusion_expr::{
- cast, col, lit, AggregateFunction, AggregateUDF, Expr, ExprSchemable,
- GetIndexedField, Operator, ScalarUDF, SubqueryAlias, WindowFrame, WindowFrameUnits,
-};
-use datafusion_expr::{
- window_function::{self, WindowFunction},
- BuiltinScalarFunction, TableSource,
-};
+use datafusion_expr::TableSource;
+use datafusion_expr::{cast, col, lit, AggregateUDF, Expr, ScalarUDF, SubqueryAlias};
use crate::parser::{CreateExternalTable, DescribeTable, Statement as DFStatement};
use crate::utils::{make_decimal_type, normalize_ident, resolve_columns};
@@ -100,7 +89,7 @@ pub trait ContextProvider {
/// SQL parser options
#[derive(Debug, Default)]
pub struct ParserOptions {
- parse_float_as_decimal: bool,
+ pub(crate) parse_float_as_decimal: bool,
}
#[derive(Debug, Clone)]
@@ -141,44 +130,8 @@ impl PlannerContext {
/// SQL query planner
pub struct SqlToRel<'a, S: ContextProvider> {
- schema_provider: &'a S,
- options: ParserOptions,
-}
-
-fn plan_key(key: SQLExpr) -> Result<ScalarValue> {
- let scalar = match key {
- SQLExpr::Value(Value::Number(s, _)) => ScalarValue::Int64(Some(
- s.parse()
- .map_err(|_| ParserError(format!("Cannot parse {s} as i64.")))?,
- )),
- SQLExpr::Value(Value::SingleQuotedString(s) | Value::DoubleQuotedString(s)) => {
- ScalarValue::Utf8(Some(s))
- }
- _ => {
- return Err(DataFusionError::SQL(ParserError(format!(
- "Unsuported index key expression: {key:?}"
- ))));
- }
- };
-
- Ok(scalar)
-}
-
-fn plan_indexed(expr: Expr, mut keys: Vec<SQLExpr>) -> Result<Expr> {
- let key = keys.pop().ok_or_else(|| {
- ParserError("Internal error: Missing index key expression".to_string())
- })?;
-
- let expr = if !keys.is_empty() {
- plan_indexed(expr, keys)?
- } else {
- expr
- };
-
- Ok(Expr::GetIndexedField(GetIndexedField::new(
- Box::new(expr),
- plan_key(key)?,
- )))
+ pub(crate) schema_provider: &'a S,
+ pub(crate) options: ParserOptions,
}
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
@@ -1413,49 +1366,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
LogicalPlanBuilder::from(plan).sort(order_by_rex)?.build()
}
- /// convert sql OrderByExpr to Expr::Sort
- fn order_by_to_sort_expr(&self, e: OrderByExpr, schema: &DFSchema) -> Result<Expr> {
- let OrderByExpr {
- asc,
- expr,
- nulls_first,
- } = e;
-
- let expr = match expr {
- SQLExpr::Value(Value::Number(v, _)) => {
- let field_index = v
- .parse::<usize>()
- .map_err(|err| DataFusionError::Plan(err.to_string()))?;
-
- if field_index == 0 {
- return Err(DataFusionError::Plan(
- "Order by index starts at 1 for column indexes".to_string(),
- ));
- } else if schema.fields().len() < field_index {
- return Err(DataFusionError::Plan(format!(
- "Order by column out of bounds, specified: {}, max: {}",
- field_index,
- schema.fields().len()
- )));
- }
-
- let field = schema.field(field_index - 1);
- Expr::Column(field.qualified_column())
- }
- e => self.sql_expr_to_logical_expr(e, schema, &mut PlannerContext::new())?,
- };
- Ok({
- let asc = asc.unwrap_or(true);
- Expr::Sort(Sort::new(
- Box::new(expr),
- asc,
- // when asc is true, by default nulls last to be consistent with postgres
- // postgres rule: https://www.postgresql.org/docs/current/queries-order.html
- nulls_first.unwrap_or(!asc),
- ))
- })
- }
-
/// Validate the schema provides all of the columns referenced in the expressions.
fn validate_schema_satisfies_exprs(
&self,
@@ -1618,128 +1528,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}
- fn sql_fn_arg_to_logical_expr(
- &self,
- sql: FunctionArg,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- match sql {
- FunctionArg::Named {
- name: _,
- arg: FunctionArgExpr::Expr(arg),
- } => self.sql_expr_to_logical_expr(arg, schema, planner_context),
- FunctionArg::Named {
- name: _,
- arg: FunctionArgExpr::Wildcard,
- } => Ok(Expr::Wildcard),
- FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
- self.sql_expr_to_logical_expr(arg, schema, planner_context)
- }
- FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => Ok(Expr::Wildcard),
- _ => Err(DataFusionError::NotImplemented(format!(
- "Unsupported qualified wildcard argument: {sql:?}"
- ))),
- }
- }
-
- fn parse_sql_binary_op(
- &self,
- left: SQLExpr,
- op: BinaryOperator,
- right: SQLExpr,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- let operator = match op {
- BinaryOperator::Gt => Ok(Operator::Gt),
- BinaryOperator::GtEq => Ok(Operator::GtEq),
- BinaryOperator::Lt => Ok(Operator::Lt),
- BinaryOperator::LtEq => Ok(Operator::LtEq),
- BinaryOperator::Eq => Ok(Operator::Eq),
- BinaryOperator::NotEq => Ok(Operator::NotEq),
- BinaryOperator::Plus => Ok(Operator::Plus),
- BinaryOperator::Minus => Ok(Operator::Minus),
- BinaryOperator::Multiply => Ok(Operator::Multiply),
- BinaryOperator::Divide => Ok(Operator::Divide),
- BinaryOperator::Modulo => Ok(Operator::Modulo),
- BinaryOperator::And => Ok(Operator::And),
- BinaryOperator::Or => Ok(Operator::Or),
- BinaryOperator::PGRegexMatch => Ok(Operator::RegexMatch),
- BinaryOperator::PGRegexIMatch => Ok(Operator::RegexIMatch),
- BinaryOperator::PGRegexNotMatch => Ok(Operator::RegexNotMatch),
- BinaryOperator::PGRegexNotIMatch => Ok(Operator::RegexNotIMatch),
- BinaryOperator::BitwiseAnd => Ok(Operator::BitwiseAnd),
- BinaryOperator::BitwiseOr => Ok(Operator::BitwiseOr),
- BinaryOperator::BitwiseXor => Ok(Operator::BitwiseXor),
- BinaryOperator::PGBitwiseShiftRight => Ok(Operator::BitwiseShiftRight),
- BinaryOperator::PGBitwiseShiftLeft => Ok(Operator::BitwiseShiftLeft),
- BinaryOperator::StringConcat => Ok(Operator::StringConcat),
- _ => Err(DataFusionError::NotImplemented(format!(
- "Unsupported SQL binary operator {op:?}"
- ))),
- }?;
-
- Ok(Expr::BinaryExpr(BinaryExpr::new(
- Box::new(self.sql_expr_to_logical_expr(left, schema, planner_context)?),
- operator,
- Box::new(self.sql_expr_to_logical_expr(right, schema, planner_context)?),
- )))
- }
-
- fn parse_sql_unary_op(
- &self,
- op: UnaryOperator,
- expr: SQLExpr,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- match op {
- UnaryOperator::Not => Ok(Expr::Not(Box::new(
- self.sql_expr_to_logical_expr(expr, schema, planner_context)?,
- ))),
- UnaryOperator::Plus => {
- Ok(self.sql_expr_to_logical_expr(expr, schema, planner_context)?)
- }
- UnaryOperator::Minus => {
- match expr {
- // optimization: if it's a number literal, we apply the negative operator
- // here directly to calculate the new literal.
- SQLExpr::Value(Value::Number(n, _)) => match n.parse::<i64>() {
- Ok(n) => Ok(lit(-n)),
- Err(_) => Ok(lit(-n
- .parse::<f64>()
- .map_err(|_e| {
- DataFusionError::Internal(format!(
- "negative operator can be only applied to integer and float operands, got: {n}"))
- })?)),
- },
- // not a literal, apply negative operator on expression
- _ => Ok(Expr::Negative(Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?))),
- }
- }
- _ => Err(DataFusionError::NotImplemented(format!(
- "Unsupported SQL unary operator {op:?}"
- ))),
- }
- }
-
- fn parse_value(&self, value: Value, param_data_types: &[DataType]) -> Result<Expr> {
- match value {
- Value::Number(n, _) => self.parse_sql_number(&n),
- Value::SingleQuotedString(s) | Value::DoubleQuotedString(s) => Ok(lit(s)),
- Value::Null => Ok(Expr::Literal(ScalarValue::Null)),
- Value::Boolean(n) => Ok(lit(n)),
- Value::Placeholder(param) => {
- Self::create_placeholder_expr(param, param_data_types)
- }
- _ => Err(DataFusionError::Plan(format!(
- "Unsupported Value '{:?}'",
- value,
- ))),
- }
- }
-
fn sql_values_to_plan(
&self,
values: SQLValues,
@@ -1798,978 +1586,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
LogicalPlanBuilder::values(values)?.build()
}
- /// Create a placeholder expression
- /// This is the same as Postgres's prepare statement syntax in which a placeholder starts with `$` sign and then
- /// number 1, 2, ... etc. For example, `$1` is the first placeholder; $2 is the second one and so on.
- fn create_placeholder_expr(
- param: String,
- param_data_types: &[DataType],
- ) -> Result<Expr> {
- // Parse the placeholder as a number because it is the only support from sqlparser and postgres
- let index = param[1..].parse::<usize>();
- let idx = match index {
- Ok(index) => index - 1,
- Err(_) => {
- return Err(DataFusionError::Internal(format!(
- "Invalid placeholder, not a number: {param}"
- )));
- }
- };
- // Check if the placeholder is in the parameter list
- if param_data_types.len() <= idx {
- return Err(DataFusionError::Internal(format!(
- "Placehoder {param} does not exist in the parameter list: {param_data_types:?}"
- )));
- }
- // Data type of the parameter
- let param_type = param_data_types[idx].clone();
- debug!(
- "type of param {} param_data_types[idx]: {:?}",
- param, param_type
- );
-
- Ok(Expr::Placeholder {
- id: param,
- data_type: param_type,
- })
- }
-
- fn sql_expr_to_logical_expr(
- &self,
- sql: SQLExpr,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- // Workaround for https://github.com/apache/arrow-datafusion/issues/4065
- //
- // Minimize stack space required in debug builds to plan
- // deeply nested binary operators by keeping the stack space
- // needed for sql_expr_to_logical_expr minimal for BinaryOp
- //
- // The reason this reduces stack size in debug builds is
- // explained in the "Technical Backstory" heading of
- // https://github.com/apache/arrow-datafusion/pull/1047
- //
- // A likely better way to support deeply nested expressions
- // would be to avoid recursion all together and use an
- // iterative algorithm.
- match sql {
- SQLExpr::BinaryOp { left, op, right } => {
- self.parse_sql_binary_op(*left, op, *right, schema, planner_context)
- }
- // since this function requires more space per frame
- // avoid calling it for binary ops
- _ => self.sql_expr_to_logical_expr_internal(sql, schema, planner_context),
- }
- }
-
- /// Internal implementation. Use
- /// [`Self::sql_expr_to_logical_expr`] to plan exprs.
- fn sql_expr_to_logical_expr_internal(
- &self,
- sql: SQLExpr,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- match sql {
- SQLExpr::Value(value) => {
- self.parse_value(value, &planner_context.prepare_param_data_types)
- }
- SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction {
- fun: BuiltinScalarFunction::DatePart,
- args: vec![
- Expr::Literal(ScalarValue::Utf8(Some(format!("{field}")))),
- self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
- ],
- }),
-
- SQLExpr::Array(arr) => self.sql_array_literal(arr.elem, schema),
- SQLExpr::Interval {
- value,
- leading_field,
- leading_precision,
- last_field,
- fractional_seconds_precision,
- } => self.sql_interval_to_expr(
- *value,
- leading_field,
- leading_precision,
- last_field,
- fractional_seconds_precision,
- ),
- SQLExpr::Identifier(id) => self.sql_identifier_to_expr(id),
-
- SQLExpr::MapAccess { column, keys } => {
- if let SQLExpr::Identifier(id) = *column {
- plan_indexed(col(normalize_ident(id)), keys)
- } else {
- Err(DataFusionError::NotImplemented(format!(
- "map access requires an identifier, found column {column} instead"
- )))
- }
- }
-
- SQLExpr::ArrayIndex { obj, indexes } => {
- let expr = self.sql_expr_to_logical_expr(*obj, schema, planner_context)?;
- plan_indexed(expr, indexes)
- }
-
- SQLExpr::CompoundIdentifier(ids) => self.sql_compound_identifier_to_expr(ids, schema),
-
- SQLExpr::Case {
- operand,
- conditions,
- results,
- else_result,
- } => self.sql_case_identifier_to_expr(operand, conditions, results, else_result, schema, planner_context),
-
- SQLExpr::Cast {
- expr,
- data_type,
- } => Ok(Expr::Cast(Cast::new(
- Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?),
- self.convert_data_type(&data_type)?,
- ))),
-
- SQLExpr::TryCast {
- expr,
- data_type,
- } => Ok(Expr::TryCast(TryCast::new(
- Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?),
- self.convert_data_type(&data_type)?,
- ))),
-
- SQLExpr::TypedString {
- data_type,
- value,
- } => Ok(Expr::Cast(Cast::new(
- Box::new(lit(value)),
- self.convert_data_type(&data_type)?,
- ))),
-
- SQLExpr::IsNull(expr) => Ok(Expr::IsNull(Box::new(
- self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
- ))),
-
- SQLExpr::IsNotNull(expr) => Ok(Expr::IsNotNull(Box::new(
- self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
- ))),
-
- SQLExpr::IsDistinctFrom(left, right) => Ok(Expr::BinaryExpr(BinaryExpr::new(
- Box::new(self.sql_expr_to_logical_expr(*left, schema, planner_context)?),
- Operator::IsDistinctFrom,
- Box::new(self.sql_expr_to_logical_expr(*right, schema, planner_context)?),
- ))),
-
- SQLExpr::IsNotDistinctFrom(left, right) => Ok(Expr::BinaryExpr(BinaryExpr::new(
- Box::new(self.sql_expr_to_logical_expr(*left, schema, planner_context)?),
- Operator::IsNotDistinctFrom,
- Box::new(self.sql_expr_to_logical_expr(*right, schema, planner_context)?),
- ))),
-
- SQLExpr::IsTrue(expr) => Ok(Expr::IsTrue(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
-
- SQLExpr::IsFalse(expr) => Ok(Expr::IsFalse(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
-
- SQLExpr::IsNotTrue(expr) => Ok(Expr::IsNotTrue(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
-
- SQLExpr::IsNotFalse(expr) => Ok(Expr::IsNotFalse(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
-
- SQLExpr::IsUnknown(expr) => Ok(Expr::IsUnknown(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
-
- SQLExpr::IsNotUnknown(expr) => Ok(Expr::IsNotUnknown(Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?))),
-
- SQLExpr::UnaryOp { op, expr } => self.parse_sql_unary_op(op, *expr, schema, planner_context),
-
- SQLExpr::Between {
- expr,
- negated,
- low,
- high,
- } => Ok(Expr::Between(Between::new(
- Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?),
- negated,
- Box::new(self.sql_expr_to_logical_expr(*low, schema, planner_context)?),
- Box::new(self.sql_expr_to_logical_expr(*high, schema, planner_context)?),
- ))),
-
- SQLExpr::InList {
- expr,
- list,
- negated,
- } => self.sql_in_list_to_expr(*expr, list, negated, schema, planner_context),
-
- SQLExpr::Like { negated, expr, pattern, escape_char } => self.sql_like_to_expr(negated, *expr, *pattern, escape_char, schema, planner_context),
-
- SQLExpr::ILike { negated, expr, pattern, escape_char } => self.sql_ilike_to_expr(negated, *expr, *pattern, escape_char, schema, planner_context),
-
- SQLExpr::SimilarTo { negated, expr, pattern, escape_char } => self.sql_similarto_to_expr(negated, *expr, *pattern, escape_char, schema, planner_context),
-
-
- SQLExpr::BinaryOp {
- ..
- } => {
- Err(DataFusionError::Internal(
- "binary_op should be handled by sql_expr_to_logical_expr.".to_string()
- ))
- }
-
-
- #[cfg(feature = "unicode_expressions")]
- SQLExpr::Substring {
- expr,
- substring_from,
- substring_for,
- } => self.sql_substring_to_expr(expr, substring_from, substring_for, schema, planner_context),
-
- #[cfg(not(feature = "unicode_expressions"))]
- SQLExpr::Substring {
- ..
- } => {
- Err(DataFusionError::Internal(
- "statement substring requires compilation with feature flag: unicode_expressions.".to_string()
- ))
- }
-
- SQLExpr::Trim { expr, trim_where, trim_what } => self.sql_trim_to_expr(*expr, trim_where, trim_what, schema, planner_context),
-
- SQLExpr::AggregateExpressionWithFilter { expr, filter } => self.sql_agg_with_filter_to_expr(*expr, *filter, schema, planner_context),
-
- SQLExpr::Function(function) => self.sql_function_to_expr(function, schema, planner_context),
-
- SQLExpr::Rollup(exprs) => self.sql_rollup_to_expr(exprs, schema, planner_context),
- SQLExpr::Cube(exprs) => self.sql_cube_to_expr(exprs,schema, planner_context),
- SQLExpr::GroupingSets(exprs) => self.sql_grouping_sets_to_expr(exprs, schema, planner_context),
-
- SQLExpr::Floor { expr, field: _field } => self.sql_named_function_to_expr(*expr, BuiltinScalarFunction::Floor, schema, planner_context),
-
- SQLExpr::Ceil { expr, field: _field } => self.sql_named_function_to_expr(*expr, BuiltinScalarFunction::Ceil, schema, planner_context),
-
- SQLExpr::Nested(e) => self.sql_expr_to_logical_expr(*e, schema, planner_context),
-
- SQLExpr::Exists { subquery, negated } => self.parse_exists_subquery(*subquery, negated, schema, planner_context),
-
- SQLExpr::InSubquery { expr, subquery, negated } => self.parse_in_subquery(*expr, *subquery, negated, schema, planner_context),
-
- SQLExpr::Subquery(subquery) => self.parse_scalar_subquery(*subquery, schema, planner_context),
-
- SQLExpr::ArrayAgg(array_agg) => self.parse_array_agg(array_agg, schema, planner_context),
-
- _ => Err(DataFusionError::NotImplemented(format!(
- "Unsupported ast node in sqltorel: {sql:?}"
- ))),
- }
- }
-
- fn sql_function_to_expr(
- &self,
- mut function: SQLFunction,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- let name = if function.name.0.len() > 1 {
- // DF doesn't handle compound identifiers
- // (e.g. "foo.bar") for function names yet
- function.name.to_string()
- } else {
- normalize_ident(function.name.0[0].clone())
- };
-
- // next, scalar built-in
- if let Ok(fun) = BuiltinScalarFunction::from_str(&name) {
- let args = self.function_args_to_expr(function.args, schema)?;
- return Ok(Expr::ScalarFunction { fun, args });
- };
-
- // then, window function
- if let Some(window) = function.over.take() {
- let partition_by = window
- .partition_by
- .into_iter()
- .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
- .collect::<Result<Vec<_>>>()?;
- let order_by = window
- .order_by
- .into_iter()
- .map(|e| self.order_by_to_sort_expr(e, schema))
- .collect::<Result<Vec<_>>>()?;
- let window_frame = window
- .window_frame
- .as_ref()
- .map(|window_frame| {
- let window_frame: WindowFrame = window_frame.clone().try_into()?;
- if WindowFrameUnits::Range == window_frame.units
- && order_by.len() != 1
- {
- Err(DataFusionError::Plan(format!(
- "With window frame of type RANGE, the order by expression must be of length 1, got {}", order_by.len())))
- } else {
- Ok(window_frame)
- }
- })
- .transpose()?;
- let window_frame = if let Some(window_frame) = window_frame {
- window_frame
- } else {
- WindowFrame::new(!order_by.is_empty())
- };
- let fun = self.find_window_func(&name)?;
- let expr = match fun {
- WindowFunction::AggregateFunction(aggregate_fun) => {
- let (aggregate_fun, args) =
- self.aggregate_fn_to_expr(aggregate_fun, function.args, schema)?;
-
- Expr::WindowFunction(expr::WindowFunction::new(
- WindowFunction::AggregateFunction(aggregate_fun),
- args,
- partition_by,
- order_by,
- window_frame,
- ))
- }
- _ => Expr::WindowFunction(expr::WindowFunction::new(
- fun,
- self.function_args_to_expr(function.args, schema)?,
- partition_by,
- order_by,
- window_frame,
- )),
- };
- return Ok(expr);
- }
-
- // next, aggregate built-ins
- if let Ok(fun) = AggregateFunction::from_str(&name) {
- let distinct = function.distinct;
- let (fun, args) = self.aggregate_fn_to_expr(fun, function.args, schema)?;
- return Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
- fun, args, distinct, None,
- )));
- };
-
- // finally, user-defined functions (UDF) and UDAF
- match self.schema_provider.get_function_meta(&name) {
- Some(fm) => {
- let args = self.function_args_to_expr(function.args, schema)?;
-
- Ok(Expr::ScalarUDF { fun: fm, args })
- }
- None => match self.schema_provider.get_aggregate_meta(&name) {
- Some(fm) => {
- let args = self.function_args_to_expr(function.args, schema)?;
- Ok(Expr::AggregateUDF {
- fun: fm,
- args,
- filter: None,
- })
- }
- _ => Err(DataFusionError::Plan(format!("Invalid function '{name}'"))),
- },
- }
- }
-
- fn sql_named_function_to_expr(
- &self,
- expr: SQLExpr,
- fun: BuiltinScalarFunction,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- let args = vec![self.sql_expr_to_logical_expr(expr, schema, planner_context)?];
- Ok(Expr::ScalarFunction { fun, args })
- }
-
- fn sql_substring_to_expr(
- &self,
- expr: Box<SQLExpr>,
- substring_from: Option<Box<SQLExpr>>,
- substring_for: Option<Box<SQLExpr>>,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- let args = match (substring_from, substring_for) {
- (Some(from_expr), Some(for_expr)) => {
- let arg =
- self.sql_expr_to_logical_expr(*expr, schema, planner_context)?;
- let from_logic =
- self.sql_expr_to_logical_expr(*from_expr, schema, planner_context)?;
- let for_logic =
- self.sql_expr_to_logical_expr(*for_expr, schema, planner_context)?;
- vec![arg, from_logic, for_logic]
- }
- (Some(from_expr), None) => {
- let arg =
- self.sql_expr_to_logical_expr(*expr, schema, planner_context)?;
- let from_logic =
- self.sql_expr_to_logical_expr(*from_expr, schema, planner_context)?;
- vec![arg, from_logic]
- }
- (None, Some(for_expr)) => {
- let arg =
- self.sql_expr_to_logical_expr(*expr, schema, planner_context)?;
- let from_logic = Expr::Literal(ScalarValue::Int64(Some(1)));
- let for_logic =
- self.sql_expr_to_logical_expr(*for_expr, schema, planner_context)?;
- vec![arg, from_logic, for_logic]
- }
- (None, None) => {
- let orig_sql = SQLExpr::Substring {
- expr,
- substring_from: None,
- substring_for: None,
- };
-
- return Err(DataFusionError::Plan(format!(
- "Substring without for/from is not valid {orig_sql:?}"
- )));
- }
- };
-
- Ok(Expr::ScalarFunction {
- fun: BuiltinScalarFunction::Substr,
- args,
- })
- }
-
- fn sql_in_list_to_expr(
- &self,
- expr: SQLExpr,
- list: Vec<SQLExpr>,
- negated: bool,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- let list_expr = list
- .into_iter()
- .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
- .collect::<Result<Vec<_>>>()?;
-
- Ok(Expr::InList {
- expr: Box::new(self.sql_expr_to_logical_expr(
- expr,
- schema,
- planner_context,
- )?),
- list: list_expr,
- negated,
- })
- }
-
- fn sql_like_to_expr(
- &self,
- negated: bool,
- expr: SQLExpr,
- pattern: SQLExpr,
- escape_char: Option<char>,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
- let pattern_type = pattern.get_type(schema)?;
- if pattern_type != DataType::Utf8 && pattern_type != DataType::Null {
- return Err(DataFusionError::Plan(
- "Invalid pattern in LIKE expression".to_string(),
- ));
- }
- Ok(Expr::Like(Like::new(
- negated,
- Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
- Box::new(pattern),
- escape_char,
- )))
- }
-
- fn sql_ilike_to_expr(
- &self,
- negated: bool,
- expr: SQLExpr,
- pattern: SQLExpr,
- escape_char: Option<char>,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
- let pattern_type = pattern.get_type(schema)?;
- if pattern_type != DataType::Utf8 && pattern_type != DataType::Null {
- return Err(DataFusionError::Plan(
- "Invalid pattern in ILIKE expression".to_string(),
- ));
- }
- Ok(Expr::ILike(Like::new(
- negated,
- Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
- Box::new(pattern),
- escape_char,
- )))
- }
-
- fn sql_similarto_to_expr(
- &self,
- negated: bool,
- expr: SQLExpr,
- pattern: SQLExpr,
- escape_char: Option<char>,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
- let pattern_type = pattern.get_type(schema)?;
- if pattern_type != DataType::Utf8 && pattern_type != DataType::Null {
- return Err(DataFusionError::Plan(
- "Invalid pattern in SIMILAR TO expression".to_string(),
- ));
- }
- Ok(Expr::SimilarTo(Like::new(
- negated,
- Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
- Box::new(pattern),
- escape_char,
- )))
- }
-
- fn sql_trim_to_expr(
- &self,
- expr: SQLExpr,
- trim_where: Option<TrimWhereField>,
- trim_what: Option<Box<SQLExpr>>,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- let fun = match trim_where {
- Some(TrimWhereField::Leading) => BuiltinScalarFunction::Ltrim,
- Some(TrimWhereField::Trailing) => BuiltinScalarFunction::Rtrim,
- Some(TrimWhereField::Both) => BuiltinScalarFunction::Btrim,
- None => BuiltinScalarFunction::Trim,
- };
- let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
- let args = match trim_what {
- Some(to_trim) => {
- let to_trim =
- self.sql_expr_to_logical_expr(*to_trim, schema, planner_context)?;
- vec![arg, to_trim]
- }
- None => vec![arg],
- };
- Ok(Expr::ScalarFunction { fun, args })
- }
-
- fn sql_agg_with_filter_to_expr(
- &self,
- expr: SQLExpr,
- filter: SQLExpr,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- match self.sql_expr_to_logical_expr(expr, schema, planner_context)? {
- Expr::AggregateFunction(expr::AggregateFunction {
- fun,
- args,
- distinct,
- ..
- }) => Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
- fun,
- args,
- distinct,
- Some(Box::new(self.sql_expr_to_logical_expr(
- filter,
- schema,
- planner_context,
- )?)),
- ))),
- _ => Err(DataFusionError::Internal(
- "AggregateExpressionWithFilter expression was not an AggregateFunction"
- .to_string(),
- )),
- }
- }
-
- fn sql_identifier_to_expr(&self, id: Ident) -> Result<Expr> {
- if id.value.starts_with('@') {
- // TODO: figure out if ScalarVariables should be insensitive.
- let var_names = vec![id.value];
- let ty = self
- .schema_provider
- .get_variable_type(&var_names)
- .ok_or_else(|| {
- DataFusionError::Execution(format!(
- "variable {var_names:?} has no type information"
- ))
- })?;
- Ok(Expr::ScalarVariable(ty, var_names))
- } else {
- // Don't use `col()` here because it will try to
- // interpret names with '.' as if they were
- // compound identifiers, but this is not a compound
- // identifier. (e.g. it is "foo.bar" not foo.bar)
-
- Ok(Expr::Column(Column {
- relation: None,
- name: normalize_ident(id),
- }))
- }
- }
-
- fn sql_compound_identifier_to_expr(
- &self,
- ids: Vec<Ident>,
- schema: &DFSchema,
- ) -> Result<Expr> {
- if ids[0].value.starts_with('@') {
- let var_names: Vec<_> = ids.into_iter().map(normalize_ident).collect();
- let ty = self
- .schema_provider
- .get_variable_type(&var_names)
- .ok_or_else(|| {
- DataFusionError::Execution(format!(
- "variable {var_names:?} has no type information"
- ))
- })?;
- Ok(Expr::ScalarVariable(ty, var_names))
- } else {
- // only support "schema.table" type identifiers here
- let (name, relation) = match idents_to_table_reference(ids)? {
- OwnedTableReference::Partial { schema, table } => (table, schema),
- r @ OwnedTableReference::Bare { .. }
- | r @ OwnedTableReference::Full { .. } => {
- return Err(DataFusionError::Plan(format!(
- "Unsupported compound identifier '{r:?}'",
- )));
- }
- };
-
- // Try and find the reference in schema
- match schema.field_with_qualified_name(&relation, &name) {
- Ok(_) => {
- // found an exact match on a qualified name so this is a table.column identifier
- Ok(Expr::Column(Column {
- relation: Some(relation),
- name,
- }))
- }
- Err(_) => {
- if let Some(field) =
- schema.fields().iter().find(|f| f.name().eq(&relation))
- {
- // Access to a field of a column which is a structure, example: SELECT my_struct.key
- Ok(Expr::GetIndexedField(GetIndexedField::new(
- Box::new(Expr::Column(field.qualified_column())),
- ScalarValue::Utf8(Some(name)),
- )))
- } else {
- // table.column identifier
- Ok(Expr::Column(Column {
- relation: Some(relation),
- name,
- }))
- }
- }
- }
- }
- }
-
- fn sql_case_identifier_to_expr(
- &self,
- operand: Option<Box<SQLExpr>>,
- conditions: Vec<SQLExpr>,
- results: Vec<SQLExpr>,
- else_result: Option<Box<SQLExpr>>,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- let expr = if let Some(e) = operand {
- Some(Box::new(self.sql_expr_to_logical_expr(
- *e,
- schema,
- planner_context,
- )?))
- } else {
- None
- };
- let when_expr = conditions
- .into_iter()
- .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
- .collect::<Result<Vec<_>>>()?;
- let then_expr = results
- .into_iter()
- .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
- .collect::<Result<Vec<_>>>()?;
- let else_expr = if let Some(e) = else_result {
- Some(Box::new(self.sql_expr_to_logical_expr(
- *e,
- schema,
- planner_context,
- )?))
- } else {
- None
- };
-
- Ok(Expr::Case(Case::new(
- expr,
- when_expr
- .iter()
- .zip(then_expr.iter())
- .map(|(w, t)| (Box::new(w.to_owned()), Box::new(t.to_owned())))
- .collect(),
- else_expr,
- )))
- }
-
- fn find_window_func(&self, name: &str) -> Result<WindowFunction> {
- window_function::find_df_window_func(name)
- .or_else(|| {
- self.schema_provider
- .get_aggregate_meta(name)
- .map(WindowFunction::AggregateUDF)
- })
- .ok_or_else(|| {
- DataFusionError::Plan(format!("There is no window function named {name}"))
- })
- }
-
- fn sql_rollup_to_expr(
- &self,
- exprs: Vec<Vec<SQLExpr>>,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- let args: Result<Vec<_>> = exprs
- .into_iter()
- .map(|v| {
- if v.len() != 1 {
- Err(DataFusionError::Internal(
- "Tuple expressions are not supported for Rollup expressions"
- .to_string(),
- ))
- } else {
- self.sql_expr_to_logical_expr(v[0].clone(), schema, planner_context)
- }
- })
- .collect();
- Ok(Expr::GroupingSet(GroupingSet::Rollup(args?)))
- }
-
- fn sql_cube_to_expr(
- &self,
- exprs: Vec<Vec<SQLExpr>>,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- let args: Result<Vec<_>> = exprs
- .into_iter()
- .map(|v| {
- if v.len() != 1 {
- Err(DataFusionError::Internal(
- "Tuple expressions not are supported for Cube expressions"
- .to_string(),
- ))
- } else {
- self.sql_expr_to_logical_expr(v[0].clone(), schema, planner_context)
- }
- })
- .collect();
- Ok(Expr::GroupingSet(GroupingSet::Cube(args?)))
- }
-
- fn sql_grouping_sets_to_expr(
- &self,
- exprs: Vec<Vec<SQLExpr>>,
- schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- let args: Result<Vec<Vec<_>>> = exprs
- .into_iter()
- .map(|v| {
- v.into_iter()
- .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
- .collect()
- })
- .collect();
- Ok(Expr::GroupingSet(GroupingSet::GroupingSets(args?)))
- }
-
- fn parse_exists_subquery(
- &self,
- subquery: Query,
- negated: bool,
- input_schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- Ok(Expr::Exists {
- subquery: Subquery {
- subquery: Arc::new(self.subquery_to_plan(
- subquery,
- planner_context,
- input_schema,
- )?),
- },
- negated,
- })
- }
-
- fn parse_in_subquery(
- &self,
- expr: SQLExpr,
- subquery: Query,
- negated: bool,
- input_schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- Ok(Expr::InSubquery {
- expr: Box::new(self.sql_to_rex(expr, input_schema, planner_context)?),
- subquery: Subquery {
- subquery: Arc::new(self.subquery_to_plan(
- subquery,
- planner_context,
- input_schema,
- )?),
- },
- negated,
- })
- }
-
- fn parse_scalar_subquery(
- &self,
- subquery: Query,
- input_schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- Ok(Expr::ScalarSubquery(Subquery {
- subquery: Arc::new(self.subquery_to_plan(
- subquery,
- planner_context,
- input_schema,
- )?),
- }))
- }
-
- fn parse_array_agg(
- &self,
- array_agg: ArrayAgg,
- input_schema: &DFSchema,
- planner_context: &mut PlannerContext,
- ) -> Result<Expr> {
- // Some dialects have special syntax for array_agg. DataFusion only supports it like a function.
- let ArrayAgg {
- distinct,
- expr,
- order_by,
- limit,
- within_group,
- } = array_agg;
-
- if let Some(order_by) = order_by {
- return Err(DataFusionError::NotImplemented(format!(
- "ORDER BY not supported in ARRAY_AGG: {order_by}"
- )));
- }
-
- if let Some(limit) = limit {
- return Err(DataFusionError::NotImplemented(format!(
- "LIMIT not supported in ARRAY_AGG: {limit}"
- )));
- }
-
- if within_group {
- return Err(DataFusionError::NotImplemented(
- "WITHIN GROUP not supported in ARRAY_AGG".to_string(),
- ));
- }
-
- let args =
- vec![self.sql_expr_to_logical_expr(*expr, input_schema, planner_context)?];
- // next, aggregate built-ins
- let fun = AggregateFunction::ArrayAgg;
-
- Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
- fun, args, distinct, None,
- )))
- }
-
- fn function_args_to_expr(
- &self,
- args: Vec<FunctionArg>,
- schema: &DFSchema,
- ) -> Result<Vec<Expr>> {
- args.into_iter()
- .map(|a| {
- self.sql_fn_arg_to_logical_expr(a, schema, &mut PlannerContext::new())
- })
- .collect::<Result<Vec<Expr>>>()
- }
-
- fn aggregate_fn_to_expr(
- &self,
- fun: AggregateFunction,
- args: Vec<FunctionArg>,
- schema: &DFSchema,
- ) -> Result<(AggregateFunction, Vec<Expr>)> {
- let args = match fun {
- // Special case rewrite COUNT(*) to COUNT(constant)
- AggregateFunction::Count => args
- .into_iter()
- .map(|a| match a {
- FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
- Ok(Expr::Literal(COUNT_STAR_EXPANSION.clone()))
- }
- _ => self.sql_fn_arg_to_logical_expr(
- a,
- schema,
- &mut PlannerContext::new(),
- ),
- })
- .collect::<Result<Vec<Expr>>>()?,
- _ => self.function_args_to_expr(args, schema)?,
- };
-
- Ok((fun, args))
- }
-
- fn sql_interval_to_expr(
- &self,
- value: SQLExpr,
- leading_field: Option<DateTimeField>,
- leading_precision: Option<u64>,
- last_field: Option<DateTimeField>,
- fractional_seconds_precision: Option<u64>,
- ) -> Result<Expr> {
- if leading_precision.is_some() {
- return Err(DataFusionError::NotImplemented(format!(
- "Unsupported Interval Expression with leading_precision {leading_precision:?}"
- )));
- }
-
- if last_field.is_some() {
- return Err(DataFusionError::NotImplemented(format!(
- "Unsupported Interval Expression with last_field {last_field:?}"
- )));
- }
-
- if fractional_seconds_precision.is_some() {
- return Err(DataFusionError::NotImplemented(format!(
- "Unsupported Interval Expression with fractional_seconds_precision {fractional_seconds_precision:?}"
- )));
- }
-
- // Only handle string exprs for now
- let value = match value {
- SQLExpr::Value(
- Value::SingleQuotedString(s) | Value::DoubleQuotedString(s),
- ) => s,
- _ => {
- return Err(DataFusionError::NotImplemented(format!(
- "Unsupported interval argument. Expected string literal, got: {value:?}"
- )));
- }
- };
-
- let leading_field = leading_field
- .as_ref()
- .map(|dt| dt.to_string())
- .unwrap_or_else(|| "second".to_string());
-
- Ok(lit(parse_interval(&leading_field, &value)?))
- }
-
fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
let variable = ObjectName(variable.to_vec()).to_string();
@@ -2955,91 +1771,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.is_ok()
}
- fn sql_array_literal(
- &self,
- elements: Vec<SQLExpr>,
- schema: &DFSchema,
- ) -> Result<Expr> {
- let mut values = Vec::with_capacity(elements.len());
-
- for element in elements {
- let value = self.sql_expr_to_logical_expr(
- element,
- schema,
- &mut PlannerContext::new(),
- )?;
- match value {
- Expr::Literal(scalar) => {
- values.push(scalar);
- }
- _ => {
- return Err(DataFusionError::NotImplemented(format!(
- "Arrays with elements other than literal are not supported: {value}"
- )));
- }
- }
- }
-
- let data_types: HashSet<DataType> =
- values.iter().map(|e| e.get_datatype()).collect();
-
- if data_types.is_empty() {
- Ok(lit(ScalarValue::new_list(None, DataType::Utf8)))
- } else if data_types.len() > 1 {
- Err(DataFusionError::NotImplemented(format!(
- "Arrays with different types are not supported: {data_types:?}",
- )))
- } else {
- let data_type = values[0].get_datatype();
-
- Ok(lit(ScalarValue::new_list(Some(values), data_type)))
- }
- }
-
- /// Parse number in sql string, convert to Expr::Literal
- fn parse_sql_number(&self, n: &str) -> Result<Expr> {
- if n.find('E').is_some() {
- // not implemented yet
- // https://github.com/apache/arrow-datafusion/issues/3448
- Err(DataFusionError::NotImplemented(
- "sql numeric literals in scientific notation are not supported"
- .to_string(),
- ))
- } else if let Ok(n) = n.parse::<i64>() {
- Ok(lit(n))
- } else if self.options.parse_float_as_decimal {
- // remove leading zeroes
- let str = n.trim_start_matches('0');
- if let Some(i) = str.find('.') {
- let p = str.len() - 1;
- let s = str.len() - i - 1;
- let str = str.replace('.', "");
- let n = str.parse::<i128>().map_err(|_| {
- DataFusionError::from(ParserError(format!(
- "Cannot parse {str} as i128 when building decimal"
- )))
- })?;
- Ok(Expr::Literal(ScalarValue::Decimal128(
- Some(n),
- p as u8,
- s as i8,
- )))
- } else {
- let number = n.parse::<i128>().map_err(|_| {
- DataFusionError::from(ParserError(format!(
- "Cannot parse {n} as i128 when building decimal"
- )))
- })?;
- Ok(Expr::Literal(ScalarValue::Decimal128(Some(number), 38, 0)))
- }
- } else {
- n.parse::<f64>().map(lit).map_err(|_| {
- DataFusionError::from(ParserError(format!("Cannot parse {n} as f64")))
- })
- }
- }
-
- fn convert_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
+ pub(crate) fn convert_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
match sql_type {
SQLDataType::Array(Some(inner_sql_type)) => {
let data_type = self.convert_simple_data_type(inner_sql_type)?;
@@ -3181,7 +1913,9 @@ pub fn object_name_to_table_reference(
}
/// Create a [`OwnedTableReference`] after normalizing the specified identifier
-fn idents_to_table_reference(idents: Vec<Ident>) -> Result<OwnedTableReference> {
+pub(crate) fn idents_to_table_reference(
+ idents: Vec<Ident>,
+) -> Result<OwnedTableReference> {
struct IdentTaker(Vec<Ident>);
/// take the next identifier from the back of idents, panic'ing if
/// there are none left