Posted to github@arrow.apache.org by "mingmwang (via GitHub)" <gi...@apache.org> on 2023/03/14 11:28:39 UTC

[GitHub] [arrow-datafusion] mingmwang opened a new pull request, #5593: Add OuterReferenceColumn to Expr to represent correlated expression

mingmwang opened a new pull request, #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593

   # Which issue does this PR close?
   
   
   Closes #5571.
   
   # Rationale for this change
   
   This is the first part of the work to support more general and complex subqueries and subquery expressions.
   
   
   # What changes are included in this PR?
   
   
   # Are these changes tested?
   
   
   # Are there any user-facing changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593#issuecomment-1467955986

   I will add more unit tests to cover more complex cases:
   
   1) Subquery with `Limit`
   2) Subquery with `Order by`
   3) Subquery with `Window Functions`
   4) Subquery with `Union`




[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

Posted by "jackwener (via GitHub)" <gi...@apache.org>.
jackwener commented on code in PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593#discussion_r1140371742


##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -179,3 +179,117 @@ async fn in_subquery_with_same_table() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn support_agg_correlated_columns() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", true)?;
+
+    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT sum(t1.t1_int + t2.t2_id) FROM t2 WHERE t1.t1_name = t2.t2_name)";
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan()?;
+
+    let expected = vec![
+        "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
+        "  Subquery: [SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
+        "    Projection: SUM(outer_ref(t1.t1_int) + t2.t2_id) [SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
+        "      Aggregate: groupBy=[[]], aggr=[[SUM(outer_ref(t1.t1_int) + t2.t2_id)]] [SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
+        "        Filter: outer_ref(t1.t1_name) = t2.t2_name [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "          TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );

Review Comment:
   For tests that depend on data, I agree with @alamb: we had better move them into `sqllogictest`.





[GitHub] [arrow-datafusion] mingmwang commented on pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593#issuecomment-1471570967

   @jackwener @ygf11 @alamb 




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593#discussion_r1140362566


##########
datafusion/optimizer/src/utils.rs:
##########
@@ -291,46 +293,52 @@ pub fn find_join_exprs(
     let mut joins = vec![];
     let mut others = vec![];
     for filter in exprs.iter() {
-        let (left, op, right) = match filter {
-            Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
-                (*left.clone(), *op, *right.clone())
-            }
-            _ => {
-                others.push((*filter).clone());
-                continue;
-            }
-        };
-        let left = match left {
-            Expr::Column(c) => c,
-            _ => {
+        // If the expression contains correlated predicates, add it to join filters
+        if filter.contains_outer() {
+            joins.push(strip_outer_reference((*filter).clone()));
+        } else {
+            // TODO remove the logic

Review Comment:
   The original decorrelation rules contained complex logic to find the join columns; that logic is unnecessary after this PR.
   I will clean up the code in the next PR.
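
Editorial sketch: the split performed by the new loop body can be illustrated with a toy expression tree. This is a minimal sketch, not DataFusion code. The `Expr` enum below is a hypothetical stand-in, and the names `contains_outer` and `strip_outer_reference` are borrowed from the diff only for readability. The `else` branch is simplified to "push to `others`", which is an assumption about where the PR is heading rather than what the diff does today.

```rust
// Toy expression tree. NOT DataFusion's `Expr`; variants are assumptions for illustration.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    OuterReferenceColumn(String),
    Literal(i64),
    BinaryExpr {
        left: Box<Expr>,
        op: &'static str,
        right: Box<Expr>,
    },
}

impl Expr {
    /// True if any node in the tree is an outer reference.
    fn contains_outer(&self) -> bool {
        match self {
            Expr::OuterReferenceColumn(_) => true,
            Expr::BinaryExpr { left, right, .. } => {
                left.contains_outer() || right.contains_outer()
            }
            Expr::Column(_) | Expr::Literal(_) => false,
        }
    }
}

/// Rewrite every outer reference into a plain column, leaving other nodes untouched.
fn strip_outer_reference(expr: Expr) -> Expr {
    match expr {
        Expr::OuterReferenceColumn(name) => Expr::Column(name),
        Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr {
            left: Box::new(strip_outer_reference(*left)),
            op,
            right: Box::new(strip_outer_reference(*right)),
        },
        other => other,
    }
}

/// Split predicates into (join filters, other filters).
/// Simplified: anything without an outer reference goes straight to `others`.
fn find_join_exprs(exprs: &[Expr]) -> (Vec<Expr>, Vec<Expr>) {
    let mut joins = vec![];
    let mut others = vec![];
    for filter in exprs {
        if filter.contains_outer() {
            joins.push(strip_outer_reference(filter.clone()));
        } else {
            others.push(filter.clone());
        }
    }
    (joins, others)
}
```

In this toy model, a predicate such as `outer_ref(t1.t1_name) = t2.t2_name` lands in `joins` with the marker stripped, while a purely local predicate stays in `others`.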





[GitHub] [arrow-datafusion] ygf11 commented on a diff in pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

Posted by "ygf11 (via GitHub)" <gi...@apache.org>.
ygf11 commented on code in PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593#discussion_r1139948465


##########
datafusion/sql/src/expr/subquery.rs:
##########
@@ -30,13 +30,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         input_schema: &DFSchema,
         planner_context: &mut PlannerContext,
     ) -> Result<Expr> {
+        let old_outer_query_schema = planner_context.outer_query_schema.clone();
+        planner_context.outer_query_schema = Some(input_schema.clone());
+        let sub_plan = self.query_to_plan(subquery, planner_context)?;
+        let outer_ref_columns = sub_plan.all_out_ref_exprs();
+        planner_context.outer_query_schema = old_outer_query_schema;

Review Comment:
   Can we support the following subquery in the future? 
   ```sql
   -- three levels of nesting:
   -- the innermost subquery (`t3`) references a column of the outermost query (`t1`)
   SELECT *
   FROM t1
   WHERE EXISTS (
   	SELECT *
   	FROM t2
   	WHERE t1_id = t2_id
   		AND EXISTS (
   			SELECT *
   			FROM t3
   			WHERE t3_id > t1_id
   		)
   );
   ```
   
   I think the optimizer rule you plan to add (https://github.com/apache/arrow-datafusion/issues/5492) could also optimize it.



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1919,14 +1947,11 @@ impl Join {
 pub struct Subquery {
     /// The subquery
     pub subquery: Arc<LogicalPlan>,
+    /// The outer references used in the subquery
+    pub outer_ref_columns: Vec<Expr>,

Review Comment:
   👍 This field can help to check if it is a correlated subquery.
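
Editorial sketch: a minimal illustration of the check this comment describes, assuming the field is empty exactly when the subquery references nothing from the outer query. `LogicalPlan` and `Expr` below are toy stand-ins, and `is_correlated` is a hypothetical helper name that does not appear in the PR.

```rust
use std::sync::Arc;

// Toy stand-ins, not DataFusion types.
#[derive(Debug)]
struct LogicalPlan;

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    OuterReferenceColumn(String),
}

/// Mirrors the shape of the struct in the diff.
struct Subquery {
    subquery: Arc<LogicalPlan>,
    outer_ref_columns: Vec<Expr>,
}

impl Subquery {
    /// Hypothetical helper: a subquery is treated as correlated
    /// when it recorded at least one outer reference.
    fn is_correlated(&self) -> bool {
        !self.outer_ref_columns.is_empty()
    }
}
```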



##########
datafusion/expr/src/expr_fn.rs:
##########
@@ -267,15 +267,21 @@ pub fn approx_percentile_cont_with_weight(
 /// Create an EXISTS subquery expression
 pub fn exists(subquery: Arc<LogicalPlan>) -> Expr {
     Expr::Exists {
-        subquery: Subquery { subquery },
+        subquery: Subquery {
+            subquery,
+            outer_ref_columns: vec![],
+        },

Review Comment:
   Maybe we can add `outer_ref_columns` as an argument?
   I found that some tests depend on it, and with the argument we could cover more test cases (I can help add them).





[GitHub] [arrow-datafusion] alamb merged pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593




[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

Posted by "jackwener (via GitHub)" <gi...@apache.org>.
jackwener commented on code in PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593#discussion_r1140365145


##########
datafusion/optimizer/src/decorrelate_where_in.rs:
##########
@@ -149,6 +152,7 @@ fn optimize_where_in(
     let projection = Projection::try_from_plan(&query_info.query.subquery)
         .map_err(|e| context!("a projection is required", e))?;
     let subquery_input = projection.input.clone();
+    // TODO add the validate logic to Analyzer

Review Comment:
   😍 Looking forward to it!
   
   In the future, I also plan to do some validation work in the Analyzer.





[GitHub] [arrow-datafusion] mingmwang commented on pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593#issuecomment-1475871017

   @alamb @jackwener 
   Please help to take a look again. Conflicts are resolved now.




[GitHub] [arrow-datafusion] alamb commented on pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593#issuecomment-1474822546

   Looks like this PR has some conflicts to fix but then it will be good to go




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593#discussion_r1140375405


##########
datafusion/sql/src/expr/subquery.rs:
##########
@@ -30,13 +30,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         input_schema: &DFSchema,
         planner_context: &mut PlannerContext,
     ) -> Result<Expr> {
+        let old_outer_query_schema = planner_context.outer_query_schema.clone();
+        planner_context.outer_query_schema = Some(input_schema.clone());
+        let sub_plan = self.query_to_plan(subquery, planner_context)?;
+        let outer_ref_columns = sub_plan.all_out_ref_exprs();
+        planner_context.outer_query_schema = old_outer_query_schema;

Review Comment:
   I'm afraid I will not be able to support this in the near term. I think the method described in the "Unnesting Arbitrary Queries" paper can cover it, but there is additional complexity to deal with when the outer reference columns are ambiguous, because the schemas of the outer plans at different levels might contain ambiguous columns.
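
Editorial sketch: the save, set, restore sequence from the diff can be modelled with a toy planner that keeps a single outer schema slot. Everything below is hypothetical (`Query`, `Resolution`, `resolve`, and `plan` are invented for illustration), and it uses `take()` where the diff uses `clone()`. In this toy model the innermost query only sees its immediate parent, which is one way to picture why the three-level case needs more work. Whether the PR behaves identically is an assumption.

```rust
// Toy planner context: holds at most ONE outer schema, like the `Option` field in the diff.
struct PlannerContext {
    outer_query_schema: Option<Vec<&'static str>>,
}

/// A toy query: its own columns, one column it references, and an optional nested subquery.
struct Query {
    columns: Vec<&'static str>,
    referenced: &'static str,
    subquery: Option<Box<Query>>,
}

#[derive(Debug, PartialEq)]
enum Resolution {
    Local,
    OuterRef,
    Unresolved,
}

/// Resolve against the query's own columns first, then against the single outer schema.
fn resolve(name: &str, own: &[&str], ctx: &PlannerContext) -> Resolution {
    let in_outer = ctx
        .outer_query_schema
        .as_ref()
        .map_or(false, |outer| outer.iter().any(|c| *c == name));
    if own.iter().any(|c| *c == name) {
        Resolution::Local
    } else if in_outer {
        Resolution::OuterRef
    } else {
        Resolution::Unresolved
    }
}

/// Plan a query and record how each level resolved its reference, outermost first.
fn plan(query: &Query, ctx: &mut PlannerContext, out: &mut Vec<Resolution>) {
    out.push(resolve(query.referenced, &query.columns, ctx));
    if let Some(sub) = &query.subquery {
        // Save the old outer schema, expose the current one, and restore afterwards.
        let old = ctx.outer_query_schema.take();
        ctx.outer_query_schema = Some(query.columns.clone());
        plan(sub, ctx, out);
        ctx.outer_query_schema = old;
    }
}
```

Supporting deeper references in such a model would mean keeping a stack of outer schemas instead of one slot, at which point the ambiguity mentioned above has to be resolved explicitly.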
   





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593#discussion_r1140363032


##########
datafusion/sql/src/expr/identifier.rs:
##########
@@ -102,11 +138,32 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                             ScalarValue::Utf8(Some(name)),
                         )))
                     } else {
-                        // table.column identifier
-                        Ok(Expr::Column(Column {
-                            relation: Some(relation),
-                            name,
-                        }))
+                        let outer_query_schema_opt =
+                            planner_context.outer_query_schema.clone();
+                        if let Some(outer) = outer_query_schema_opt.as_ref() {
+                            match outer.field_with_qualified_name(&relation, &name) {
+                                Ok(field) => {
+                                    // found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
+                                    Ok(Expr::OuterReferenceColumn(
+                                        field.data_type().clone(),
+                                        Column {
+                                            relation: Some(relation),
+                                            name,
+                                        },
+                                    ))
+                                }
+                                Err(_) => Ok(Expr::Column(Column {
+                                    relation: Some(relation),
+                                    name,
+                                })),
+                            }
+                        } else {
+                            // table.column identifier
+                            Ok(Expr::Column(Column {
+                                relation: Some(relation),
+                                name,
+                            }))
+                        }

Review Comment:
   Sure, will do.





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593#discussion_r1140080168


##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -81,6 +81,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
             }
             Expr::Literal(_)
             | Expr::Alias(_, _)
+            | Expr::OuterReferenceColumn(_, _)

Review Comment:
   This idea makes a lot of sense to me 👍 



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -179,3 +179,117 @@ async fn in_subquery_with_same_table() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn support_agg_correlated_columns() -> Result<()> {

Review Comment:
   if possible I would recommend adding these tests as `sqllogictest` -- https://github.com/apache/arrow-datafusion/tree/main/datafusion/core/tests/sqllogictests 
   
   They are much easier to change / maintain in my opinion



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -179,3 +179,117 @@ async fn in_subquery_with_same_table() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn support_agg_correlated_columns() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", true)?;
+
+    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT sum(t1.t1_int + t2.t2_id) FROM t2 WHERE t1.t1_name = t2.t2_name)";
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan()?;
+
+    let expected = vec![
+        "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
+        "  Subquery: [SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
+        "    Projection: SUM(outer_ref(t1.t1_int) + t2.t2_id) [SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
+        "      Aggregate: groupBy=[[]], aggr=[[SUM(outer_ref(t1.t1_int) + t2.t2_id)]] [SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
+        "        Filter: outer_ref(t1.t1_name) = t2.t2_name [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "          TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn support_agg_correlated_columns2() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", true)?;
+
+    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT count(*) FROM t2 WHERE t1.t1_name = t2.t2_name having sum(t1_int + t2_id) >0)";
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan()?;
+
+    let expected = vec![
+        "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
+        "  Subquery: [COUNT(UInt8(1)):Int64;N]",
+        "    Projection: COUNT(UInt8(1)) [COUNT(UInt8(1)):Int64;N]",
+        "      Filter: CAST(SUM(outer_ref(t1.t1_int) + t2.t2_id) AS Int64) > Int64(0) [COUNT(UInt8(1)):Int64;N, SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
+        "        Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)), SUM(outer_ref(t1.t1_int) + t2.t2_id)]] [COUNT(UInt8(1)):Int64;N, SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
+        "          Filter: outer_ref(t1.t1_name) = t2.t2_name [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "            TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn support_join_correlated_columns() -> Result<()> {
+    let ctx = create_sub_query_join_context("t0_id", "t1_id", "t2_id", true)?;
+    let sql = "SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN t2 ON(t1.t1_id = t2.t2_id and t1.t1_name = t0.t0_name))";
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan()?;
+
+    let expected = vec![
+        "Filter: EXISTS (<subquery>) [t0_id:UInt32;N, t0_name:Utf8;N]",
+        "  Subquery: [Int64(1):Int64]",
+        "    Projection: Int64(1) [Int64(1):Int64]",
+        "      Inner Join:  Filter: t1.t1_id = t2.t2_id AND t1.t1_name = outer_ref(t0.t0_name) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "        TableScan: t1 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "        TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "  TableScan: t0 projection=[t0_id, t0_name] [t0_id:UInt32;N, t0_name:Utf8;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn support_join_correlated_columns2() -> Result<()> {
+    let ctx = create_sub_query_join_context("t0_id", "t1_id", "t2_id", true)?;
+    let sql = "SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN (select * from t2 where t2.t2_name = t0.t0_name) as t2 ON(t1.t1_id = t2.t2_id ))";

Review Comment:
   It is neat to see a correlation to the (second) layer of the query



##########
datafusion/sql/src/expr/identifier.rs:
##########
@@ -44,18 +49,49 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             // interpret names with '.' as if they were
             // compound identifiers, but this is not a compound
             // identifier. (e.g. it is "foo.bar" not foo.bar)
-
-            Ok(Expr::Column(Column {
-                relation: None,
-                name: normalize_ident(id),
-            }))
+            let normalize_ident = normalize_ident(id);
+            match schema.field_with_unqualified_name(normalize_ident.as_str()) {
+                Ok(_) => {
+                    // found a match without a qualified name, this is a inner table column
+                    Ok(Expr::Column(Column {
+                        relation: None,
+                        name: normalize_ident,
+                    }))
+                }
+                Err(_) => {
+                    let outer_query_schema_opt =
+                        planner_context.outer_query_schema.clone();
+                    if let Some(outer) = outer_query_schema_opt.as_ref() {

Review Comment:
   Is there a reason for this clone (of the entire schema)? Could it be more like
   
   ```suggestion
                       let outer_query_schema_opt = planner_context.outer_query_schema.as_ref();
                       if let Some(outer) = outer_query_schema_opt {
   ```
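
Editorial sketch: the difference between cloning and borrowing an `Option` field, shown with toy types. `DFSchema` and `PlannerContext` here are hypothetical stand-ins with only the fields needed for the example, and `outer_has_field` is an invented helper.

```rust
// Hypothetical stand-ins; the real types carry far more state.
struct DFSchema {
    fields: Vec<String>,
}

struct PlannerContext {
    outer_query_schema: Option<DFSchema>,
}

/// Borrowing lookup: `as_ref()` turns `&Option<DFSchema>` into `Option<&DFSchema>`,
/// so the field list is never copied.
fn outer_has_field(ctx: &PlannerContext, name: &str) -> bool {
    match ctx.outer_query_schema.as_ref() {
        Some(outer) => outer.fields.iter().any(|f| f.as_str() == name),
        None => false,
    }
}
```

One caveat: the borrow returned by `as_ref()` keeps the context immutably borrowed for as long as it is alive, so code that also needs the context mutably in the same scope may have to narrow the borrow or fall back to a clone.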



##########
datafusion/optimizer/src/utils.rs:
##########
@@ -291,46 +293,52 @@ pub fn find_join_exprs(
     let mut joins = vec![];
     let mut others = vec![];
     for filter in exprs.iter() {
-        let (left, op, right) = match filter {
-            Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
-                (*left.clone(), *op, *right.clone())
-            }
-            _ => {
-                others.push((*filter).clone());
-                continue;
-            }
-        };
-        let left = match left {
-            Expr::Column(c) => c,
-            _ => {
+        // If the expression contains correlated predicates, add it to join filters
+        if filter.contains_outer() {
+            joins.push(strip_outer_reference((*filter).clone()));
+        } else {
+            // TODO remove the logic

Review Comment:
   I don't understand this TODO -- is it meant for this PR or for some other one?



##########
datafusion/sql/src/select.rs:
##########
@@ -234,28 +228,26 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         &self,
         selection: Option<SQLExpr>,
         plan: LogicalPlan,
-        outer_query_schema: Option<&DFSchema>,
         planner_context: &mut PlannerContext,
     ) -> Result<LogicalPlan> {
         match selection {
             Some(predicate_expr) => {
-                let mut join_schema = (**plan.schema()).clone();
-
                 let fallback_schemas = plan.fallback_normalize_schemas();
-                let outer_query_schema = if let Some(outer) = outer_query_schema {
-                    join_schema.merge(outer);
-                    vec![outer]
-                } else {
-                    vec![]
-                };
+                let outer_query_schema = planner_context.outer_query_schema.clone();

Review Comment:
   again here I think we can avoid a clone



##########
datafusion/sql/src/expr/identifier.rs:
##########
@@ -102,11 +138,32 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                             ScalarValue::Utf8(Some(name)),
                         )))
                     } else {
-                        // table.column identifier
-                        Ok(Expr::Column(Column {
-                            relation: Some(relation),
-                            name,
-                        }))
+                        let outer_query_schema_opt =
+                            planner_context.outer_query_schema.clone();
+                        if let Some(outer) = outer_query_schema_opt.as_ref() {
+                            match outer.field_with_qualified_name(&relation, &name) {
+                                Ok(field) => {
+                                    // found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
+                                    Ok(Expr::OuterReferenceColumn(
+                                        field.data_type().clone(),
+                                        Column {
+                                            relation: Some(relation),
+                                            name,
+                                        },
+                                    ))
+                                }
+                                Err(_) => Ok(Expr::Column(Column {
+                                    relation: Some(relation),
+                                    name,
+                                })),
+                            }
+                        } else {
+                            // table.column identifier
+                            Ok(Expr::Column(Column {
+                                relation: Some(relation),
+                                name,
+                            }))
+                        }

Review Comment:
   This pattern appears to be repeated several times -- maybe it could be refactored into a reusable function rather than being inlined several places?
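
Editorial sketch: one possible shape for such a reusable function, using toy types. All names below are hypothetical, including the helper `qualified_column_or_outer_ref`. The toy `field_with_qualified_name` returns an `Option`, whereas the method in the diff is matched on `Ok`/`Err`, so this is not a drop-in replacement.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Column {
    relation: Option<String>,
    name: String,
}

#[derive(Debug, Clone, PartialEq)]
enum DataType {
    UInt32,
    Utf8,
}

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(Column),
    OuterReferenceColumn(DataType, Column),
}

/// Toy schema: a list of (relation, column name, type) triples.
struct Schema {
    fields: Vec<(String, String, DataType)>,
}

impl Schema {
    /// Toy lookup by qualified name.
    fn field_with_qualified_name(&self, relation: &str, name: &str) -> Option<&DataType> {
        self.fields
            .iter()
            .find(|(r, n, _)| r.as_str() == relation && n.as_str() == name)
            .map(|(_, _, t)| t)
    }
}

/// Hypothetical helper: build an outer reference if the qualified name is found
/// in the outer schema, otherwise fall back to a plain column.
fn qualified_column_or_outer_ref(
    outer: Option<&Schema>,
    relation: String,
    name: String,
) -> Expr {
    let outer_type = outer
        .and_then(|schema| schema.field_with_qualified_name(&relation, &name))
        .cloned();
    let column = Column {
        relation: Some(relation),
        name,
    };
    match outer_type {
        Some(data_type) => Expr::OuterReferenceColumn(data_type, column),
        None => Expr::Column(column),
    }
}
```

Building the `Column` once and branching only on the lookup result removes the three near-identical `Expr::Column` constructions visible in the hunk.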



##########
datafusion/sql/tests/integration_test.rs:
##########
@@ -2635,7 +2635,7 @@ fn exists_subquery() {
         \n  Filter: EXISTS (<subquery>)\
         \n    Subquery:\
         \n      Projection: person.first_name\
-        \n        Filter: person.last_name = p.last_name AND person.state = p.state\
+        \n        Filter: person.last_name = outer_ref(p.last_name) AND person.state = outer_ref(p.state)\

Review Comment:
   I find this actually much easier to read 👍 



##########
datafusion/sql/src/expr/identifier.rs:
##########
@@ -44,18 +49,49 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             // interpret names with '.' as if they were
             // compound identifiers, but this is not a compound
             // identifier. (e.g. it is "foo.bar" not foo.bar)
-
-            Ok(Expr::Column(Column {
-                relation: None,
-                name: normalize_ident(id),
-            }))
+            let normalize_ident = normalize_ident(id);
+            match schema.field_with_unqualified_name(normalize_ident.as_str()) {
+                Ok(_) => {
+                    // found a match without a qualified name, this is a inner table column
+                    Ok(Expr::Column(Column {
+                        relation: None,
+                        name: normalize_ident,
+                    }))
+                }
+                Err(_) => {
+                    let outer_query_schema_opt =
+                        planner_context.outer_query_schema.clone();
+                    if let Some(outer) = outer_query_schema_opt.as_ref() {

Review Comment:
   The same thing applies to the other places in this file





[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

Posted by "jackwener (via GitHub)" <gi...@apache.org>.
jackwener commented on code in PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593#discussion_r1140365145


##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -179,3 +179,117 @@ async fn in_subquery_with_same_table() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn support_agg_correlated_columns() -> Result<()> {

Review Comment:
   If a test only relates to the plan tree, I think it's also OK to keep it here,
   because we can debug it easily and see the plan directly in the code.



##########
datafusion/sql/src/select.rs:
##########
@@ -234,28 +228,26 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         &self,
         selection: Option<SQLExpr>,
         plan: LogicalPlan,
-        outer_query_schema: Option<&DFSchema>,
         planner_context: &mut PlannerContext,
     ) -> Result<LogicalPlan> {
         match selection {
             Some(predicate_expr) => {
-                let mut join_schema = (**plan.schema()).clone();
-
                 let fallback_schemas = plan.fallback_normalize_schemas();
-                let outer_query_schema = if let Some(outer) = outer_query_schema {
-                    join_schema.merge(outer);
-                    vec![outer]
-                } else {
-                    vec![]
-                };
+                let outer_query_schema = planner_context.outer_query_schema.clone();

Review Comment:
   Agreed.
   We can avoid a clone here.





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593#discussion_r1140385436


##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1919,14 +1947,11 @@ impl Join {
 pub struct Subquery {
     /// The subquery
     pub subquery: Arc<LogicalPlan>,
+    /// The outer references used in the subquery
+    pub outer_ref_columns: Vec<Expr>,

Review Comment:
   Yes.





[GitHub] [arrow-datafusion] alamb commented on pull request #5593: Add OuterReferenceColumn to Expr to represent correlated expression

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5593:
URL: https://github.com/apache/arrow-datafusion/pull/5593#issuecomment-1476231350

   > Please help to take a look again. Conflicts are resolved now.
   
   LGTM but sadly there appear to be some more conflicts now: 
   
   <img width="969" alt="Screenshot 2023-03-20 at 9 25 26 AM" src="https://user-images.githubusercontent.com/490673/226353506-4601876f-3994-4b0e-a861-2da26cdfba25.png">
   
   I took the liberty to merge from main and resolve the new conflicts -- I plan to merge this PR when CI passes

