You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/27 10:51:45 UTC
[arrow-datafusion] branch master updated: Add SQL planner support for EXISTS subqueries (#2344)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 5a1ee4e62 Add SQL planner support for EXISTS subqueries (#2344)
5a1ee4e62 is described below
commit 5a1ee4e6207658d995f37733cadeddaa276c6189
Author: Andy Grove <ag...@apache.org>
AuthorDate: Wed Apr 27 04:51:39 2022 -0600
Add SQL planner support for EXISTS subqueries (#2344)
* Add SQL planner support for EXISTS subqueries
* update comments
* improve formatting in test and rename outer_schema to outer_query_schema
* improve formatting in test
---
datafusion/core/src/sql/planner.rs | 191 +++++++++++++++++++++++++++++++------
1 file changed, 160 insertions(+), 31 deletions(-)
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index a60f8c2f8..e693d7b20 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -49,6 +49,7 @@ use arrow::datatypes::*;
use datafusion_expr::{window_function::WindowFunction, BuiltinScalarFunction};
use hashbrown::HashMap;
+use datafusion_expr::logical_plan::Subquery;
use sqlparser::ast::{
BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg,
FunctionArgExpr, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query,
@@ -221,9 +222,23 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}
- /// Generate a logic plan from an SQL query
+ /// Generate a logical plan from an SQL query
pub fn query_to_plan(&self, query: Query) -> Result<LogicalPlan> {
- self.query_to_plan_with_alias(query, None, &mut HashMap::new())
+ self.query_to_plan_with_alias(query, None, &mut HashMap::new(), None)
+ }
+
+ /// Generate a logical plan from a SQL subquery
+ pub fn subquery_to_plan(
+ &self,
+ query: Query,
+ outer_query_schema: &DFSchema,
+ ) -> Result<LogicalPlan> {
+ self.query_to_plan_with_alias(
+ query,
+ None,
+ &mut HashMap::new(),
+ Some(outer_query_schema),
+ )
}
/// Generate a logic plan from an SQL query with optional alias
@@ -232,6 +247,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
query: Query,
alias: Option<String>,
ctes: &mut HashMap<String, LogicalPlan>,
+ outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let set_expr = query.body;
if let Some(with) = query.with {
@@ -251,11 +267,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
cte.query,
Some(cte.alias.name.value.clone()),
&mut ctes.clone(),
+ outer_query_schema,
)?;
ctes.insert(cte.alias.name.value, logical_plan);
}
}
- let plan = self.set_expr_to_plan(set_expr, alias, ctes)?;
+ let plan = self.set_expr_to_plan(set_expr, alias, ctes, outer_query_schema)?;
let plan = self.order_by(plan, query.order_by)?;
@@ -267,9 +284,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
set_expr: SetExpr,
alias: Option<String>,
ctes: &mut HashMap<String, LogicalPlan>,
+ outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
match set_expr {
- SetExpr::Select(s) => self.select_to_plan(*s, ctes, alias),
+ SetExpr::Select(s) => {
+ self.select_to_plan(*s, ctes, alias, outer_query_schema)
+ }
SetExpr::Values(v) => self.sql_values_to_plan(v),
SetExpr::SetOperation {
op,
@@ -277,8 +297,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
right,
all,
} => {
- let left_plan = self.set_expr_to_plan(*left, None, ctes)?;
- let right_plan = self.set_expr_to_plan(*right, None, ctes)?;
+ let left_plan =
+ self.set_expr_to_plan(*left, None, ctes, outer_query_schema)?;
+ let right_plan =
+ self.set_expr_to_plan(*right, None, ctes, outer_query_schema)?;
match (op, all) {
(SetOperator::Union, true) => {
union_with_alias(left_plan, right_plan, alias)
@@ -429,12 +451,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
from: Vec<TableWithJoins>,
ctes: &mut HashMap<String, LogicalPlan>,
+ outer_query_schema: Option<&DFSchema>,
) -> Result<Vec<LogicalPlan>> {
match from.len() {
0 => Ok(vec![LogicalPlanBuilder::empty(true).build()?]),
_ => from
.into_iter()
- .map(|t| self.plan_table_with_joins(t, ctes))
+ .map(|t| self.plan_table_with_joins(t, ctes, outer_query_schema))
.collect::<Result<Vec<_>>>(),
}
}
@@ -443,16 +466,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
t: TableWithJoins,
ctes: &mut HashMap<String, LogicalPlan>,
+ outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
- let left = self.create_relation(t.relation, ctes)?;
+ let left = self.create_relation(t.relation, ctes, outer_query_schema)?;
match t.joins.len() {
0 => Ok(left),
_ => {
let mut joins = t.joins.into_iter();
- let mut left =
- self.parse_relation_join(left, joins.next().unwrap(), ctes)?;
+ let mut left = self.parse_relation_join(
+ left,
+ joins.next().unwrap(),
+ ctes,
+ outer_query_schema,
+ )?;
for join in joins {
- left = self.parse_relation_join(left, join, ctes)?;
+ left =
+ self.parse_relation_join(left, join, ctes, outer_query_schema)?;
}
Ok(left)
}
@@ -464,8 +493,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
left: LogicalPlan,
join: Join,
ctes: &mut HashMap<String, LogicalPlan>,
+ outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
- let right = self.create_relation(join.relation, ctes)?;
+ let right = self.create_relation(join.relation, ctes, outer_query_schema)?;
match join.join_operator {
JoinOperator::LeftOuter(constraint) => {
self.parse_join(left, right, constraint, JoinType::Left)
@@ -632,6 +662,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
relation: TableFactor,
ctes: &mut HashMap<String, LogicalPlan>,
+ outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let (plan, alias) = match relation {
TableFactor::Table {
@@ -675,6 +706,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
*subquery,
alias.as_ref().map(|a| a.name.value.to_string()),
ctes,
+ outer_query_schema,
)?;
(
project_with_alias(
@@ -689,9 +721,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
alias,
)
}
- TableFactor::NestedJoin(table_with_joins) => {
- (self.plan_table_with_joins(*table_with_joins, ctes)?, None)
- }
+ TableFactor::NestedJoin(table_with_joins) => (
+ self.plan_table_with_joins(*table_with_joins, ctes, outer_query_schema)?,
+ None,
+ ),
// @todo Support TableFactory::TableFunction?
_ => {
return Err(DataFusionError::NotImplemented(format!(
@@ -734,8 +767,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
selection: Option<SQLExpr>,
plans: Vec<LogicalPlan>,
+ outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
- let plan = match selection {
+ match selection {
Some(predicate_expr) => {
// build join schema
let mut fields = vec![];
@@ -744,6 +778,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
fields.extend_from_slice(plan.schema().fields());
metadata.extend(plan.schema().metadata().clone());
}
+ if let Some(outer) = outer_query_schema {
+ fields.extend_from_slice(outer.fields());
+ metadata.extend(outer.metadata().clone());
+ }
let join_schema = DFSchema::new_with_metadata(fields, metadata)?;
let filter_expr = self.sql_to_rex(predicate_expr, &join_schema)?;
@@ -853,8 +891,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Ok(left)
}
}
- };
- plan
+ }
}
/// Generate a logic plan from an SQL select
@@ -863,17 +900,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
select: Select,
ctes: &mut HashMap<String, LogicalPlan>,
alias: Option<String>,
+ outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
// process `from` clause
- let plans = self.plan_from_tables(select.from, ctes)?;
+ let plans = self.plan_from_tables(select.from, ctes, outer_query_schema)?;
let empty_from = matches!(plans.first(), Some(LogicalPlan::EmptyRelation(_)));
// process `where` clause
- let plan = self.plan_selection(select.selection, plans)?;
+ let plan = self.plan_selection(select.selection, plans, outer_query_schema)?;
// process the SELECT expressions, with wildcards expanded.
- let select_exprs =
- self.prepare_select_exprs(&plan, select.projection, empty_from)?;
+ let select_exprs = self.prepare_select_exprs(
+ &plan,
+ select.projection,
+ empty_from,
+ outer_query_schema,
+ )?;
// having and group by clause may reference aliases defined in select projection
let projected_plan = self.project(plan.clone(), select_exprs.clone())?;
@@ -1010,10 +1052,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan: &LogicalPlan,
projection: Vec<SelectItem>,
empty_from: bool,
+ outer_query_schema: Option<&DFSchema>,
) -> Result<Vec<Expr>> {
projection
.into_iter()
- .map(|expr| self.sql_select_to_rex(expr, plan, empty_from))
+ .map(|expr| {
+ self.sql_select_to_rex(expr, plan, empty_from, outer_query_schema)
+ })
.flat_map(|result| match result {
Ok(vec) => vec.into_iter().map(Ok).collect(),
Err(err) => vec![Err(err)],
@@ -1208,16 +1253,25 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
sql: SelectItem,
plan: &LogicalPlan,
empty_from: bool,
+ outer_query_schema: Option<&DFSchema>,
) -> Result<Vec<Expr>> {
- let input_schema = plan.schema();
+ let input_schema = match outer_query_schema {
+ Some(x) => {
+ let mut input_schema = plan.schema().as_ref().clone();
+ input_schema.merge(x);
+ input_schema
+ }
+ _ => plan.schema().as_ref().clone(),
+ };
+
match sql {
SelectItem::UnnamedExpr(expr) => {
- let expr = self.sql_to_rex(expr, input_schema)?;
+ let expr = self.sql_to_rex(expr, &input_schema)?;
Ok(vec![normalize_col(expr, plan)?])
}
SelectItem::ExprWithAlias { expr, alias } => {
let expr = Alias(
- Box::new(self.sql_to_rex(expr, input_schema)?),
+ Box::new(self.sql_to_rex(expr, &input_schema)?),
normalize_ident(alias),
);
Ok(vec![normalize_col(expr, plan)?])
@@ -1228,12 +1282,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
"SELECT * with no tables specified is not valid".to_string(),
));
}
- expand_wildcard(input_schema, plan)
+ // do not expand from outer schema
+ expand_wildcard(plan.schema().as_ref(), plan)
}
-
SelectItem::QualifiedWildcard(ref object_name) => {
let qualifier = format!("{}", object_name);
- expand_qualified_wildcard(&qualifier, input_schema, plan)
+ // do not expand from outer schema
+ expand_qualified_wildcard(&qualifier, plan.schema().as_ref(), plan)
}
}
}
@@ -1588,8 +1643,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
right: Box::new(self.sql_expr_to_logical_expr(*right, schema)?),
}),
- SQLExpr::UnaryOp { op, expr } => {
- self.parse_sql_unary_op(op, *expr, schema)
+
+ SQLExpr::UnaryOp { op, expr } => match (&op, expr.as_ref()) {
+ // The AST for Exists does not support the NOT EXISTS case so it gets
+ // wrapped in a unary NOT
+ // https://github.com/sqlparser-rs/sqlparser-rs/issues/472
+ (&UnaryOperator::Not, &SQLExpr::Exists(ref subquery)) => self.parse_exists_subquery(subquery, true, schema),
+ _ => self.parse_sql_unary_op(op, *expr, schema)
}
SQLExpr::Between {
@@ -1820,6 +1880,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
SQLExpr::Nested(e) => self.sql_expr_to_logical_expr(*e, schema),
+ SQLExpr::Exists(subquery) => self.parse_exists_subquery(&subquery, false, schema),
+
+ SQLExpr::InSubquery { .. } => Err(DataFusionError::NotImplemented(
+ "IN subqueries are not supported yet".to_owned(),
+ )),
+
+ SQLExpr::Subquery(_) => Err(DataFusionError::NotImplemented(
+ "Scalar subqueries are not supported yet".to_owned(),
+ )),
+
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported ast node {:?} in sqltorel",
sql
@@ -1827,6 +1897,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}
+ fn parse_exists_subquery(
+ &self,
+ subquery: &Query,
+ negated: bool,
+ input_schema: &DFSchema,
+ ) -> Result<Expr> {
+ Ok(Expr::Exists {
+ subquery: Subquery {
+ subquery: Arc::new(
+ self.subquery_to_plan(subquery.clone(), input_schema)?,
+ ),
+ },
+ negated,
+ })
+ }
+
fn function_args_to_expr(
&self,
args: Vec<FunctionArg>,
@@ -4182,4 +4268,47 @@ mod tests {
\n TableScan: test projection=None";
quick_test(sql, expected);
}
+
+ #[test]
+ fn exists_subquery() {
+ let sql = "SELECT id FROM person p WHERE EXISTS \
+ (SELECT first_name FROM person \
+ WHERE last_name = p.last_name \
+ AND state = p.state)";
+
+ let subquery_expected = "Subquery: Projection: #person.first_name\
+ \n Filter: #person.last_name = #p.last_name AND #person.state = #p.state\
+ \n TableScan: person projection=None";
+
+ let expected = format!(
+ "Projection: #p.id\
+ \n Filter: EXISTS ({})\
+ \n SubqueryAlias: p\
+ \n TableScan: person projection=None",
+ subquery_expected
+ );
+ quick_test(sql, &expected);
+ }
+
+ #[test]
+ fn exists_subquery_wildcard() {
+ let sql = "SELECT id FROM person p WHERE EXISTS \
+ (SELECT * FROM person \
+ WHERE last_name = p.last_name \
+ AND state = p.state)";
+
+ let subquery_expected = "Subquery: Projection: #person.id, #person.first_name, \
+ #person.last_name, #person.age, #person.state, #person.salary, #person.birth_date, #person.😀\
+ \n Filter: #person.last_name = #p.last_name AND #person.state = #p.state\
+ \n TableScan: person projection=None";
+
+ let expected = format!(
+ "Projection: #p.id\
+ \n Filter: EXISTS ({})\
+ \n SubqueryAlias: p\
+ \n TableScan: person projection=None",
+ subquery_expected
+ );
+ quick_test(sql, &expected);
+ }
}