Posted to github@arrow.apache.org by "mingmwang (via GitHub)" <gi...@apache.org> on 2023/05/26 10:00:57 UTC

[GitHub] [arrow-datafusion] mingmwang opened a new pull request, #6457: init checkin

mingmwang opened a new pull request, #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457

   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes #.
   
   # Rationale for this change
   
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   
   # What changes are included in this PR?
   
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   # Are these changes tested?
   
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
   -->
   
   # Are there any user-facing changes?
   
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1587172200

   > I will resolve the conflicts tomorrow.
   
   I believe https://github.com/apache/arrow-datafusion/pull/6642 may also conflict -- I will try and review that one today and get it merged too




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#discussion_r1227884200


##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -606,25 +761,115 @@ async fn support_order_by_correlated_columns() -> Result<()> {
     Ok(())
 }
 
-// TODO: issue https://github.com/apache/arrow-datafusion/issues/6263
-#[ignore]
 #[tokio::test]
-async fn support_limit_subquery() -> Result<()> {
+async fn exists_subquery_with_select_null() -> Result<()> {
     let ctx = create_join_context("t1_id", "t2_id", true)?;
 
-    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 1)";
+    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT NULL)";
     let msg = format!("Creating logical plan for '{sql}'");
     let dataframe = ctx.sql(sql).await.expect(&msg);
     let plan = dataframe.into_optimized_plan()?;
 
     let expected = vec![
         "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  Subquery: [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "    Limit: skip=0, fetch=1 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "      Projection: t2.t2_id, t2.t2_name, t2.t2_int [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "        Filter: t2.t2_id = outer_ref(t1.t1_id) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "          TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "  Subquery: [NULL:Null;N]",

Review Comment:
   Yes, this query cannot run yet. The test only verifies that the generated logical plan is as expected.





[GitHub] [arrow-datafusion] mingmwang commented on pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1572297060

   > Why don't we implement the unnesting arbitrary subquery paper? I think it's state of the art. 🤔
   
   This PR and the previous PRs I implemented or refactored still belong to the simple unnesting method. They cover the predicate (IN/EXISTS) subquery and scalar subquery cases in which the correlated expressions can be pulled up and the correlation can be converted to outer joins or semi/anti joins.
   
   Other, more complex cases can be decorrelated using the methods described in the unnesting arbitrary subquery paper. I will try to implement it later this year. The reason for not implementing the paper directly is that its method might introduce additional joins compared to the simple unnesting method. The additional join comes from joining the inner table with the distinct set (magic set).
   
   You can play with the `Hyper` web interface (Hyper implements this unnesting arbitrary subquery paper) and check the plan.
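
The simple unnesting idea for a correlated `EXISTS` can be sketched with plain Rust collections. This is only an illustration, not DataFusion code: the row types `T1` and `T2` and the functions `exists_correlated` and `left_semi_join` are hypothetical names, and SQL NULL handling is ignored.

```rust
use std::collections::HashSet;

/// Hypothetical row of the outer table `t1`.
#[derive(Debug, Clone, PartialEq)]
pub struct T1 {
    pub t1_id: u32,
    pub t1_name: String,
}

/// Hypothetical row of the inner table `t2`.
#[derive(Debug, Clone, PartialEq)]
pub struct T2 {
    pub t2_id: u32,
}

/// Correlated form:
/// `SELECT * FROM t1 WHERE EXISTS (SELECT 1 FROM t2 WHERE t2_id = t1_id)`.
/// The subquery is re-evaluated for every outer row.
pub fn exists_correlated(t1: &[T1], t2: &[T2]) -> Vec<T1> {
    t1.iter()
        .filter(|outer| t2.iter().any(|inner| inner.t2_id == outer.t1_id))
        .cloned()
        .collect()
}

/// Decorrelated form: the predicate `t2_id = t1_id` is pulled up and becomes
/// the condition of a left semi join, evaluated here with a hash set.
pub fn left_semi_join(t1: &[T1], t2: &[T2]) -> Vec<T1> {
    let keys: HashSet<u32> = t2.iter().map(|row| row.t2_id).collect();
    t1.iter()
        .filter(|outer| keys.contains(&outer.t1_id))
        .cloned()
        .collect()
}
```

Both functions return the same outer rows, each at most once, even when `t2` contains duplicate keys. The semi join scans `t2` a single time instead of once per outer row.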
   




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6457: Support wider range of Subquery

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#discussion_r1210020368


##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -402,6 +403,78 @@ impl LogicalPlan {
         Ok(using_columns)
     }
 
+    pub fn head_output_expr(&self) -> Result<Option<Expr>> {

Review Comment:
   Sure, will do.





[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #6457: Support wider range of Subquery

Posted by "jackwener (via GitHub)" <gi...@apache.org>.
jackwener commented on code in PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#discussion_r1209685165


##########
datafusion/optimizer/src/utils.rs:
##########
@@ -409,6 +389,235 @@ pub fn log_plan(description: &str, plan: &LogicalPlan) {
     trace!("{description}::\n{}\n", plan.display_indent_schema());
 }
 
+/// This struct rewrite the sub query plan by pull up the correlated expressions(contains outer reference columns) from the inner subquery's [Filter].
+/// It adds the inner reference columns to the 'Projection' or 'Aggregate' of the subquery if they are missing, so that they can be evaluated by the parent operator as the join condition.
+pub struct PullUpCorrelatedExpr {

Review Comment:
   🚀 Great job.



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -402,6 +403,78 @@ impl LogicalPlan {
         Ok(using_columns)
     }
 
+    pub fn head_output_expr(&self) -> Result<Option<Expr>> {

Review Comment:
   Please add a comment explaining what this method does.



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -402,6 +403,78 @@ impl LogicalPlan {
         Ok(using_columns)
     }
 
+    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
+        match self {
+            LogicalPlan::Projection(projection) => {
+                Ok(Some(projection.expr.as_slice()[0].clone()))
+            }
+            LogicalPlan::Aggregate(agg) => {
+                if agg.group_expr.is_empty() {
+                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
+                } else {
+                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
+                }
+            }
+            LogicalPlan::Filter(filter) => filter.input.head_output_expr(),
+            LogicalPlan::Distinct(distinct) => distinct.input.head_output_expr(),
+            LogicalPlan::Sort(sort) => sort.input.head_output_expr(),
+            LogicalPlan::Limit(limit) => limit.input.head_output_expr(),
+            LogicalPlan::Join(Join {
+                left,
+                right,
+                join_type,
+                ..
+            }) => match join_type {
+                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
+                    if left.schema().fields().is_empty() {
+                        right.head_output_expr()
+                    } else {
+                        left.head_output_expr()
+                    }
+                }
+                JoinType::LeftSemi | JoinType::LeftAnti => left.head_output_expr(),
+                JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(),
+            },
+            LogicalPlan::CrossJoin(cross) => {
+                if cross.left.schema().fields().is_empty() {
+                    cross.right.head_output_expr()
+                } else {
+                    cross.left.head_output_expr()
+                }
+            }
+            LogicalPlan::Repartition(repartition) => repartition.input.head_output_expr(),
+            LogicalPlan::Window(window) => window.input.head_output_expr(),

Review Comment:
   ```suggestion
   ```



##########
datafusion/core/tests/tpcds_planning.rs:
##########
@@ -557,7 +557,6 @@ async fn tpcds_physical_q5() -> Result<()> {
     create_physical_plan(5).await
 }
 
-#[ignore] // Physical plan does not support logical expression (<subquery>)

Review Comment:
   🎉



##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2030,7 +2030,7 @@ async fn subquery_to_join_with_both_side_expr() -> Result<()> {
         "  LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
         "    TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
         "    SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
-        "      Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
+        "      Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS t2.t2_id + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",

Review Comment:
   I can't figure out why the double alias occurs.



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -402,6 +403,78 @@ impl LogicalPlan {
         Ok(using_columns)
     }
 
+    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
+        match self {
+            LogicalPlan::Projection(projection) => {
+                Ok(Some(projection.expr.as_slice()[0].clone()))
+            }
+            LogicalPlan::Aggregate(agg) => {
+                if agg.group_expr.is_empty() {
+                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
+                } else {
+                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
+                }
+            }
+            LogicalPlan::Filter(filter) => filter.input.head_output_expr(),
+            LogicalPlan::Distinct(distinct) => distinct.input.head_output_expr(),
+            LogicalPlan::Sort(sort) => sort.input.head_output_expr(),
+            LogicalPlan::Limit(limit) => limit.input.head_output_expr(),

Review Comment:
   ```suggestion
               LogicalPlan::Filter(Filter { input, .. })
               | LogicalPlan::Distinct(Distinct { input, .. })
               | LogicalPlan::Sort(Sort { input, .. })
               | LogicalPlan::Limit(Limit { input, .. })
               | LogicalPlan::Repartition(Repartition { input, .. })
               | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
   ```
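
The suggestion above relies on Rust or-patterns, where several variants that bind the same field name share one match arm. A minimal, self-contained sketch follows. The `Plan` enum is a hypothetical, heavily simplified stand-in and not DataFusion's `LogicalPlan`. It also uses `first()` instead of indexing with `[0]`, which is a deliberate deviation so that an empty projection yields `None` rather than a panic.

```rust
/// Hypothetical, heavily simplified stand-in for a logical plan node.
pub enum Plan {
    Projection { exprs: Vec<String> },
    Filter { input: Box<Plan> },
    Sort { input: Box<Plan> },
    Limit { input: Box<Plan> },
    EmptyRelation,
}

impl Plan {
    /// Returns the first output expression of the plan, if there is one.
    pub fn head_output_expr(&self) -> Option<String> {
        match self {
            Plan::Projection { exprs } => exprs.first().cloned(),
            // All pass-through nodes bind `input` with the same type,
            // so a single arm covers every variant in the or-pattern.
            Plan::Filter { input }
            | Plan::Sort { input }
            | Plan::Limit { input } => input.head_output_expr(),
            Plan::EmptyRelation => None,
        }
    }
}
```

An or-pattern only compiles when every alternative binds the same names with the same types, which holds here because each pass-through variant carries a `Box<Plan>` called `input`.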





[GitHub] [arrow-datafusion] liurenjie1024 commented on pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "liurenjie1024 (via GitHub)" <gi...@apache.org>.
liurenjie1024 commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1572040740

   Why don't we implement the unnesting arbitrary subquery paper? I think it's state of the art. 🤔




[GitHub] [arrow-datafusion] alamb merged pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457




[GitHub] [arrow-datafusion] mingmwang commented on pull request #6457: Support wider range of Subquery

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1569357711

   @alamb @jackwener 
   I'm going to mark this PR as a draft. I need more time to think about how to make the `count()` aggregation handling logic more generic. Although the current correlated expression pull-up and subquery rewrite are more generic, the `count` aggregation handling is ugly.
   




[GitHub] [arrow-datafusion] mingmwang commented on pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1572309442

   > @mingmwang Doris doesn't support `project correlated subquery`
   
   @jackwener 
   Could you please check this query in Apache Doris?
   `select t1.t1_int from t1 where (select count(*) from t2 where t1.t1_id = t2.t2_id) < t1.t1_int`
   
   ```
           "Projection: t1.t1_int [t1_int:UInt32;N]",
           "  Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.COUNT(UInt8(1)) END < CAST(t1.t1_int AS Int64) [t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
           "    Projection: t1.t1_int, __scalar_sq_1.COUNT(UInt8(1)), __scalar_sq_1.__always_true [t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
           "      Left Join: t1.t1_id = __scalar_sq_1.t2_id [t1_id:UInt32;N, t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, t2_id:UInt32;N, __always_true:Boolean;N]",
           "        TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
           "        SubqueryAlias: __scalar_sq_1 [COUNT(UInt8(1)):Int64;N, t2_id:UInt32;N, __always_true:Boolean]",
           "          Projection: COUNT(UInt8(1)), t2.t2_id, __always_true [COUNT(UInt8(1)):Int64;N, t2_id:UInt32;N, __always_true:Boolean]",
           "            Aggregate: groupBy=[[t2.t2_id, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_id:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
           "              TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
   
   ```
   I think this might also lead to the `count` bug if it is not handled correctly, even though it is a scalar subquery in a predicate.
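
The `count` bug and the `__always_true` compensation in the plan above can be illustrated with plain Rust collections. This is a sketch under assumptions, not DataFusion's implementation: all function names are hypothetical, `t1` rows are modelled as `(t1_id, t1_int)` pairs, and `t2` is modelled as a list of `t2_id` values.

```rust
use std::collections::HashMap;

/// Reference semantics of the correlated scalar subquery
/// `(SELECT count(*) FROM t2 WHERE t1.t1_id = t2.t2_id)`:
/// a count over zero rows is 0, never NULL.
pub fn correlated_count(t1_id: u32, t2_ids: &[u32]) -> i64 {
    t2_ids.iter().filter(|id| **id == t1_id).count() as i64
}

/// Inner side after decorrelation: `GROUP BY t2_id` with `count(*)`.
/// Keys that never occur in `t2` produce no group at all.
pub fn grouped_counts(t2_ids: &[u32]) -> HashMap<u32, i64> {
    let mut groups = HashMap::new();
    for id in t2_ids {
        *groups.entry(*id).or_insert(0) += 1;
    }
    groups
}

/// Reference result of
/// `SELECT t1_int FROM t1 WHERE (subquery) < t1_int`.
pub fn filter_reference(t1: &[(u32, i64)], t2_ids: &[u32]) -> Vec<i64> {
    t1.iter()
        .filter(|(id, t1_int)| correlated_count(*id, t2_ids) < *t1_int)
        .map(|(_, t1_int)| *t1_int)
        .collect()
}

/// Naive rewrite: left join on the key and compare the count column directly.
/// Unmatched outer rows see NULL (`None`), and `NULL < t1_int` filters them out.
pub fn filter_naive(t1: &[(u32, i64)], t2_ids: &[u32]) -> Vec<i64> {
    let groups = grouped_counts(t2_ids);
    t1.iter()
        .filter(|(id, t1_int)| match groups.get(id) {
            Some(count) => *count < *t1_int,
            None => false, // a NULL predicate is treated as false by the filter
        })
        .map(|(_, t1_int)| *t1_int)
        .collect()
}

/// Compensated rewrite, mirroring
/// `CASE WHEN __always_true IS NULL THEN 0 ELSE COUNT(..) END`.
pub fn filter_compensated(t1: &[(u32, i64)], t2_ids: &[u32]) -> Vec<i64> {
    let groups = grouped_counts(t2_ids);
    t1.iter()
        .filter(|(id, t1_int)| {
            // The marker is `Some(true)` only when the left join found a match.
            let always_true: Option<bool> = groups.get(id).map(|_| true);
            let count = match always_true {
                None => 0,
                Some(_) => groups[id],
            };
            count < *t1_int
        })
        .map(|(_, t1_int)| *t1_int)
        .collect()
}
```

For an outer row whose key has no match in `t2`, the reference query compares `0 < t1_int`, while the naive rewrite compares `NULL < t1_int` and drops the row. The compensated rewrite restores the reference result.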




[GitHub] [arrow-datafusion] jackwener commented on pull request #6457: Support wider range of Subquery

Posted by "jackwener (via GitHub)" <gi...@apache.org>.
jackwener commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1568458131

   @mingmwang Doris doesn't support `project correlated subquery`




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#discussion_r1227892180


##########
datafusion/optimizer/src/scalar_subquery_to_join.rs:
##########
@@ -513,20 +584,17 @@ mod tests {
             .build()?;
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  Inner Join: customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
-        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n    SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\
-        \n      Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
-        \n        Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
-        \n          Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
-        \n            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
+        \n  Filter: customer.c_custkey = __scalar_sq_1.MAX(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, MAX(orders.o_custkey):Int64;N]\
+        \n    Left Join:  [c_custkey:Int64, c_name:Utf8, MAX(orders.o_custkey):Int64;N]\

Review Comment:
   Yes, you are right. We can leverage other rules to further optimize the plan.
   Also, rewriting the `Scalar Subquery` to a `Left Join` is more general: it works whether the `Scalar Subquery` appears in `Projection` expressions or in `Filter` expressions.
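
Why a left join is the more general target can be sketched as follows. This is an illustration with hypothetical names and a keyed (correlated) subquery, not the exact plan from the test above: `customers` holds customer keys and `max_by_cust` holds a per-customer scalar such as the maximum order value.

```rust
use std::collections::HashMap;

/// Left join of customer keys against a per-customer scalar value.
/// Every customer survives; customers without orders carry NULL (`None`).
/// This is what a scalar subquery in a projection needs.
pub fn left_join(
    customers: &[i64],
    max_by_cust: &HashMap<i64, i64>,
) -> Vec<(i64, Option<i64>)> {
    customers
        .iter()
        .map(|c| (*c, max_by_cust.get(c).copied()))
        .collect()
}

/// Inner join of the same inputs: customers without orders disappear,
/// which would be wrong for a subquery used in a projection.
pub fn inner_join(customers: &[i64], max_by_cust: &HashMap<i64, i64>) -> Vec<(i64, i64)> {
    customers
        .iter()
        .filter_map(|c| max_by_cust.get(c).map(|m| (*c, *m)))
        .collect()
}

/// Scalar subquery in a filter: `WHERE (subquery) > threshold`.
/// The filter on top of the left join rejects NULL rows by itself,
/// so the result matches what an inner join would produce.
pub fn filter_over_left_join(
    customers: &[i64],
    max_by_cust: &HashMap<i64, i64>,
    threshold: i64,
) -> Vec<i64> {
    left_join(customers, max_by_cust)
        .into_iter()
        .filter(|(_, m)| matches!(m, Some(v) if *v > threshold))
        .map(|(c, _)| c)
        .collect()
}
```

In the filter case the NULL-rejecting predicate makes the left join equivalent to an inner join, so a later optimizer rule can safely tighten the join type. In the projection case only the left join keeps every outer row.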





[GitHub] [arrow-datafusion] mingmwang commented on pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1587088605

   @alamb 
   I will resolve the conflicts tomorrow. 




[GitHub] [arrow-datafusion] alamb commented on pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1579286894

   I took the liberty of merging this branch to `main` to resolve a conflict




[GitHub] [arrow-datafusion] alamb commented on pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1589244325

   Thanks again @mingmwang 




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6457: Support wider range of Subquery

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#discussion_r1210039771


##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2030,7 +2030,7 @@ async fn subquery_to_join_with_both_side_expr() -> Result<()> {
         "  LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
         "    TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
         "    SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
-        "      Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
+        "      Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS t2.t2_id + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",

Review Comment:
   The double alias is not caused by the decorrelation rules. It is caused by another logical optimization rule:
   
   ```
   new_plan LeftSemi Join:  Filter: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_3.CAST(t2_id AS Int64) + Int64(1)
     TableScan: t1
     SubqueryAlias: __correlated_sq_3
       Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1), CAST(t2.t2_id AS Int64) + Int64(1)
         TableScan: t2
   
   ```





[GitHub] [arrow-datafusion] mingmwang commented on pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1573050115

   @alamb 
   Do you have time to review this PR next week?
   
   I think subquery unnesting is important for TPC-DS and OLAP workloads, and supporting a wider range of subqueries will make DataFusion more competitive with other products.
   




[GitHub] [arrow-datafusion] alamb commented on pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1574004901

   > Do you have time to review this PR next week?
   
   
   Yes, I will find time. Thank you for the contribution -- I just don't have enough review bandwidth to keep up with everything!




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#discussion_r1227896111


##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -657,6 +924,73 @@ async fn support_limit_subquery() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn in_non_correlated_subquery_with_limit() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", true)?;
+
+    let sql =
+        "SELECT t1_id, t1_name FROM t1 WHERE t1_id in (SELECT t2_id FROM t2 limit 10)";
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan()?;
+
+    // de-correlated, limit is kept
+    let expected = vec![
+        "LeftSemi Join: t1.t1_id = __correlated_sq_1.t2_id [t1_id:UInt32;N, t1_name:Utf8;N]",
+        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
+        "  SubqueryAlias: __correlated_sq_1 [t2_id:UInt32;N]",
+        "    Limit: skip=0, fetch=10 [t2_id:UInt32;N]",
+        "      TableScan: t2 projection=[t2_id], fetch=10 [t2_id:UInt32;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+

Review Comment:
   I did not add result checking to all of the newly added unit tests. Some of them only verify that the plans are as expected.
   I added result checking for the tests that might hit the `count` bug.





[GitHub] [arrow-datafusion] mingmwang commented on pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1589103131

   > > I will resolve the conflicts tomorrow.
   > 
   > I believe #6642 may also conflict -- I will try and review that one today and get it merged too
   
   Conflicts resolved.




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#discussion_r1212984941


##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2030,7 +2030,7 @@ async fn subquery_to_join_with_both_side_expr() -> Result<()> {
         "  LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
         "    TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
         "    SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
-        "      Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
+        "      Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS t2.t2_id + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",

Review Comment:
   > I can't figure out why the double alias occurs.
   
   Fixed the alias problem.





[GitHub] [arrow-datafusion] alamb commented on pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1578457485

   This is on my list for review today




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#discussion_r1220111769


##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -352,6 +351,21 @@ async fn non_aggregated_correlated_scalar_subquery_with_single_row() -> Result<(
         "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
     );
 
+    // TODO infer nullability in the schema has bug

Review Comment:
   Is this tracked by a ticket? It would probably be good to put the link URL in the code so we don't lose track of it.



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -549,24 +676,52 @@ async fn support_join_correlated_columns() -> Result<()> {
 }
 
 #[tokio::test]
-async fn support_join_correlated_columns2() -> Result<()> {
+async fn subquery_contains_join_contains_correlated_columns() -> Result<()> {
     let ctx = create_sub_query_join_context("t0_id", "t1_id", "t2_id", true)?;
     let sql = "SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN (select * from t2 where t2.t2_name = t0.t0_name) as t2 ON(t1.t1_id = t2.t2_id ))";
     let msg = format!("Creating logical plan for '{sql}'");
     let dataframe = ctx.sql(sql).await.expect(&msg);
     let plan = dataframe.into_optimized_plan()?;
 
     let expected = vec![
-        "Filter: EXISTS (<subquery>) [t0_id:UInt32;N, t0_name:Utf8;N]",
-        "  Subquery: [Int64(1):Int64]",
-        "    Projection: Int64(1) [Int64(1):Int64]",
-        "      Inner Join:  Filter: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "        TableScan: t1 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "        SubqueryAlias: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "          Projection: t2.t2_id, t2.t2_name, t2.t2_int [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "            Filter: t2.t2_name = outer_ref(t0.t0_name) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "              TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "LeftSemi Join: t0.t0_name = __correlated_sq_1.t2_name [t0_id:UInt32;N, t0_name:Utf8;N]",
+        "  TableScan: t0 projection=[t0_id, t0_name] [t0_id:UInt32;N, t0_name:Utf8;N]",
+        "  SubqueryAlias: __correlated_sq_1 [t2_name:Utf8;N]",
+        "    Projection: t2.t2_name [t2_name:Utf8;N]",
+        "      Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N]",
+        "        TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
+        "        SubqueryAlias: t2 [t2_id:UInt32;N, t2_name:Utf8;N]",
+        "          TableScan: t2 projection=[t2_id, t2_name] [t2_id:UInt32;N, t2_name:Utf8;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())

Review Comment:
   Shall we also add a result check for the tests changed in this file?
   
   Previously this query didn't run, so its results could not be checked.
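A result check for this test amounts to confirming that the `LeftSemi Join` plan returns the same rows as evaluating the `EXISTS` subquery once per outer row. The following is a minimal sketch of that comparison in plain Rust, not DataFusion code. The tuple types, the function names `exists_per_row` and `left_semi_join`, and any row values passed to them are hypothetical and only mirror the shape of the query above.

```rust
use std::collections::HashSet;

// Hypothetical row shapes: (id, name) pairs standing in for t0 and t2.
type T0 = (u32, &'static str);
type T2 = (u32, &'static str);

/// Reference semantics: evaluate the EXISTS subquery once per `t0` row.
fn exists_per_row(t0: &[T0], t1_ids: &[u32], t2: &[T2]) -> Vec<u32> {
    t0.iter()
        .filter(|&&(_, t0_name)| {
            // SELECT 1 FROM t1 INNER JOIN (t2 WHERE t2_name = t0_name) ON t1_id = t2_id
            t2.iter()
                .any(|&(t2_id, t2_name)| t2_name == t0_name && t1_ids.contains(&t2_id))
        })
        .map(|&(id, _)| id)
        .collect()
}

/// Decorrelated shape: build the inner join once, then semi-join on the name.
fn left_semi_join(t0: &[T0], t1_ids: &[u32], t2: &[T2]) -> Vec<u32> {
    // Inner Join: t1.t1_id = t2.t2_id, followed by Projection: t2.t2_name
    let names: HashSet<&str> = t2
        .iter()
        .filter(|&&(t2_id, _)| t1_ids.contains(&t2_id))
        .map(|&(_, name)| name)
        .collect();
    // LeftSemi Join: t0.t0_name = __correlated_sq_1.t2_name
    // Each t0 row is emitted at most once, however many matches exist.
    t0.iter()
        .filter(|&&(_, name)| names.contains(name))
        .map(|&(id, _)| id)
        .collect()
}
```

Calling both functions on the same input and asserting that the outputs are equal is the toy analogue of adding `assert_batches_eq!` next to the plan assertion.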



##########
datafusion/optimizer/src/decorrelate_predicate_subquery.rs:
##########
@@ -1612,8 +1537,8 @@ mod tests {
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
                         \n  LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
                         \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-                        \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
-                        \n      Projection: orders.o_custkey [o_custkey:Int64]\
+                        \n    SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]\

Review Comment:
   the old plan looks straight up wrong, as it lost the `+ 1`. This new plan looks much better



##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2264,15 +2264,15 @@ async fn two_in_subquery_to_join_with_outer_filter() -> Result<()> {
 
     let expected = vec![
         "Explain [plan_type:Utf8, plan:Utf8]",
-        "  LeftSemi Join: CAST(t1.t1_int AS Int64) = __correlated_sq_2.CAST(t2_int AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "    LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "  LeftSemi Join: CAST(t1.t1_int AS Int64) = __correlated_sq_2.t2.t2_int + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",

Review Comment:
   do these plans actually have fewer `CAST`s or is it just an improvement in ALIAS generation?



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -397,6 +411,105 @@ async fn aggregated_correlated_scalar_subquery() -> Result<()> {
         "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
     );
 
+    // assert data
+    let results = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+-------+--------+",
+        "| t1_id | t2_sum |",
+        "+-------+--------+",
+        "| 11    | 3      |",
+        "| 22    | 1      |",
+        "| 44    | 3      |",
+        "| 33    |        |",
+        "+-------+--------+",
+    ];
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregated_correlated_scalar_subquery_with_having() -> Result<()> {

Review Comment:
   this kind of query would be great to move to the https://github.com/apache/arrow-datafusion/tree/main/datafusion/core/tests/sqllogictests/test_files/pg_compat suite so we can compare the results directly to postgres
   
   I manually verified a few of the results, but I didn't do so to all of them.
   
   We can do this move as a follow on PR (or never)



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -657,6 +924,73 @@ async fn support_limit_subquery() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn in_non_correlated_subquery_with_limit() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", true)?;
+
+    let sql =
+        "SELECT t1_id, t1_name FROM t1 WHERE t1_id in (SELECT t2_id FROM t2 limit 10)";
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan()?;
+
+    // de-correlated, limit is kept
+    let expected = vec![
+        "LeftSemi Join: t1.t1_id = __correlated_sq_1.t2_id [t1_id:UInt32;N, t1_name:Utf8;N]",
+        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
+        "  SubqueryAlias: __correlated_sq_1 [t2_id:UInt32;N]",
+        "    Limit: skip=0, fetch=10 [t2_id:UInt32;N]",
+        "      TableScan: t2 projection=[t2_id], fetch=10 [t2_id:UInt32;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+

Review Comment:
   We could check results here too?



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -606,25 +761,115 @@ async fn support_order_by_correlated_columns() -> Result<()> {
     Ok(())
 }
 
-// TODO: issue https://github.com/apache/arrow-datafusion/issues/6263
-#[ignore]
 #[tokio::test]
-async fn support_limit_subquery() -> Result<()> {
+async fn exists_subquery_with_select_null() -> Result<()> {
     let ctx = create_join_context("t1_id", "t2_id", true)?;
 
-    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 1)";
+    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT NULL)";
     let msg = format!("Creating logical plan for '{sql}'");
     let dataframe = ctx.sql(sql).await.expect(&msg);
     let plan = dataframe.into_optimized_plan()?;
 
     let expected = vec![
         "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  Subquery: [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "    Limit: skip=0, fetch=1 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "      Projection: t2.t2_id, t2.t2_name, t2.t2_int [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "        Filter: t2.t2_id = outer_ref(t1.t1_id) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "          TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "  Subquery: [NULL:Null;N]",

Review Comment:
   I don't think this query will run yet -- perhaps we can file a ticket and leave a reference 



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -402,6 +403,80 @@ impl LogicalPlan {
         Ok(using_columns)
     }
 
+    /// returns the first output expression of this `LogicalPlan` node.
+    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
+        match self {
+            LogicalPlan::Projection(projection) => {
+                Ok(Some(projection.expr.as_slice()[0].clone()))
+            }
+            LogicalPlan::Aggregate(agg) => {
+                if agg.group_expr.is_empty() {
+                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
+                } else {
+                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
+                }
+            }
+            LogicalPlan::Filter(Filter { input, .. })
+            | LogicalPlan::Distinct(Distinct { input, .. })
+            | LogicalPlan::Sort(Sort { input, .. })
+            | LogicalPlan::Limit(Limit { input, .. })
+            | LogicalPlan::Repartition(Repartition { input, .. })
+            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
+            LogicalPlan::Join(Join {
+                left,
+                right,
+                join_type,
+                ..
+            }) => match join_type {
+                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
+                    if left.schema().fields().is_empty() {
+                        right.head_output_expr()
+                    } else {
+                        left.head_output_expr()
+                    }
+                }
+                JoinType::LeftSemi | JoinType::LeftAnti => left.head_output_expr(),
+                JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(),
+            },
+            LogicalPlan::CrossJoin(cross) => {
+                if cross.left.schema().fields().is_empty() {
+                    cross.right.head_output_expr()
+                } else {
+                    cross.left.head_output_expr()
+                }
+            }
+            LogicalPlan::Union(union) => Ok(Some(Expr::Column(
+                union.schema.fields()[0].qualified_column(),
+            ))),
+            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(
+                table.projected_schema.fields()[0].qualified_column(),
+            ))),
+            LogicalPlan::SubqueryAlias(subquery_alias) => {
+                let expr_opt = subquery_alias.input.head_output_expr()?;
+                expr_opt
+                    .map(|expr| {
+                        Ok(Expr::Column(create_col_from_scalar_expr(
+                            &expr,
+                            subquery_alias.alias.to_string(),
+                        )?))
+                    })
+                    .map_or(Ok(None), |v| v.map(Some))
+            }
+            LogicalPlan::Subquery(_) => Ok(None),
+            LogicalPlan::EmptyRelation(_)
+            | LogicalPlan::Prepare(_)
+            | LogicalPlan::Statement(_)
+            | LogicalPlan::Values(_)

Review Comment:
   why doesn't `Values` also return the first expr?



##########
datafusion/core/tests/sqllogictests/test_files/tpch/q11.slt.part:
##########
@@ -50,92 +50,89 @@ logical_plan
 Limit: skip=0, fetch=10
 --Sort: value DESC NULLS FIRST, fetch=10
 ----Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value
-------Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > __scalar_sq_1.__value
---------CrossJoin:
-----------Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(partsupp.ps_supplycost * CAST(partsupp.ps_availqty AS Decimal128(10, 0)))]]
-------------Projection: partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost
---------------Inner Join: supplier.s_nationkey = nation.n_nationkey
-----------------Projection: partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey
-------------------Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
---------------------TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
---------------------TableScan: supplier projection=[s_suppkey, s_nationkey]
-----------------Projection: nation.n_nationkey
-------------------Filter: nation.n_name = Utf8("GERMANY")
---------------------TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]
-----------SubqueryAlias: __scalar_sq_1
-------------Projection: CAST(CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS Decimal128(38, 15)) AS __value
---------------Aggregate: groupBy=[[]], aggr=[[SUM(partsupp.ps_supplycost * CAST(partsupp.ps_availqty AS Decimal128(10, 0)))]]
-----------------Projection: partsupp.ps_availqty, partsupp.ps_supplycost
-------------------Inner Join: supplier.s_nationkey = nation.n_nationkey
---------------------Projection: partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey
-----------------------Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
-------------------------TableScan: partsupp projection=[ps_suppkey, ps_availqty, ps_supplycost]
-------------------------TableScan: supplier projection=[s_suppkey, s_nationkey]
---------------------Projection: nation.n_nationkey
-----------------------Filter: nation.n_name = Utf8("GERMANY")
-------------------------TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]
+------Inner Join:  Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > __scalar_sq_1.SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)

Review Comment:
   using an InnerJoin (with ON) seems better than a CrossJoin followed by a filter 👍



##########
datafusion/optimizer/src/scalar_subquery_to_join.rs:
##########
@@ -513,20 +584,17 @@ mod tests {
             .build()?;
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  Inner Join: customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
-        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n    SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\
-        \n      Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
-        \n        Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
-        \n          Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
-        \n            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
+        \n  Filter: customer.c_custkey = __scalar_sq_1.MAX(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, MAX(orders.o_custkey):Int64;N]\
+        \n    Left Join:  [c_custkey:Int64, c_name:Utf8, MAX(orders.o_custkey):Int64;N]\

Review Comment:
   it seems like this plan is not as good (to do a left join and then a filter rather than doing the filter as part of the join condition) -- perhaps other subsequent passes improve the filtering 🤔
   
   But given it is  `LEFT JOIN` and the `__scalar_sq_1.MAX(orders.o_custkey)` refers to the non-preserved side, I don't think it can be done after the join
   
   





[GitHub] [arrow-datafusion] jackwener commented on pull request #6457: Support wider range of Subquery

Posted by "jackwener (via GitHub)" <gi...@apache.org>.
jackwener commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1564613759

   Thanks @mingmwang .
   I'm going away for the weekend and won't have time to review the code. 
   I'm going to review it next week.




[GitHub] [arrow-datafusion] alamb commented on pull request #6457: Support wider range of Subquery

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1568512344

   I am planning to focus my review efforts on other parts of DataFusion where I have more to add -- I don't plan to review this PR unless someone thinks that is important.
   
   Thank you @mingmwang  and @jackwener  for pushing this forward




[GitHub] [arrow-datafusion] mingmwang commented on pull request #6457: Support wider range of Subquery

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#issuecomment-1568172608

   I added additional logic to handle `count()` aggregations in the subquery. The logic is quite ugly,
   and I'm not sure whether there is a better way to handle this.
   
   @jackwener 
   Could you please check how this SQL is rewritten in Apache Doris?
   
   
   `SELECT t1_id, (SELECT count(*) + 2 as _cnt FROM t2 WHERE t2.t2_int = t1.t1_int) from t1`
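For context, the "count bug" named in the PR title can be reproduced without any query engine. The following is a minimal sketch in plain Rust, assuming hypothetical in-memory tables and made-up row values; it is not the logic added in this PR. It compares per-row evaluation of `count(*) + 2` with a naive "aggregate, then left join" rewrite, and with one possible repair that fills unmatched rows with the value the expression takes over empty input.

```rust
use std::collections::HashMap;

/// Hypothetical rows of `t1`: (t1_id, t1_int). Values are made up.
fn t1_rows() -> Vec<(u32, u32)> {
    vec![(11, 1), (22, 2), (33, 3)]
}

/// Hypothetical rows of `t2`: only the correlated column `t2_int`.
fn t2_rows() -> Vec<u32> {
    vec![1, 1, 2]
}

/// Reference semantics: run the scalar subquery once per outer row.
fn correlated(t1: &[(u32, u32)], t2: &[u32]) -> Vec<(u32, Option<i64>)> {
    t1.iter()
        .map(|&(id, t1_int)| {
            let cnt = t2.iter().filter(|&&t2_int| t2_int == t1_int).count() as i64;
            (id, Some(cnt + 2))
        })
        .collect()
}

/// Pre-aggregate `t2` by the correlated column: t2_int -> count(*) + 2.
fn aggregate(t2: &[u32]) -> HashMap<u32, i64> {
    let mut counts: HashMap<u32, i64> = HashMap::new();
    for &key in t2 {
        *counts.entry(key).or_insert(0) += 1;
    }
    counts.into_iter().map(|(k, c)| (k, c + 2)).collect()
}

/// Naive rewrite: left join on the aggregate. Unmatched rows become NULL.
fn naive_left_join(t1: &[(u32, u32)], t2: &[u32]) -> Vec<(u32, Option<i64>)> {
    let agg = aggregate(t2);
    t1.iter().map(|&(id, k)| (id, agg.get(&k).copied())).collect()
}

/// One possible repair: replace the NULL with the value of the expression
/// over an empty group (count(*) is 0 there, so count(*) + 2 is 2).
fn fixed_left_join(t1: &[(u32, u32)], t2: &[u32]) -> Vec<(u32, Option<i64>)> {
    let agg = aggregate(t2);
    let on_empty: i64 = 2;
    t1.iter()
        .map(|&(id, k)| (id, Some(agg.get(&k).copied().unwrap_or(on_empty))))
        .collect()
}
```

With these rows, the outer row `(33, 3)` has no match in `t2`: per-row evaluation yields a non-NULL value for it, the naive join yields NULL, and the repaired join agrees with per-row evaluation again.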




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#discussion_r1227777967


##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -352,6 +351,21 @@ async fn non_aggregated_correlated_scalar_subquery_with_single_row() -> Result<(
         "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
     );
 
+    // TODO infer nullability in the schema has bug

Review Comment:
   Sure, will add one.



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -397,6 +411,105 @@ async fn aggregated_correlated_scalar_subquery() -> Result<()> {
         "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
     );
 
+    // assert data
+    let results = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+-------+--------+",
+        "| t1_id | t2_sum |",
+        "+-------+--------+",
+        "| 11    | 3      |",
+        "| 22    | 1      |",
+        "| 44    | 3      |",
+        "| 33    |        |",
+        "+-------+--------+",
+    ];
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregated_correlated_scalar_subquery_with_having() -> Result<()> {

Review Comment:
   Sure.





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#discussion_r1227781273


##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2264,15 +2264,15 @@ async fn two_in_subquery_to_join_with_outer_filter() -> Result<()> {
 
     let expected = vec![
         "Explain [plan_type:Utf8, plan:Utf8]",
-        "  LeftSemi Join: CAST(t1.t1_int AS Int64) = __correlated_sq_2.CAST(t2_int AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "    LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "  LeftSemi Join: CAST(t1.t1_int AS Int64) = __correlated_sq_2.t2.t2_int + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",

Review Comment:
   It is just an improvement in ALIAS generation: unnecessary aliases are no longer generated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#discussion_r1227887303


##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -402,6 +403,80 @@ impl LogicalPlan {
         Ok(using_columns)
     }
 
+    /// returns the first output expression of this `LogicalPlan` node.
+    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
+        match self {
+            LogicalPlan::Projection(projection) => {
+                Ok(Some(projection.expr.as_slice()[0].clone()))
+            }
+            LogicalPlan::Aggregate(agg) => {
+                if agg.group_expr.is_empty() {
+                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
+                } else {
+                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
+                }
+            }
+            LogicalPlan::Filter(Filter { input, .. })
+            | LogicalPlan::Distinct(Distinct { input, .. })
+            | LogicalPlan::Sort(Sort { input, .. })
+            | LogicalPlan::Limit(Limit { input, .. })
+            | LogicalPlan::Repartition(Repartition { input, .. })
+            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
+            LogicalPlan::Join(Join {
+                left,
+                right,
+                join_type,
+                ..
+            }) => match join_type {
+                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
+                    if left.schema().fields().is_empty() {
+                        right.head_output_expr()
+                    } else {
+                        left.head_output_expr()
+                    }
+                }
+                JoinType::LeftSemi | JoinType::LeftAnti => left.head_output_expr(),
+                JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(),
+            },
+            LogicalPlan::CrossJoin(cross) => {
+                if cross.left.schema().fields().is_empty() {
+                    cross.right.head_output_expr()
+                } else {
+                    cross.left.head_output_expr()
+                }
+            }
+            LogicalPlan::Union(union) => Ok(Some(Expr::Column(
+                union.schema.fields()[0].qualified_column(),
+            ))),
+            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(
+                table.projected_schema.fields()[0].qualified_column(),
+            ))),
+            LogicalPlan::SubqueryAlias(subquery_alias) => {
+                let expr_opt = subquery_alias.input.head_output_expr()?;
+                expr_opt
+                    .map(|expr| {
+                        Ok(Expr::Column(create_col_from_scalar_expr(
+                            &expr,
+                            subquery_alias.alias.to_string(),
+                        )?))
+                    })
+                    .map_or(Ok(None), |v| v.map(Some))
+            }
+            LogicalPlan::Subquery(_) => Ok(None),
+            LogicalPlan::EmptyRelation(_)
+            | LogicalPlan::Prepare(_)
+            | LogicalPlan::Statement(_)
+            | LogicalPlan::Values(_)

Review Comment:
   I think we currently do not support a `Values` clause inside a subquery.
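The recursion in `head_output_expr` may be easier to follow on a toy plan type. The sketch below is plain Rust under stated assumptions: the `Plan` and `Expr` enums and the `col` helper are hypothetical stand-ins, far simpler than DataFusion's `LogicalPlan` and `Expr`. It uses `first()` instead of indexing with `[0]`, so an empty expression list yields `None`; that is a choice made for the sketch, not a description of the PR.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column { relation: String, name: String },
    Literal(i64),
}

// Hypothetical, heavily reduced plan type.
enum Plan {
    Projection { exprs: Vec<Expr>, input: Box<Plan> },
    Filter { input: Box<Plan> },
    TableScan { table: String, columns: Vec<String> },
    SubqueryAlias { alias: String, input: Box<Plan> },
    Values,
}

/// Helper (hypothetical) that builds a qualified column expression.
fn col(relation: &str, name: &str) -> Expr {
    Expr::Column { relation: relation.to_string(), name: name.to_string() }
}

/// Returns the first output expression of the plan, if it has one.
fn head_output_expr(plan: &Plan) -> Option<Expr> {
    match plan {
        // A projection defines its own output list.
        Plan::Projection { exprs, .. } => exprs.first().cloned(),
        // Pass-through nodes keep the schema of their input.
        Plan::Filter { input } => head_output_expr(input),
        // A scan exposes its columns qualified by the table name.
        Plan::TableScan { table, columns } => columns.first().map(|c| col(table, c)),
        // An alias re-qualifies whatever its input produces.
        Plan::SubqueryAlias { alias, input } => head_output_expr(input).map(|expr| match expr {
            Expr::Column { name, .. } => col(alias, &name),
            Expr::Literal(v) => col(alias, &v.to_string()),
        }),
        // Unsupported here, matching the reply above about `Values`.
        Plan::Values => None,
    }
}
```

For example, an alias `sq` over a filter over a scan of `t2` with columns `t2_id, t2_name` resolves to the column `sq.t2_id`.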





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6457: Support wider range of Subquery, handle the Count bug

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#discussion_r1227892180


##########
datafusion/optimizer/src/scalar_subquery_to_join.rs:
##########
@@ -513,20 +584,17 @@ mod tests {
             .build()?;
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  Inner Join: customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
-        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n    SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\
-        \n      Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
-        \n        Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
-        \n          Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
-        \n            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
+        \n  Filter: customer.c_custkey = __scalar_sq_1.MAX(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, MAX(orders.o_custkey):Int64;N]\
+        \n    Left Join:  [c_custkey:Int64, c_name:Utf8, MAX(orders.o_custkey):Int64;N]\

Review Comment:
   Yes, you are right. We can leverage other rules to further optimize the plan.
   Also, optimizing the `Scalar Subquery` to a `Left Join` is more general: it works whether the `Scalar Subquery` appears in the `Projection` exprs or in the `Filter` exprs.
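Why a later rule can tighten this plan: the `Filter` compares against a column from the nullable side of the `Left Join`, so rows padded with NULL never pass it, and the result matches an inner join on the same predicate. The following is a minimal sketch in plain Rust, assuming a one-row scalar subquery result modelled as `Option<i64>`; the function names and inputs are hypothetical and this is not the optimizer's code.

```rust
/// `Left Join` with no condition against a one-row scalar subquery,
/// followed by `Filter: c_custkey = scalar`.
fn left_join_then_filter(customers: &[i64], scalar: Option<i64>) -> Vec<i64> {
    // Every customer row is paired with the scalar value, which may be NULL.
    let joined: Vec<(i64, Option<i64>)> = customers.iter().map(|&c| (c, scalar)).collect();
    // A comparison with NULL is never true, so such rows are dropped.
    joined
        .into_iter()
        .filter(|&(c, s)| s == Some(c))
        .map(|(c, _)| c)
        .collect()
}

/// `Inner Join` with the same predicate used as the join condition.
fn inner_join_on(customers: &[i64], scalar: Option<i64>) -> Vec<i64> {
    customers
        .iter()
        .copied()
        .filter(|&c| scalar == Some(c))
        .collect()
}
```

Both functions return the same rows for any input, including the case where the scalar is NULL. The `Left Join` form still matters when the scalar subquery sits in a `Projection`, where unmatched outer rows must be kept.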


