You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/03/17 11:04:16 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

alamb commented on code in PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593#discussion_r1140080168


##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -81,6 +81,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
             }
             Expr::Literal(_)
             | Expr::Alias(_, _)
+            | Expr::OuterReferenceColumn(_, _)

Review Comment:
   This idea makes a lot of sense to me 👍 



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -179,3 +179,117 @@ async fn in_subquery_with_same_table() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn support_agg_correlated_columns() -> Result<()> {

Review Comment:
   If possible, I would recommend adding these tests as `sqllogictest` -- https://github.com/apache/arrow-datafusion/tree/main/datafusion/core/tests/sqllogictests
   
   They are much easier to change / maintain, in my opinion.



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -179,3 +179,117 @@ async fn in_subquery_with_same_table() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn support_agg_correlated_columns() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", true)?;
+
+    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT sum(t1.t1_int + t2.t2_id) FROM t2 WHERE t1.t1_name = t2.t2_name)";
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan()?;
+
+    let expected = vec![
+        "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
+        "  Subquery: [SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
+        "    Projection: SUM(outer_ref(t1.t1_int) + t2.t2_id) [SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
+        "      Aggregate: groupBy=[[]], aggr=[[SUM(outer_ref(t1.t1_int) + t2.t2_id)]] [SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
+        "        Filter: outer_ref(t1.t1_name) = t2.t2_name [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "          TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn support_agg_correlated_columns2() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", true)?;
+
+    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT count(*) FROM t2 WHERE t1.t1_name = t2.t2_name having sum(t1_int + t2_id) >0)";
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan()?;
+
+    let expected = vec![
+        "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
+        "  Subquery: [COUNT(UInt8(1)):Int64;N]",
+        "    Projection: COUNT(UInt8(1)) [COUNT(UInt8(1)):Int64;N]",
+        "      Filter: CAST(SUM(outer_ref(t1.t1_int) + t2.t2_id) AS Int64) > Int64(0) [COUNT(UInt8(1)):Int64;N, SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
+        "        Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)), SUM(outer_ref(t1.t1_int) + t2.t2_id)]] [COUNT(UInt8(1)):Int64;N, SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
+        "          Filter: outer_ref(t1.t1_name) = t2.t2_name [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "            TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn support_join_correlated_columns() -> Result<()> {
+    let ctx = create_sub_query_join_context("t0_id", "t1_id", "t2_id", true)?;
+    let sql = "SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN t2 ON(t1.t1_id = t2.t2_id and t1.t1_name = t0.t0_name))";
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan()?;
+
+    let expected = vec![
+        "Filter: EXISTS (<subquery>) [t0_id:UInt32;N, t0_name:Utf8;N]",
+        "  Subquery: [Int64(1):Int64]",
+        "    Projection: Int64(1) [Int64(1):Int64]",
+        "      Inner Join:  Filter: t1.t1_id = t2.t2_id AND t1.t1_name = outer_ref(t0.t0_name) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "        TableScan: t1 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "        TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "  TableScan: t0 projection=[t0_id, t0_name] [t0_id:UInt32;N, t0_name:Utf8;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn support_join_correlated_columns2() -> Result<()> {
+    let ctx = create_sub_query_join_context("t0_id", "t1_id", "t2_id", true)?;
+    let sql = "SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN (select * from t2 where t2.t2_name = t0.t0_name) as t2 ON(t1.t1_id = t2.t2_id ))";

Review Comment:
   It is neat to see a correlation to the (second) layer of the query.



##########
datafusion/sql/src/expr/identifier.rs:
##########
@@ -44,18 +49,49 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             // interpret names with '.' as if they were
             // compound identifiers, but this is not a compound
             // identifier. (e.g. it is "foo.bar" not foo.bar)
-
-            Ok(Expr::Column(Column {
-                relation: None,
-                name: normalize_ident(id),
-            }))
+            let normalize_ident = normalize_ident(id);
+            match schema.field_with_unqualified_name(normalize_ident.as_str()) {
+                Ok(_) => {
+                    // found a match without a qualified name, this is a inner table column
+                    Ok(Expr::Column(Column {
+                        relation: None,
+                        name: normalize_ident,
+                    }))
+                }
+                Err(_) => {
+                    let outer_query_schema_opt =
+                        planner_context.outer_query_schema.clone();
+                    if let Some(outer) = outer_query_schema_opt.as_ref() {

Review Comment:
   Is there a reason for this clone (of the entire schema)? Could it be more like
   
   ```suggestion
                       let outer_query_schema_opt = planner_context.outer_query_schema.as_ref();
                       if let Some(outer) = outer_query_schema_opt {
   ```



##########
datafusion/optimizer/src/utils.rs:
##########
@@ -291,46 +293,52 @@ pub fn find_join_exprs(
     let mut joins = vec![];
     let mut others = vec![];
     for filter in exprs.iter() {
-        let (left, op, right) = match filter {
-            Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
-                (*left.clone(), *op, *right.clone())
-            }
-            _ => {
-                others.push((*filter).clone());
-                continue;
-            }
-        };
-        let left = match left {
-            Expr::Column(c) => c,
-            _ => {
+        // If the expression contains correlated predicates, add it to join filters
+        if filter.contains_outer() {
+            joins.push(strip_outer_reference((*filter).clone()));
+        } else {
+            // TODO remove the logic

Review Comment:
   I don't understand this TODO -- is it meant for this PR or for some other one?



##########
datafusion/sql/src/select.rs:
##########
@@ -234,28 +228,26 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         &self,
         selection: Option<SQLExpr>,
         plan: LogicalPlan,
-        outer_query_schema: Option<&DFSchema>,
         planner_context: &mut PlannerContext,
     ) -> Result<LogicalPlan> {
         match selection {
             Some(predicate_expr) => {
-                let mut join_schema = (**plan.schema()).clone();
-
                 let fallback_schemas = plan.fallback_normalize_schemas();
-                let outer_query_schema = if let Some(outer) = outer_query_schema {
-                    join_schema.merge(outer);
-                    vec![outer]
-                } else {
-                    vec![]
-                };
+                let outer_query_schema = planner_context.outer_query_schema.clone();

Review Comment:
   Again, I think we can avoid a clone here.



##########
datafusion/sql/src/expr/identifier.rs:
##########
@@ -102,11 +138,32 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                             ScalarValue::Utf8(Some(name)),
                         )))
                     } else {
-                        // table.column identifier
-                        Ok(Expr::Column(Column {
-                            relation: Some(relation),
-                            name,
-                        }))
+                        let outer_query_schema_opt =
+                            planner_context.outer_query_schema.clone();
+                        if let Some(outer) = outer_query_schema_opt.as_ref() {
+                            match outer.field_with_qualified_name(&relation, &name) {
+                                Ok(field) => {
+                                    // found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
+                                    Ok(Expr::OuterReferenceColumn(
+                                        field.data_type().clone(),
+                                        Column {
+                                            relation: Some(relation),
+                                            name,
+                                        },
+                                    ))
+                                }
+                                Err(_) => Ok(Expr::Column(Column {
+                                    relation: Some(relation),
+                                    name,
+                                })),
+                            }
+                        } else {
+                            // table.column identifier
+                            Ok(Expr::Column(Column {
+                                relation: Some(relation),
+                                name,
+                            }))
+                        }

Review Comment:
   This pattern appears to be repeated several times -- maybe it could be refactored into a reusable function rather than being inlined in several places?



##########
datafusion/sql/tests/integration_test.rs:
##########
@@ -2635,7 +2635,7 @@ fn exists_subquery() {
         \n  Filter: EXISTS (<subquery>)\
         \n    Subquery:\
         \n      Projection: person.first_name\
-        \n        Filter: person.last_name = p.last_name AND person.state = p.state\
+        \n        Filter: person.last_name = outer_ref(p.last_name) AND person.state = outer_ref(p.state)\

Review Comment:
   I find this actually much easier to read 👍 



##########
datafusion/sql/src/expr/identifier.rs:
##########
@@ -44,18 +49,49 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             // interpret names with '.' as if they were
             // compound identifiers, but this is not a compound
             // identifier. (e.g. it is "foo.bar" not foo.bar)
-
-            Ok(Expr::Column(Column {
-                relation: None,
-                name: normalize_ident(id),
-            }))
+            let normalize_ident = normalize_ident(id);
+            match schema.field_with_unqualified_name(normalize_ident.as_str()) {
+                Ok(_) => {
+                    // found a match without a qualified name, this is a inner table column
+                    Ok(Expr::Column(Column {
+                        relation: None,
+                        name: normalize_ident,
+                    }))
+                }
+                Err(_) => {
+                    let outer_query_schema_opt =
+                        planner_context.outer_query_schema.clone();
+                    if let Some(outer) = outer_query_schema_opt.as_ref() {

Review Comment:
   The same thing applies to the other places in this file



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org