You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/05/19 13:08:27 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6372: More scalar subqueries support

alamb commented on code in PR #6372:
URL: https://github.com/apache/arrow-datafusion/pull/6372#discussion_r1198924610


##########
benchmarks/expected-plans/q2.txt:
##########
@@ -3,12 +3,12 @@
 +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 | logical_plan  | Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST                                                                                                                                                                    |
 |               |   Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment                                                                                                                                                   |
-|               |     Inner Join: partsupp.ps_supplycost = __scalar_sq_1.__value, part.p_partkey = __scalar_sq_1.ps_partkey                                                                                                                                                                                                 |
-|               |       Projection: part.p_partkey, part.p_mfgr, partsupp.ps_supplycost, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name                                                                                                                       |
+|               |     Inner Join: part.p_partkey = __scalar_sq_1.ps_partkey, partsupp.ps_supplycost = __scalar_sq_1.__value                                                                                                                                                                                                 |

Review Comment:
   The only difference in these plans appears to be the names / order of the columns used internally (the actual output and plan appear to be the same) 👍



##########
datafusion/optimizer/src/scalar_subquery_to_join.rs:
##########
@@ -844,19 +836,20 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        // unoptimized plan because we don't support disjunctions yet
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Filter: customer.c_custkey = (<subquery>) OR customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
-    Subquery: [MAX(orders.o_custkey):Int64;N]
-      Projection: MAX(orders.o_custkey) [MAX(orders.o_custkey):Int64;N]
-        Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]
-          Filter: customer.c_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
-            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
-    TableScan: customer [c_custkey:Int64, c_name:Utf8]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\

Review Comment:
   🎉 



##########
datafusion/optimizer/src/scalar_subquery_to_join.rs:
##########
@@ -510,6 +494,7 @@ mod tests {
             vec![
                 Arc::new(ScalarSubqueryToJoin::new()),
                 Arc::new(ExtractEquijoinPredicate::new()),
+                Arc::new(EliminateCrossJoin::new()),

Review Comment:
   I think these tests are getting a little hard to understand because they use a subset of the optimizer passes that is not used for actual queries, but is more than the pass that is defined in this module.
   
   I recommend that in a follow on PR we either
   1. remove all optimizer passes in the tests in this file other than `ScalarSubqueryToJoin` (so the tests here are focused on the code in this module)
   2. move any query plans for which we want to see the final plan after all rewrites to the end-to-end test files (ideally sqllogictest)



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -690,3 +690,78 @@ async fn support_union_subquery() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn simple_uncorrelated_scalar_subquery() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", true)?;
+
+    let sql = "select (select count(*) from t1) as b";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan()?;
+
+    let expected = vec![
+        "Projection: __scalar_sq_1.__value AS b [b:Int64;N]",
+        "  SubqueryAlias: __scalar_sq_1 [__value:Int64;N]",
+        "    Projection: COUNT(UInt8(1)) AS __value [__value:Int64;N]",
+        "      Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] [COUNT(UInt8(1)):Int64;N]",
+        "        TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    // assert data
+    let results = execute_to_batches(&ctx, sql).await;
+    let expected = vec!["+---+", "| b |", "+---+", "| 4 |", "+---+"];
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn simple_uncorrelated_scalar_subquery2() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", true)?;
+
+    let sql = "select (select count(*) from t1) as b, (select count(1) from t2) as c";

Review Comment:
   I suggest also adding an error case here for a query that doesn't produce a single row. Something like
   
   ```sql
   select (select t1_id from t1) as b, (select count(1) from t2) as c
   ```
   
   In which case I would expect an error



##########
datafusion/core/tests/sql/subqueries.rs:
##########
@@ -382,13 +382,13 @@ async fn aggregated_correlated_scalar_subquery() -> Result<()> {
     let plan = dataframe.into_optimized_plan()?;
 
     let expected = vec![
-        "Projection: t1.t1_id, (<subquery>) AS t2_sum [t1_id:UInt32;N, t2_sum:UInt64;N]",
-        "  Subquery: [SUM(t2.t2_int):UInt64;N]",
-        "    Projection: SUM(t2.t2_int) [SUM(t2.t2_int):UInt64;N]",
-        "      Aggregate: groupBy=[[]], aggr=[[SUM(t2.t2_int)]] [SUM(t2.t2_int):UInt64;N]",
-        "        Filter: t2.t2_id = outer_ref(t1.t1_id) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "          TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "  TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
+        "Projection: t1.t1_id, __scalar_sq_1.__value AS t2_sum [t1_id:UInt32;N, t2_sum:UInt64;N]",
+        "  Left Join: t1.t1_id = __scalar_sq_1.t2_id [t1_id:UInt32;N, t2_id:UInt32;N, __value:UInt64;N]",

Review Comment:
   this is a nice change as the query can now actually run 👍 



##########
datafusion/optimizer/src/scalar_subquery_to_join.rs:
##########
@@ -694,17 +680,19 @@ mod tests {
             .build()?;
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  Inner Join: customer.c_custkey = __scalar_sq_1.__value, customer.c_custkey = __scalar_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
-        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n    SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Int64;N]\
-        \n      Projection: orders.o_custkey, MAX(orders.o_custkey) + Int32(1) AS __value [o_custkey:Int64, __value:Int64;N]\
-        \n        Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
-        \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
+        \n  Filter: customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64;N, __value:Int64;N]\

Review Comment:
   In a real query plan, this filter is probably removed, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org