You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/27 10:51:45 UTC

[arrow-datafusion] branch master updated: Add SQL planner support for EXISTS subqueries (#2344)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a1ee4e62 Add SQL planner support for EXISTS subqueries (#2344)
5a1ee4e62 is described below

commit 5a1ee4e6207658d995f37733cadeddaa276c6189
Author: Andy Grove <ag...@apache.org>
AuthorDate: Wed Apr 27 04:51:39 2022 -0600

    Add SQL planner support for EXISTS subqueries (#2344)
    
    * Add SQL planner support for EXISTS subqueries
    
    * update comments
    
    * improve formatting in test and rename outer_schema to outer_query_schema
    
    * improve formatting in test
---
 datafusion/core/src/sql/planner.rs | 191 +++++++++++++++++++++++++++++++------
 1 file changed, 160 insertions(+), 31 deletions(-)

diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index a60f8c2f8..e693d7b20 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -49,6 +49,7 @@ use arrow::datatypes::*;
 use datafusion_expr::{window_function::WindowFunction, BuiltinScalarFunction};
 use hashbrown::HashMap;
 
+use datafusion_expr::logical_plan::Subquery;
 use sqlparser::ast::{
     BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg,
     FunctionArgExpr, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query,
@@ -221,9 +222,23 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         }
     }
 
-    /// Generate a logic plan from an SQL query
+    /// Generate a logical plan from an SQL query
     pub fn query_to_plan(&self, query: Query) -> Result<LogicalPlan> {
-        self.query_to_plan_with_alias(query, None, &mut HashMap::new())
+        self.query_to_plan_with_alias(query, None, &mut HashMap::new(), None)
+    }
+
+    /// Generate a logical plan from an SQL subquery whose expressions may reference columns of `outer_query_schema`
+    pub fn subquery_to_plan(
+        &self,
+        query: Query,
+        outer_query_schema: &DFSchema,
+    ) -> Result<LogicalPlan> {
+        self.query_to_plan_with_alias(
+            query,
+            None, // no alias for the subquery plan
+            &mut HashMap::new(), // planning starts with an empty CTE map
+            Some(outer_query_schema),
+        )
     }
 
     /// Generate a logic plan from an SQL query with optional alias
@@ -232,6 +247,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         query: Query,
         alias: Option<String>,
         ctes: &mut HashMap<String, LogicalPlan>,
+        outer_query_schema: Option<&DFSchema>,
     ) -> Result<LogicalPlan> {
         let set_expr = query.body;
         if let Some(with) = query.with {
@@ -251,11 +267,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     cte.query,
                     Some(cte.alias.name.value.clone()),
                     &mut ctes.clone(),
+                    outer_query_schema,
                 )?;
                 ctes.insert(cte.alias.name.value, logical_plan);
             }
         }
-        let plan = self.set_expr_to_plan(set_expr, alias, ctes)?;
+        let plan = self.set_expr_to_plan(set_expr, alias, ctes, outer_query_schema)?;
 
         let plan = self.order_by(plan, query.order_by)?;
 
@@ -267,9 +284,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         set_expr: SetExpr,
         alias: Option<String>,
         ctes: &mut HashMap<String, LogicalPlan>,
+        outer_query_schema: Option<&DFSchema>,
     ) -> Result<LogicalPlan> {
         match set_expr {
-            SetExpr::Select(s) => self.select_to_plan(*s, ctes, alias),
+            SetExpr::Select(s) => {
+                self.select_to_plan(*s, ctes, alias, outer_query_schema)
+            }
             SetExpr::Values(v) => self.sql_values_to_plan(v),
             SetExpr::SetOperation {
                 op,
@@ -277,8 +297,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 right,
                 all,
             } => {
-                let left_plan = self.set_expr_to_plan(*left, None, ctes)?;
-                let right_plan = self.set_expr_to_plan(*right, None, ctes)?;
+                let left_plan =
+                    self.set_expr_to_plan(*left, None, ctes, outer_query_schema)?;
+                let right_plan =
+                    self.set_expr_to_plan(*right, None, ctes, outer_query_schema)?;
                 match (op, all) {
                     (SetOperator::Union, true) => {
                         union_with_alias(left_plan, right_plan, alias)
@@ -429,12 +451,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         &self,
         from: Vec<TableWithJoins>,
         ctes: &mut HashMap<String, LogicalPlan>,
+        outer_query_schema: Option<&DFSchema>,
     ) -> Result<Vec<LogicalPlan>> {
         match from.len() {
             0 => Ok(vec![LogicalPlanBuilder::empty(true).build()?]),
             _ => from
                 .into_iter()
-                .map(|t| self.plan_table_with_joins(t, ctes))
+                .map(|t| self.plan_table_with_joins(t, ctes, outer_query_schema))
                 .collect::<Result<Vec<_>>>(),
         }
     }
@@ -443,16 +466,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         &self,
         t: TableWithJoins,
         ctes: &mut HashMap<String, LogicalPlan>,
+        outer_query_schema: Option<&DFSchema>,
     ) -> Result<LogicalPlan> {
-        let left = self.create_relation(t.relation, ctes)?;
+        let left = self.create_relation(t.relation, ctes, outer_query_schema)?;
         match t.joins.len() {
             0 => Ok(left),
             _ => {
                 let mut joins = t.joins.into_iter();
-                let mut left =
-                    self.parse_relation_join(left, joins.next().unwrap(), ctes)?;
+                let mut left = self.parse_relation_join(
+                    left,
+                    joins.next().unwrap(),
+                    ctes,
+                    outer_query_schema,
+                )?;
                 for join in joins {
-                    left = self.parse_relation_join(left, join, ctes)?;
+                    left =
+                        self.parse_relation_join(left, join, ctes, outer_query_schema)?;
                 }
                 Ok(left)
             }
@@ -464,8 +493,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         left: LogicalPlan,
         join: Join,
         ctes: &mut HashMap<String, LogicalPlan>,
+        outer_query_schema: Option<&DFSchema>,
     ) -> Result<LogicalPlan> {
-        let right = self.create_relation(join.relation, ctes)?;
+        let right = self.create_relation(join.relation, ctes, outer_query_schema)?;
         match join.join_operator {
             JoinOperator::LeftOuter(constraint) => {
                 self.parse_join(left, right, constraint, JoinType::Left)
@@ -632,6 +662,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         &self,
         relation: TableFactor,
         ctes: &mut HashMap<String, LogicalPlan>,
+        outer_query_schema: Option<&DFSchema>,
     ) -> Result<LogicalPlan> {
         let (plan, alias) = match relation {
             TableFactor::Table {
@@ -675,6 +706,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     *subquery,
                     alias.as_ref().map(|a| a.name.value.to_string()),
                     ctes,
+                    outer_query_schema,
                 )?;
                 (
                     project_with_alias(
@@ -689,9 +721,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     alias,
                 )
             }
-            TableFactor::NestedJoin(table_with_joins) => {
-                (self.plan_table_with_joins(*table_with_joins, ctes)?, None)
-            }
+            TableFactor::NestedJoin(table_with_joins) => (
+                self.plan_table_with_joins(*table_with_joins, ctes, outer_query_schema)?,
+                None,
+            ),
             // @todo Support TableFactory::TableFunction?
             _ => {
                 return Err(DataFusionError::NotImplemented(format!(
@@ -734,8 +767,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         &self,
         selection: Option<SQLExpr>,
         plans: Vec<LogicalPlan>,
+        outer_query_schema: Option<&DFSchema>,
     ) -> Result<LogicalPlan> {
-        let plan = match selection {
+        match selection {
             Some(predicate_expr) => {
                 // build join schema
                 let mut fields = vec![];
@@ -744,6 +778,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     fields.extend_from_slice(plan.schema().fields());
                     metadata.extend(plan.schema().metadata().clone());
                 }
+                if let Some(outer) = outer_query_schema {
+                    fields.extend_from_slice(outer.fields());
+                    metadata.extend(outer.metadata().clone());
+                }
                 let join_schema = DFSchema::new_with_metadata(fields, metadata)?;
 
                 let filter_expr = self.sql_to_rex(predicate_expr, &join_schema)?;
@@ -853,8 +891,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     Ok(left)
                 }
             }
-        };
-        plan
+        }
     }
 
     /// Generate a logic plan from an SQL select
@@ -863,17 +900,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         select: Select,
         ctes: &mut HashMap<String, LogicalPlan>,
         alias: Option<String>,
+        outer_query_schema: Option<&DFSchema>,
     ) -> Result<LogicalPlan> {
         // process `from` clause
-        let plans = self.plan_from_tables(select.from, ctes)?;
+        let plans = self.plan_from_tables(select.from, ctes, outer_query_schema)?;
         let empty_from = matches!(plans.first(), Some(LogicalPlan::EmptyRelation(_)));
 
         // process `where` clause
-        let plan = self.plan_selection(select.selection, plans)?;
+        let plan = self.plan_selection(select.selection, plans, outer_query_schema)?;
 
         // process the SELECT expressions, with wildcards expanded.
-        let select_exprs =
-            self.prepare_select_exprs(&plan, select.projection, empty_from)?;
+        let select_exprs = self.prepare_select_exprs(
+            &plan,
+            select.projection,
+            empty_from,
+            outer_query_schema,
+        )?;
 
         // having and group by clause may reference aliases defined in select projection
         let projected_plan = self.project(plan.clone(), select_exprs.clone())?;
@@ -1010,10 +1052,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         plan: &LogicalPlan,
         projection: Vec<SelectItem>,
         empty_from: bool,
+        outer_query_schema: Option<&DFSchema>,
     ) -> Result<Vec<Expr>> {
         projection
             .into_iter()
-            .map(|expr| self.sql_select_to_rex(expr, plan, empty_from))
+            .map(|expr| {
+                self.sql_select_to_rex(expr, plan, empty_from, outer_query_schema)
+            })
             .flat_map(|result| match result {
                 Ok(vec) => vec.into_iter().map(Ok).collect(),
                 Err(err) => vec![Err(err)],
@@ -1208,16 +1253,25 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         sql: SelectItem,
         plan: &LogicalPlan,
         empty_from: bool,
+        outer_query_schema: Option<&DFSchema>,
     ) -> Result<Vec<Expr>> {
-        let input_schema = plan.schema();
+        let input_schema = match outer_query_schema {
+            Some(x) => {
+                let mut input_schema = plan.schema().as_ref().clone();
+                input_schema.merge(x);
+                input_schema
+            }
+            _ => plan.schema().as_ref().clone(),
+        };
+
         match sql {
             SelectItem::UnnamedExpr(expr) => {
-                let expr = self.sql_to_rex(expr, input_schema)?;
+                let expr = self.sql_to_rex(expr, &input_schema)?;
                 Ok(vec![normalize_col(expr, plan)?])
             }
             SelectItem::ExprWithAlias { expr, alias } => {
                 let expr = Alias(
-                    Box::new(self.sql_to_rex(expr, input_schema)?),
+                    Box::new(self.sql_to_rex(expr, &input_schema)?),
                     normalize_ident(alias),
                 );
                 Ok(vec![normalize_col(expr, plan)?])
@@ -1228,12 +1282,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                         "SELECT * with no tables specified is not valid".to_string(),
                     ));
                 }
-                expand_wildcard(input_schema, plan)
+                // do not expand from outer schema
+                expand_wildcard(plan.schema().as_ref(), plan)
             }
-
             SelectItem::QualifiedWildcard(ref object_name) => {
                 let qualifier = format!("{}", object_name);
-                expand_qualified_wildcard(&qualifier, input_schema, plan)
+                // do not expand from outer schema
+                expand_qualified_wildcard(&qualifier, plan.schema().as_ref(), plan)
             }
         }
     }
@@ -1588,8 +1643,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 right: Box::new(self.sql_expr_to_logical_expr(*right, schema)?),
             }),
 
-            SQLExpr::UnaryOp { op, expr } => {
-                self.parse_sql_unary_op(op, *expr, schema)
+
+            SQLExpr::UnaryOp { op, expr } => match (&op, expr.as_ref()) {
+                // The AST for Exists does not support the NOT EXISTS case so it gets
+                // wrapped in a unary NOT
+                // https://github.com/sqlparser-rs/sqlparser-rs/issues/472
+                (&UnaryOperator::Not, &SQLExpr::Exists(ref subquery)) => self.parse_exists_subquery(subquery, true, schema),
+                _ => self.parse_sql_unary_op(op, *expr, schema)
             }
 
             SQLExpr::Between {
@@ -1820,6 +1880,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
             SQLExpr::Nested(e) => self.sql_expr_to_logical_expr(*e, schema),
 
+            SQLExpr::Exists(subquery) => self.parse_exists_subquery(&subquery, false, schema),
+
+            SQLExpr::InSubquery { .. } => Err(DataFusionError::NotImplemented(
+                "IN subqueries are not supported yet".to_owned(),
+            )),
+
+            SQLExpr::Subquery(_) => Err(DataFusionError::NotImplemented(
+                "Scalar subqueries are not supported yet".to_owned(),
+            )),
+
             _ => Err(DataFusionError::NotImplemented(format!(
                 "Unsupported ast node {:?} in sqltorel",
                 sql
@@ -1827,6 +1897,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         }
     }
 
+    fn parse_exists_subquery(
+        &self,
+        subquery: &Query,
+        negated: bool, // true when the SQL was `NOT EXISTS (...)`, false for plain `EXISTS (...)`
+        input_schema: &DFSchema, // schema of the enclosing expression; handed on as the subquery's outer schema
+    ) -> Result<Expr> {
+        Ok(Expr::Exists {
+            subquery: Subquery {
+                subquery: Arc::new(
+                    self.subquery_to_plan(subquery.clone(), input_schema)?, // cloned: `subquery_to_plan` takes the `Query` by value
+                ),
+            },
+            negated,
+        })
+    }
+
     fn function_args_to_expr(
         &self,
         args: Vec<FunctionArg>,
@@ -4182,4 +4268,47 @@ mod tests {
             \n    TableScan: test projection=None";
         quick_test(sql, expected);
     }
+
+    #[test]
+    fn exists_subquery() {
+        let sql = "SELECT id FROM person p WHERE EXISTS \
+            (SELECT first_name FROM person \
+            WHERE last_name = p.last_name \
+            AND state = p.state)";
+
+        let subquery_expected = "Subquery: Projection: #person.first_name\
+        \n  Filter: #person.last_name = #p.last_name AND #person.state = #p.state\
+        \n    TableScan: person projection=None";
+
+        let expected = format!(
+            "Projection: #p.id\
+        \n  Filter: EXISTS ({})\
+        \n    SubqueryAlias: p\
+        \n      TableScan: person projection=None",
+            subquery_expected
+        );
+        quick_test(sql, &expected);
+    }
+
+    #[test]
+    fn exists_subquery_wildcard() {
+        let sql = "SELECT id FROM person p WHERE EXISTS \
+            (SELECT * FROM person \
+            WHERE last_name = p.last_name \
+            AND state = p.state)";
+
+        let subquery_expected = "Subquery: Projection: #person.id, #person.first_name, \
+        #person.last_name, #person.age, #person.state, #person.salary, #person.birth_date, #person.😀\
+            \n  Filter: #person.last_name = #p.last_name AND #person.state = #p.state\
+            \n    TableScan: person projection=None";
+
+        let expected = format!(
+            "Projection: #p.id\
+            \n  Filter: EXISTS ({})\
+            \n    SubqueryAlias: p\
+            \n      TableScan: person projection=None",
+            subquery_expected
+        );
+        quick_test(sql, &expected);
+    }
 }