You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/06/06 18:50:28 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6457: Support wider range of Subquery, handle the Count bug

alamb commented on code in PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#discussion_r1220111769


##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -352,6 +351,21 @@ async fn non_aggregated_correlated_scalar_subquery_with_single_row() -> Result<(
         "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
     );
 
+    // TODO infer nullability in the schema has bug

Review Comment:
   Is this tracked by a ticket? It would probably be good to put the link URL in the code so we don't lose track of it.



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -549,24 +676,52 @@ async fn support_join_correlated_columns() -> Result<()> {
 }
 
 #[tokio::test]
-async fn support_join_correlated_columns2() -> Result<()> {
+async fn subquery_contains_join_contains_correlated_columns() -> Result<()> {
     let ctx = create_sub_query_join_context("t0_id", "t1_id", "t2_id", true)?;
     let sql = "SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN (select * from t2 where t2.t2_name = t0.t0_name) as t2 ON(t1.t1_id = t2.t2_id ))";
     let msg = format!("Creating logical plan for '{sql}'");
     let dataframe = ctx.sql(sql).await.expect(&msg);
     let plan = dataframe.into_optimized_plan()?;
 
     let expected = vec![
-        "Filter: EXISTS (<subquery>) [t0_id:UInt32;N, t0_name:Utf8;N]",
-        "  Subquery: [Int64(1):Int64]",
-        "    Projection: Int64(1) [Int64(1):Int64]",
-        "      Inner Join:  Filter: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "        TableScan: t1 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "        SubqueryAlias: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "          Projection: t2.t2_id, t2.t2_name, t2.t2_int [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "            Filter: t2.t2_name = outer_ref(t0.t0_name) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "              TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "LeftSemi Join: t0.t0_name = __correlated_sq_1.t2_name [t0_id:UInt32;N, t0_name:Utf8;N]",
+        "  TableScan: t0 projection=[t0_id, t0_name] [t0_id:UInt32;N, t0_name:Utf8;N]",
+        "  SubqueryAlias: __correlated_sq_1 [t2_name:Utf8;N]",
+        "    Projection: t2.t2_name [t2_name:Utf8;N]",
+        "      Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N]",
+        "        TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
+        "        SubqueryAlias: t2 [t2_id:UInt32;N, t2_name:Utf8;N]",
+        "          TableScan: t2 projection=[t2_id, t2_name] [t2_id:UInt32;N, t2_name:Utf8;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())

Review Comment:
   Shall we also add a check of the results for the tests changed in this file?
   
   Previously this query didn't run, so its results could not be checked.



##########
datafusion/optimizer/src/decorrelate_predicate_subquery.rs:
##########
@@ -1612,8 +1537,8 @@ mod tests {
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
                         \n  LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
                         \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-                        \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
-                        \n      Projection: orders.o_custkey [o_custkey:Int64]\
+                        \n    SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]\

Review Comment:
   the old plan looks straight up wrong, as it lost the `+ 1`. This new plan looks much better



##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2264,15 +2264,15 @@ async fn two_in_subquery_to_join_with_outer_filter() -> Result<()> {
 
     let expected = vec![
         "Explain [plan_type:Utf8, plan:Utf8]",
-        "  LeftSemi Join: CAST(t1.t1_int AS Int64) = __correlated_sq_2.CAST(t2_int AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "    LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "  LeftSemi Join: CAST(t1.t1_int AS Int64) = __correlated_sq_2.t2.t2_int + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",

Review Comment:
   do these plans actually have fewer `CAST`s or is it just an improvement in ALIAS generation?



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -397,6 +411,105 @@ async fn aggregated_correlated_scalar_subquery() -> Result<()> {
         "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
     );
 
+    // assert data
+    let results = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+-------+--------+",
+        "| t1_id | t2_sum |",
+        "+-------+--------+",
+        "| 11    | 3      |",
+        "| 22    | 1      |",
+        "| 44    | 3      |",
+        "| 33    |        |",
+        "+-------+--------+",
+    ];
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregated_correlated_scalar_subquery_with_having() -> Result<()> {

Review Comment:
   this kind of query would be great to move to the https://github.com/apache/arrow-datafusion/tree/main/datafusion/core/tests/sqllogictests/test_files/pg_compat suite so we can compare the results directly to postgres
   
   I manually verified a few of the results, but I didn't do so for all of them.
   
   We can do this move as a follow-on PR (or never)



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -657,6 +924,73 @@ async fn support_limit_subquery() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn in_non_correlated_subquery_with_limit() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", true)?;
+
+    let sql =
+        "SELECT t1_id, t1_name FROM t1 WHERE t1_id in (SELECT t2_id FROM t2 limit 10)";
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan()?;
+
+    // de-correlated, limit is kept
+    let expected = vec![
+        "LeftSemi Join: t1.t1_id = __correlated_sq_1.t2_id [t1_id:UInt32;N, t1_name:Utf8;N]",
+        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
+        "  SubqueryAlias: __correlated_sq_1 [t2_id:UInt32;N]",
+        "    Limit: skip=0, fetch=10 [t2_id:UInt32;N]",
+        "      TableScan: t2 projection=[t2_id], fetch=10 [t2_id:UInt32;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+

Review Comment:
   We could check results here too?



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -606,25 +761,115 @@ async fn support_order_by_correlated_columns() -> Result<()> {
     Ok(())
 }
 
-// TODO: issue https://github.com/apache/arrow-datafusion/issues/6263
-#[ignore]
 #[tokio::test]
-async fn support_limit_subquery() -> Result<()> {
+async fn exists_subquery_with_select_null() -> Result<()> {
     let ctx = create_join_context("t1_id", "t2_id", true)?;
 
-    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 1)";
+    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT NULL)";
     let msg = format!("Creating logical plan for '{sql}'");
     let dataframe = ctx.sql(sql).await.expect(&msg);
     let plan = dataframe.into_optimized_plan()?;
 
     let expected = vec![
         "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  Subquery: [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "    Limit: skip=0, fetch=1 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "      Projection: t2.t2_id, t2.t2_name, t2.t2_int [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "        Filter: t2.t2_id = outer_ref(t1.t1_id) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "          TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "  Subquery: [NULL:Null;N]",

Review Comment:
   I don't think this query will run yet -- perhaps we can file a ticket and leave a reference 



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -402,6 +403,80 @@ impl LogicalPlan {
         Ok(using_columns)
     }
 
+    /// returns the first output expression of this `LogicalPlan` node.
+    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
+        match self {
+            LogicalPlan::Projection(projection) => {
+                Ok(Some(projection.expr.as_slice()[0].clone()))
+            }
+            LogicalPlan::Aggregate(agg) => {
+                if agg.group_expr.is_empty() {
+                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
+                } else {
+                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
+                }
+            }
+            LogicalPlan::Filter(Filter { input, .. })
+            | LogicalPlan::Distinct(Distinct { input, .. })
+            | LogicalPlan::Sort(Sort { input, .. })
+            | LogicalPlan::Limit(Limit { input, .. })
+            | LogicalPlan::Repartition(Repartition { input, .. })
+            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
+            LogicalPlan::Join(Join {
+                left,
+                right,
+                join_type,
+                ..
+            }) => match join_type {
+                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
+                    if left.schema().fields().is_empty() {
+                        right.head_output_expr()
+                    } else {
+                        left.head_output_expr()
+                    }
+                }
+                JoinType::LeftSemi | JoinType::LeftAnti => left.head_output_expr(),
+                JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(),
+            },
+            LogicalPlan::CrossJoin(cross) => {
+                if cross.left.schema().fields().is_empty() {
+                    cross.right.head_output_expr()
+                } else {
+                    cross.left.head_output_expr()
+                }
+            }
+            LogicalPlan::Union(union) => Ok(Some(Expr::Column(
+                union.schema.fields()[0].qualified_column(),
+            ))),
+            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(
+                table.projected_schema.fields()[0].qualified_column(),
+            ))),
+            LogicalPlan::SubqueryAlias(subquery_alias) => {
+                let expr_opt = subquery_alias.input.head_output_expr()?;
+                expr_opt
+                    .map(|expr| {
+                        Ok(Expr::Column(create_col_from_scalar_expr(
+                            &expr,
+                            subquery_alias.alias.to_string(),
+                        )?))
+                    })
+                    .map_or(Ok(None), |v| v.map(Some))
+            }
+            LogicalPlan::Subquery(_) => Ok(None),
+            LogicalPlan::EmptyRelation(_)
+            | LogicalPlan::Prepare(_)
+            | LogicalPlan::Statement(_)
+            | LogicalPlan::Values(_)

Review Comment:
   why doesn't `Values` also return the first expr?



##########
datafusion/core/tests/sqllogictests/test_files/tpch/q11.slt.part:
##########
@@ -50,92 +50,89 @@ logical_plan
 Limit: skip=0, fetch=10
 --Sort: value DESC NULLS FIRST, fetch=10
 ----Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value
-------Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > __scalar_sq_1.__value
---------CrossJoin:
-----------Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(partsupp.ps_supplycost * CAST(partsupp.ps_availqty AS Decimal128(10, 0)))]]
-------------Projection: partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost
---------------Inner Join: supplier.s_nationkey = nation.n_nationkey
-----------------Projection: partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey
-------------------Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
---------------------TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
---------------------TableScan: supplier projection=[s_suppkey, s_nationkey]
-----------------Projection: nation.n_nationkey
-------------------Filter: nation.n_name = Utf8("GERMANY")
---------------------TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]
-----------SubqueryAlias: __scalar_sq_1
-------------Projection: CAST(CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS Decimal128(38, 15)) AS __value
---------------Aggregate: groupBy=[[]], aggr=[[SUM(partsupp.ps_supplycost * CAST(partsupp.ps_availqty AS Decimal128(10, 0)))]]
-----------------Projection: partsupp.ps_availqty, partsupp.ps_supplycost
-------------------Inner Join: supplier.s_nationkey = nation.n_nationkey
---------------------Projection: partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey
-----------------------Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
-------------------------TableScan: partsupp projection=[ps_suppkey, ps_availqty, ps_supplycost]
-------------------------TableScan: supplier projection=[s_suppkey, s_nationkey]
---------------------Projection: nation.n_nationkey
-----------------------Filter: nation.n_name = Utf8("GERMANY")
-------------------------TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]
+------Inner Join:  Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > __scalar_sq_1.SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)

Review Comment:
   using an InnerJoin (with ON) seems better than a CrossJoin followed by a filter 👍 



##########
datafusion/optimizer/src/scalar_subquery_to_join.rs:
##########
@@ -513,20 +584,17 @@ mod tests {
             .build()?;
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  Inner Join: customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
-        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n    SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\
-        \n      Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
-        \n        Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
-        \n          Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
-        \n            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
+        \n  Filter: customer.c_custkey = __scalar_sq_1.MAX(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, MAX(orders.o_custkey):Int64;N]\
+        \n    Left Join:  [c_custkey:Int64, c_name:Utf8, MAX(orders.o_custkey):Int64;N]\

Review Comment:
   it seems like this plan is not as good (to do a left join and then a filter rather than doing the filter as part of the join condition) -- perhaps other subsequent passes improve the filtering 🤔 
   
   But given it is a `LEFT JOIN` and the `__scalar_sq_1.MAX(orders.o_custkey)` refers to the non-preserved side, I don't think it can be done after the join
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org