You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/30 14:24:35 UTC
[arrow-datafusion] branch master updated: Move subquery alias assignment onto rules (#4767)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new cf45eb902 Move subquery alias assignment onto rules (#4767)
cf45eb902 is described below
commit cf45eb9020092943b96653d70fafb143cc362e19
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri Dec 30 14:24:30 2022 +0000
Move subquery alias assignment onto rules (#4767)
---
benchmarks/expected-plans/q11.txt | 4 +-
benchmarks/expected-plans/q15.txt | 4 +-
benchmarks/expected-plans/q16.txt | 4 +-
benchmarks/expected-plans/q17.txt | 13 +-
benchmarks/expected-plans/q18.txt | 4 +-
benchmarks/expected-plans/q2.txt | 4 +-
benchmarks/expected-plans/q20.txt | 14 +-
benchmarks/expected-plans/q22.txt | 4 +-
benchmarks/src/bin/tpch.rs | 12 +-
datafusion/core/tests/sql/subqueries.rs | 226 ++++++++++-----------
datafusion/optimizer/src/{lib.rs => alias.rs} | 51 +++--
datafusion/optimizer/src/decorrelate_where_in.rs | 97 +++++----
datafusion/optimizer/src/lib.rs | 1 +
datafusion/optimizer/src/optimizer.rs | 15 --
.../optimizer/src/scalar_subquery_to_join.rs | 73 +++----
datafusion/optimizer/tests/integration-test.rs | 4 +-
16 files changed, 258 insertions(+), 272 deletions(-)
diff --git a/benchmarks/expected-plans/q11.txt b/benchmarks/expected-plans/q11.txt
index bb5493828..7d8e14548 100644
--- a/benchmarks/expected-plans/q11.txt
+++ b/benchmarks/expected-plans/q11.txt
@@ -1,6 +1,6 @@
Sort: value DESC NULLS FIRST
Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value
- Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__sq_1.__value AS Decimal128(38, 15))
+ Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__scalar_sq_1.__value AS Decimal128(38, 15))
CrossJoin:
Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
Inner Join: supplier.s_nationkey = nation.n_nationkey
@@ -9,7 +9,7 @@ Sort: value DESC NULLS FIRST
TableScan: supplier projection=[s_suppkey, s_nationkey]
Filter: nation.n_name = Utf8("GERMANY")
TableScan: nation projection=[n_nationkey, n_name]
- SubqueryAlias: __sq_1
+ SubqueryAlias: __scalar_sq_1
Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value
Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
Inner Join: supplier.s_nationkey = nation.n_nationkey
diff --git a/benchmarks/expected-plans/q15.txt b/benchmarks/expected-plans/q15.txt
index 96401dd7b..f4e053f8d 100644
--- a/benchmarks/expected-plans/q15.txt
+++ b/benchmarks/expected-plans/q15.txt
@@ -1,7 +1,7 @@
EmptyRelation
Sort: supplier.s_suppkey ASC NULLS LAST
Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, revenue0.total_revenue
- Inner Join: revenue0.total_revenue = __sq_1.__value
+ Inner Join: revenue0.total_revenue = __scalar_sq_1.__value
Inner Join: supplier.s_suppkey = revenue0.supplier_no
TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone]
SubqueryAlias: revenue0
@@ -10,7 +10,7 @@ Sort: supplier.s_suppkey ASC NULLS LAST
Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
- SubqueryAlias: __sq_1
+ SubqueryAlias: __scalar_sq_1
Projection: MAX(revenue0.total_revenue) AS __value
Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]]
SubqueryAlias: revenue0
diff --git a/benchmarks/expected-plans/q16.txt b/benchmarks/expected-plans/q16.txt
index 60ef26933..6af486a2a 100644
--- a/benchmarks/expected-plans/q16.txt
+++ b/benchmarks/expected-plans/q16.txt
@@ -3,12 +3,12 @@ Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS LAST, part.p_type AS
Projection: group_alias_0 AS part.p_brand, group_alias_1 AS part.p_type, group_alias_2 AS part.p_size, COUNT(alias1) AS COUNT(DISTINCT partsupp.ps_suppkey)
Aggregate: groupBy=[[group_alias_0, group_alias_1, group_alias_2]], aggr=[[COUNT(alias1)]]
Aggregate: groupBy=[[part.p_brand AS group_alias_0, part.p_type AS group_alias_1, part.p_size AS group_alias_2, partsupp.ps_suppkey AS alias1]], aggr=[[]]
- LeftAnti Join: partsupp.ps_suppkey = __sq_1.s_suppkey
+ LeftAnti Join: partsupp.ps_suppkey = __correlated_sq_1.s_suppkey
Inner Join: partsupp.ps_partkey = part.p_partkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey]
Filter: part.p_brand != Utf8("Brand#45") AND part.p_type NOT LIKE Utf8("MEDIUM POLISHED%") AND part.p_size IN ([Int32(49), Int32(14), Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)])
TableScan: part projection=[p_partkey, p_brand, p_type, p_size]
- SubqueryAlias: __sq_1
+ SubqueryAlias: __correlated_sq_1
Projection: supplier.s_suppkey AS s_suppkey
Filter: supplier.s_comment LIKE Utf8("%Customer%Complaints%")
TableScan: supplier projection=[s_suppkey, s_comment]
diff --git a/benchmarks/expected-plans/q17.txt b/benchmarks/expected-plans/q17.txt
index 17b8e9698..755311c5e 100644
--- a/benchmarks/expected-plans/q17.txt
+++ b/benchmarks/expected-plans/q17.txt
@@ -1,11 +1,12 @@
-Projection: CAST(SUM(lineitem.l_extendedprice) AS Decimal128(38, 33)) / Decimal128(Some(7000000000000000195487369212723200),38,33) AS avg_yearly
+Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly
Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]]
- Filter: CAST(lineitem.l_quantity AS Decimal128(38, 21)) < __sq_1.__value
- Inner Join: part.p_partkey = __sq_1.l_partkey
+ Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15))
+ Inner Join: part.p_partkey = __scalar_sq_1.l_partkey, lineitem.l_partkey = __scalar_sq_1.l_partkey
Inner Join: lineitem.l_partkey = part.p_partkey
TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice]
Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX")
TableScan: part projection=[p_partkey, p_brand, p_container]
- Projection: lineitem.l_partkey, Decimal128(Some(200000000000000000000),38,21) * CAST(AVG(lineitem.l_quantity) AS Decimal128(38, 21)) AS __value, alias=__sq_1
- Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]]
- TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice]
\ No newline at end of file
+ SubqueryAlias: __scalar_sq_1
+ Projection: lineitem.l_partkey, Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS __value
+ Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]]
+ TableScan: lineitem projection=[l_partkey, l_quantity]
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q18.txt b/benchmarks/expected-plans/q18.txt
index 4017722c5..639598725 100644
--- a/benchmarks/expected-plans/q18.txt
+++ b/benchmarks/expected-plans/q18.txt
@@ -1,13 +1,13 @@
Sort: orders.o_totalprice DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST
Projection: customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, SUM(lineitem.l_quantity)
Aggregate: groupBy=[[customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice]], aggr=[[SUM(lineitem.l_quantity)]]
- LeftSemi Join: orders.o_orderkey = __sq_1.l_orderkey
+ LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey
Inner Join: orders.o_orderkey = lineitem.l_orderkey
Inner Join: customer.c_custkey = orders.o_custkey
TableScan: customer projection=[c_custkey, c_name]
TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice, o_orderdate]
TableScan: lineitem projection=[l_orderkey, l_quantity]
- SubqueryAlias: __sq_1
+ SubqueryAlias: __correlated_sq_1
Projection: lineitem.l_orderkey AS l_orderkey
Filter: SUM(lineitem.l_quantity) > Decimal128(Some(30000),25,2)
Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_quantity)]]
diff --git a/benchmarks/expected-plans/q2.txt b/benchmarks/expected-plans/q2.txt
index 34fb1e09a..571c320e9 100644
--- a/benchmarks/expected-plans/q2.txt
+++ b/benchmarks/expected-plans/q2.txt
@@ -1,7 +1,7 @@
Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST
Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment
Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name
- Inner Join: part.p_partkey = __sq_1.ps_partkey, partsupp.ps_supplycost = __sq_1.__value
+ Inner Join: part.p_partkey = __scalar_sq_1.ps_partkey, partsupp.ps_supplycost = __scalar_sq_1.__value
Inner Join: nation.n_regionkey = region.r_regionkey
Inner Join: supplier.s_nationkey = nation.n_nationkey
Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
@@ -13,7 +13,7 @@ Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplie
TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
Filter: region.r_name = Utf8("EUROPE")
TableScan: region projection=[r_regionkey, r_name]
- SubqueryAlias: __sq_1
+ SubqueryAlias: __scalar_sq_1
Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value
Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]
Inner Join: nation.n_regionkey = region.r_regionkey
diff --git a/benchmarks/expected-plans/q20.txt b/benchmarks/expected-plans/q20.txt
index b2676f61f..b7ecb9a09 100644
--- a/benchmarks/expected-plans/q20.txt
+++ b/benchmarks/expected-plans/q20.txt
@@ -1,21 +1,21 @@
Sort: supplier.s_name ASC NULLS LAST
Projection: supplier.s_name, supplier.s_address
- LeftSemi Join: supplier.s_suppkey = __sq_1.ps_suppkey
+ LeftSemi Join: supplier.s_suppkey = __correlated_sq_1.ps_suppkey
Inner Join: supplier.s_nationkey = nation.n_nationkey
TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey]
Filter: nation.n_name = Utf8("CANADA")
TableScan: nation projection=[n_nationkey, n_name]
- SubqueryAlias: __sq_1
+ SubqueryAlias: __correlated_sq_1
Projection: partsupp.ps_suppkey AS ps_suppkey
- Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value
- Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey
- LeftSemi Join: partsupp.ps_partkey = __sq_2.p_partkey
+ Filter: CAST(partsupp.ps_availqty AS Float64) > __scalar_sq_1.__value
+ Inner Join: partsupp.ps_partkey = __scalar_sq_1.l_partkey, partsupp.ps_suppkey = __scalar_sq_1.l_suppkey
+ LeftSemi Join: partsupp.ps_partkey = __correlated_sq_2.p_partkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]
- SubqueryAlias: __sq_2
+ SubqueryAlias: __correlated_sq_2
Projection: part.p_partkey AS p_partkey
Filter: part.p_name LIKE Utf8("forest%")
TableScan: part projection=[p_partkey, p_name]
- SubqueryAlias: __sq_3
+ SubqueryAlias: __scalar_sq_1
Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value
Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]
Filter: lineitem.l_shipdate >= Date32("8766") AND lineitem.l_shipdate < Date32("9131")
diff --git a/benchmarks/expected-plans/q22.txt b/benchmarks/expected-plans/q22.txt
index 82060bd59..0fd7a590a 100644
--- a/benchmarks/expected-plans/q22.txt
+++ b/benchmarks/expected-plans/q22.txt
@@ -3,13 +3,13 @@ Sort: custsale.cntrycode ASC NULLS LAST
Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]]
SubqueryAlias: custsale
Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal
- Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value
+ Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __scalar_sq_1.__value
CrossJoin:
LeftAnti Join: customer.c_custkey = orders.o_custkey
Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
TableScan: customer projection=[c_custkey, c_phone, c_acctbal]
TableScan: orders projection=[o_custkey]
- SubqueryAlias: __sq_1
+ SubqueryAlias: __scalar_sq_1
Projection: AVG(customer.c_acctbal) AS __value
Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]
Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 1b1bb876c..bff7999cd 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -594,17 +594,7 @@ mod tests {
expected_plan(16).await
}
- /// This query produces different plans depending on operating system. The difference is
- /// due to re-writing the following expression:
- ///
- /// `sum(l_extendedprice) / 7.0 as avg_yearly`
- ///
- /// Linux: Decimal128(Some(7000000000000000195487369212723200),38,33)
- /// Windows: Decimal128(Some(6999999999999999042565864605876224),38,33)
- ///
- /// See https://github.com/apache/arrow-datafusion/issues/3791
- #[tokio::test]
- #[ignore]
+ #[tokio::test]
async fn q17_expected_plan() -> Result<()> {
expected_plan(17).await
}
diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs
index 3a4dadd7f..3fff5ba3e 100644
--- a/datafusion/core/tests/sql/subqueries.rs
+++ b/datafusion/core/tests/sql/subqueries.rs
@@ -50,21 +50,21 @@ where c_acctbal < (
let plan = dataframe.into_optimized_plan().unwrap();
let actual = format!("{}", plan.display_indent());
- let expected = "Sort: customer.c_custkey ASC NULLS LAST\
- \n Projection: customer.c_custkey\
- \n Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __sq_1.__value\
- \n Inner Join: customer.c_custkey = __sq_1.o_custkey\
- \n TableScan: customer projection=[c_custkey, c_acctbal]\
- \n SubqueryAlias: __sq_1\
- \n Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value\
- \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]]\
- \n Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __sq_2.__value\
- \n Inner Join: orders.o_orderkey = __sq_2.l_orderkey\
- \n TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]\
- \n SubqueryAlias: __sq_2\
- \n Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS price AS __value\
- \n Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]]\
- \n TableScan: lineitem projection=[l_orderkey, l_extendedprice]";
+ let expected = r#"Sort: customer.c_custkey ASC NULLS LAST
+ Projection: customer.c_custkey
+ Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_1.__value
+ Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey
+ TableScan: customer projection=[c_custkey, c_acctbal]
+ SubqueryAlias: __scalar_sq_1
+ Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value
+ Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]]
+ Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __scalar_sq_2.__value
+ Inner Join: orders.o_orderkey = __scalar_sq_2.l_orderkey
+ TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]
+ SubqueryAlias: __scalar_sq_2
+ Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS price AS __value
+ Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]]
+ TableScan: lineitem projection=[l_orderkey, l_extendedprice]"#;
assert_eq!(actual, expected);
Ok(())
@@ -94,12 +94,12 @@ where o_orderstatus in (
let dataframe = ctx.sql(sql).await.unwrap();
let plan = dataframe.into_optimized_plan().unwrap();
let actual = format!("{}", plan.display_indent());
- let expected = "Projection: orders.o_orderkey\
- \n LeftSemi Join: orders.o_orderstatus = __sq_1.l_linestatus, orders.o_orderkey = __sq_1.l_orderkey\
- \n TableScan: orders projection=[o_orderkey, o_orderstatus]\
- \n SubqueryAlias: __sq_1\
- \n Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey\
- \n TableScan: lineitem projection=[l_orderkey, l_linestatus]";
+ let expected = r#"Projection: orders.o_orderkey
+ LeftSemi Join: orders.o_orderstatus = __correlated_sq_1.l_linestatus, orders.o_orderkey = __correlated_sq_1.l_orderkey
+ TableScan: orders projection=[o_orderkey, o_orderstatus]
+ SubqueryAlias: __correlated_sq_1
+ Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey
+ TableScan: lineitem projection=[l_orderkey, l_linestatus]"#;
assert_eq!(actual, expected);
// assert data
@@ -140,32 +140,32 @@ order by s_acctbal desc, n_name, s_name, p_partkey;"#;
let dataframe = ctx.sql(sql).await.unwrap();
let plan = dataframe.into_optimized_plan().unwrap();
let actual = format!("{}", plan.display_indent());
- let expected = "Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST\
- \n Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment\
- \n Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name\
- \n Inner Join: part.p_partkey = __sq_1.ps_partkey, partsupp.ps_supplycost = __sq_1.__value\
- \n Inner Join: nation.n_regionkey = region.r_regionkey\
- \n Inner Join: supplier.s_nationkey = nation.n_nationkey\
- \n Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\
- \n Inner Join: part.p_partkey = partsupp.ps_partkey\
- \n Filter: part.p_size = Int32(15) AND part.p_type LIKE Utf8(\"%BRASS\")\
- \n TableScan: part projection=[p_partkey, p_mfgr, p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE Utf8(\"%BRASS\")]\
- \n TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]\
- \n TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]\
- \n TableScan: nation projection=[n_nationkey, n_name, n_regionkey]\
- \n Filter: region.r_name = Utf8(\"EUROPE\")\
- \n TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8(\"EUROPE\")]\
- \n SubqueryAlias: __sq_1\
- \n Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value\
- \n Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]\
- \n Inner Join: nation.n_regionkey = region.r_regionkey\
- \n Inner Join: supplier.s_nationkey = nation.n_nationkey\
- \n Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\
- \n TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]\
- \n TableScan: supplier projection=[s_suppkey, s_nationkey]\
- \n TableScan: nation projection=[n_nationkey, n_regionkey]\
- \n Filter: region.r_name = Utf8(\"EUROPE\")\
- \n TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8(\"EUROPE\")]";
+ let expected = r#"Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST
+ Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment
+ Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name
+ Inner Join: part.p_partkey = __scalar_sq_1.ps_partkey, partsupp.ps_supplycost = __scalar_sq_1.__value
+ Inner Join: nation.n_regionkey = region.r_regionkey
+ Inner Join: supplier.s_nationkey = nation.n_nationkey
+ Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
+ Inner Join: part.p_partkey = partsupp.ps_partkey
+ Filter: part.p_size = Int32(15) AND part.p_type LIKE Utf8("%BRASS")
+ TableScan: part projection=[p_partkey, p_mfgr, p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE Utf8("%BRASS")]
+ TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
+ TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]
+ TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
+ Filter: region.r_name = Utf8("EUROPE")
+ TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]
+ SubqueryAlias: __scalar_sq_1
+ Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value
+ Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]
+ Inner Join: nation.n_regionkey = region.r_regionkey
+ Inner Join: supplier.s_nationkey = nation.n_nationkey
+ Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
+ TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
+ TableScan: supplier projection=[s_suppkey, s_nationkey]
+ TableScan: nation projection=[n_nationkey, n_regionkey]
+ Filter: region.r_name = Utf8("EUROPE")
+ TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]"#;
assert_eq!(actual, expected);
// assert data
@@ -230,7 +230,6 @@ async fn tpch_q4_correlated() -> Result<()> {
Ok(())
}
-#[ignore] // https://github.com/apache/arrow-datafusion/issues/3437
#[tokio::test]
async fn tpch_q17_correlated() -> Result<()> {
let parts = r#"63700,goldenrod lavender spring chocolate lace,Manufacturer#1,Brand#23,PROMO BURNISHED COPPER,7,MED BOX,901.00,ly. slyly ironi
@@ -255,17 +254,18 @@ async fn tpch_q17_correlated() -> Result<()> {
let dataframe = ctx.sql(sql).await.unwrap();
let plan = dataframe.into_optimized_plan().unwrap();
let actual = format!("{}", plan.display_indent());
- let expected = r#"Projection: CAST(SUM(lineitem.l_extendedprice) AS Decimal128(38, 33)) / CAST(Float64(7) AS Decimal128(38, 33)) AS avg_yearly
+ let expected = r#"Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly
Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]]
- Filter: CAST(lineitem.l_quantity AS Decimal128(38, 21)) < __sq_1.__value
- Inner Join: part.p_partkey = __sq_1.l_partkey
+ Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15))
+ Inner Join: part.p_partkey = __scalar_sq_1.l_partkey, lineitem.l_partkey = __scalar_sq_1.l_partkey
Inner Join: lineitem.l_partkey = part.p_partkey
TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice]
Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX")
TableScan: part projection=[p_partkey, p_brand, p_container]
- Projection: lineitem.l_partkey, CAST(Float64(0.2) AS Decimal128(38, 21)) * CAST(AVG(lineitem.l_quantity) AS Decimal128(38, 21)) AS __value, alias=__sq_1
- Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]]
- TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice]"#
+ SubqueryAlias: __scalar_sq_1
+ Projection: lineitem.l_partkey, Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS __value
+ Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]]
+ TableScan: lineitem projection=[l_partkey, l_quantity]"#
.to_string();
assert_eq!(actual, expected);
@@ -275,7 +275,7 @@ async fn tpch_q17_correlated() -> Result<()> {
"+--------------------+",
"| avg_yearly |",
"+--------------------+",
- "| 1901.3714285714286 |",
+ "| 190.13714285714286 |",
"+--------------------+",
];
assert_batches_eq!(expected, &results);
@@ -309,28 +309,28 @@ order by s_name;
let dataframe = ctx.sql(sql).await.unwrap();
let plan = dataframe.into_optimized_plan().unwrap();
let actual = format!("{}", plan.display_indent());
- let expected = "Sort: supplier.s_name ASC NULLS LAST\
- \n Projection: supplier.s_name, supplier.s_address\
- \n LeftSemi Join: supplier.s_suppkey = __sq_1.ps_suppkey\
- \n Inner Join: supplier.s_nationkey = nation.n_nationkey\
- \n TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey]\
- \n Filter: nation.n_name = Utf8(\"CANADA\")\
- \n TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8(\"CANADA\")]\
- \n SubqueryAlias: __sq_1\
- \n Projection: partsupp.ps_suppkey AS ps_suppkey\
- \n Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value\
- \n Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey\
- \n LeftSemi Join: partsupp.ps_partkey = __sq_2.p_partkey\
- \n TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]\
- \n SubqueryAlias: __sq_2\
- \n Projection: part.p_partkey AS p_partkey\
- \n Filter: part.p_name LIKE Utf8(\"forest%\")\
- \n TableScan: part projection=[p_partkey, p_name], partial_filters=[part.p_name LIKE Utf8(\"forest%\")]\
- \n SubqueryAlias: __sq_3\
- \n Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value\
- \n Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]\
- \n Filter: lineitem.l_shipdate >= Date32(\"8766\")\
- \n TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate], partial_filters=[lineitem.l_shipdate >= Date32(\"8766\")]";
+ let expected = r#"Sort: supplier.s_name ASC NULLS LAST
+ Projection: supplier.s_name, supplier.s_address
+ LeftSemi Join: supplier.s_suppkey = __correlated_sq_1.ps_suppkey
+ Inner Join: supplier.s_nationkey = nation.n_nationkey
+ TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey]
+ Filter: nation.n_name = Utf8("CANADA")
+ TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("CANADA")]
+ SubqueryAlias: __correlated_sq_1
+ Projection: partsupp.ps_suppkey AS ps_suppkey
+ Filter: CAST(partsupp.ps_availqty AS Float64) > __scalar_sq_1.__value
+ Inner Join: partsupp.ps_partkey = __scalar_sq_1.l_partkey, partsupp.ps_suppkey = __scalar_sq_1.l_suppkey
+ LeftSemi Join: partsupp.ps_partkey = __correlated_sq_2.p_partkey
+ TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]
+ SubqueryAlias: __correlated_sq_2
+ Projection: part.p_partkey AS p_partkey
+ Filter: part.p_name LIKE Utf8("forest%")
+ TableScan: part projection=[p_partkey, p_name], partial_filters=[part.p_name LIKE Utf8("forest%")]
+ SubqueryAlias: __scalar_sq_1
+ Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value
+ Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]
+ Filter: lineitem.l_shipdate >= Date32("8766")
+ TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate], partial_filters=[lineitem.l_shipdate >= Date32("8766")]"#;
assert_eq!(actual, expected);
// assert data
@@ -364,22 +364,22 @@ order by cntrycode;"#;
let dataframe = ctx.sql(sql).await.unwrap();
let plan = dataframe.into_optimized_plan().unwrap();
let actual = format!("{}", plan.display_indent());
- let expected = "Sort: custsale.cntrycode ASC NULLS LAST\
- \n Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal\
- \n Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]]\
- \n SubqueryAlias: custsale\
- \n Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal\
- \n Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value\
- \n CrossJoin:\
- \n LeftAnti Join: customer.c_custkey = orders.o_custkey\
- \n Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")])\
- \n TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")])]\
- \n TableScan: orders projection=[o_custkey]\
- \n SubqueryAlias: __sq_1\
- \n Projection: AVG(customer.c_acctbal) AS __value\
- \n Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]\
- \n Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")])\
- \n TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[customer.c_acctbal > Decimal128(Some(0),15,2) AS customer.c_acctbal > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")]), customer.c_acctbal > Decimal128(Some(0),15,2)]";
+ let expected = r#"Sort: custsale.cntrycode ASC NULLS LAST
+ Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal
+ Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]]
+ SubqueryAlias: custsale
+ Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal
+ Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __scalar_sq_1.__value
+ CrossJoin:
+ LeftAnti Join: customer.c_custkey = orders.o_custkey
+ Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
+ TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])]
+ TableScan: orders projection=[o_custkey]
+ SubqueryAlias: __scalar_sq_1
+ Projection: AVG(customer.c_acctbal) AS __value
+ Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]
+ Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
+ TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[customer.c_acctbal > Decimal128(Some(0),15,2) AS customer.c_acctbal > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]), customer.c_acctbal > Decimal128(Some(0),15,2)]"#;
assert_eq!(expected, actual);
// assert data
@@ -420,26 +420,26 @@ order by value desc;
let dataframe = ctx.sql(sql).await.unwrap();
let plan = dataframe.into_optimized_plan().unwrap();
let actual = format!("{}", plan.display_indent());
- let expected = "Sort: value DESC NULLS FIRST\
- \n Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value\
- \n Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__sq_1.__value AS Decimal128(38, 15))\
- \n CrossJoin:\
- \n Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]\
- \n Inner Join: supplier.s_nationkey = nation.n_nationkey\
- \n Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\
- \n TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]\
- \n TableScan: supplier projection=[s_suppkey, s_nationkey]\
- \n Filter: nation.n_name = Utf8(\"GERMANY\")\
- \n TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8(\"GERMANY\")]\
- \n SubqueryAlias: __sq_1\
- \n Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value\
- \n Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]\
- \n Inner Join: supplier.s_nationkey = nation.n_nationkey\
- \n Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\
- \n TableScan: partsupp projection=[ps_suppkey, ps_availqty, ps_supplycost]\
- \n TableScan: supplier projection=[s_suppkey, s_nationkey]\
- \n Filter: nation.n_name = Utf8(\"GERMANY\")\
- \n TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8(\"GERMANY\")]";
+ let expected = r#"Sort: value DESC NULLS FIRST
+ Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value
+ Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__scalar_sq_1.__value AS Decimal128(38, 15))
+ CrossJoin:
+ Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
+ Inner Join: supplier.s_nationkey = nation.n_nationkey
+ Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
+ TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
+ TableScan: supplier projection=[s_suppkey, s_nationkey]
+ Filter: nation.n_name = Utf8("GERMANY")
+ TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]
+ SubqueryAlias: __scalar_sq_1
+ Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value
+ Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
+ Inner Join: supplier.s_nationkey = nation.n_nationkey
+ Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
+ TableScan: partsupp projection=[ps_suppkey, ps_availqty, ps_supplycost]
+ TableScan: supplier projection=[s_suppkey, s_nationkey]
+ Filter: nation.n_name = Utf8("GERMANY")
+ TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]"#;
assert_eq!(actual, expected);
// assert data
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/alias.rs
similarity index 50%
copy from datafusion/optimizer/src/lib.rs
copy to datafusion/optimizer/src/alias.rs
index 27e6dff08..70fdeb7ab 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/alias.rs
@@ -15,31 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-pub mod common_subexpr_eliminate;
-pub mod decorrelate_where_exists;
-pub mod decorrelate_where_in;
-pub mod eliminate_cross_join;
-pub mod eliminate_filter;
-pub mod eliminate_limit;
-pub mod eliminate_outer_join;
-pub mod extract_equijoin_predicate;
-pub mod filter_null_join_keys;
-pub mod inline_table_scan;
-pub mod optimizer;
-pub mod propagate_empty_relation;
-pub mod push_down_filter;
-pub mod push_down_limit;
-pub mod push_down_projection;
-pub mod scalar_subquery_to_join;
-pub mod simplify_expressions;
-pub mod single_distinct_to_groupby;
-pub mod type_coercion;
-pub mod utils;
+use std::sync::atomic::{AtomicUsize, Ordering};
-pub mod rewrite_disjunctive_predicate;
-#[cfg(test)]
-pub mod test;
-pub mod unwrap_cast_in_comparison;
+/// A utility struct that generates unique aliases of the form `{prefix}_{id}` (ids start at 1) when optimizing queries
+pub struct AliasGenerator {
+    next_id: AtomicUsize, // id handed out by the next call to `next`; atomic so `next` only needs `&self`
+}
-pub use optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
-pub use utils::optimize_children;
+impl Default for AliasGenerator {
+    fn default() -> Self {
+        Self {
+            next_id: AtomicUsize::new(1),
+        }
+    }
+}
+
+impl AliasGenerator {
+    /// Create a new [`AliasGenerator`] whose first generated alias uses id 1
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Return a unique alias `{prefix}_{id}`; a single counter is shared by all prefixes and advances on every call
+    pub fn next(&self, prefix: &str) -> String {
+        let id = self.next_id.fetch_add(1, Ordering::Relaxed); // Relaxed suffices: only the counter's uniqueness matters, no other memory is synchronised through it
+        format!("{}_{}", prefix, id)
+    }
+}
diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs
index c2a80ac2b..1aa976ce8 100644
--- a/datafusion/optimizer/src/decorrelate_where_in.rs
+++ b/datafusion/optimizer/src/decorrelate_where_in.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::alias::AliasGenerator;
use crate::optimizer::ApplyOrder;
use crate::utils::{
alias_cols, conjunction, exprs_to_join_cols, find_join_exprs, merge_cols,
@@ -28,12 +29,14 @@ use log::debug;
use std::sync::Arc;
#[derive(Default)]
-pub struct DecorrelateWhereIn {}
+pub struct DecorrelateWhereIn {
+    alias: AliasGenerator, // produces the `__correlated_sq_N` alias given to each rewritten IN subquery
+}
impl DecorrelateWhereIn {
#[allow(missing_docs)]
pub fn new() -> Self {
- Self {}
+ Self::default()
}
/// Finds expressions that have a where in subquery (and recurses when found)
@@ -96,8 +99,12 @@ impl OptimizerRule for DecorrelateWhereIn {
// iterate through all exists clauses in predicate, turning each into a join
let mut cur_input = filter.input.as_ref().clone();
for subquery in subqueries {
- cur_input =
- optimize_where_in(&subquery, &cur_input, &other_exprs, config)?;
+ cur_input = optimize_where_in(
+ &subquery,
+ &cur_input,
+ &other_exprs,
+ &self.alias,
+ )?;
}
Ok(Some(cur_input))
}
@@ -118,8 +125,8 @@ fn optimize_where_in(
query_info: &SubqueryInfo,
outer_input: &LogicalPlan,
outer_other_exprs: &[Expr],
- config: &dyn OptimizerConfig,
-) -> datafusion_common::Result<LogicalPlan> {
+ alias: &AliasGenerator,
+) -> Result<LogicalPlan> {
let proj = Projection::try_from_plan(&query_info.query.subquery)
.map_err(|e| context!("a projection is required", e))?;
let mut subqry_input = proj.input.clone();
@@ -161,7 +168,7 @@ fn optimize_where_in(
merge_cols((&[subquery_col], &subqry_cols), (&[outer_col], &outer_cols));
// build subquery side of join - the thing the subquery was querying
- let subqry_alias = format!("__sq_{}", config.next_id());
+ let subqry_alias = alias.next("__correlated_sq");
let mut subqry_plan = LogicalPlanBuilder::from((*subqry_input).clone());
if let Some(expr) = conjunction(other_subqry_exprs) {
// if the subquery had additional expressions, restore them
@@ -256,13 +263,13 @@ mod tests {
.build()?;
let expected = "Projection: test.b [b:UInt32]\
- \n LeftSemi Join: test.b = __sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
- \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.b = __correlated_sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
- \n SubqueryAlias: __sq_1 [c:UInt32]\
+ \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
\n Projection: sq_1.c AS c [c:UInt32]\
\n TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\
- \n SubqueryAlias: __sq_2 [c:UInt32]\
+ \n SubqueryAlias: __correlated_sq_2 [c:UInt32]\
\n Projection: sq_2.c AS c [c:UInt32]\
\n TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]";
@@ -286,9 +293,9 @@ mod tests {
let expected = "Projection: test.b [b:UInt32]\
\n Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\
- \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
- \n SubqueryAlias: __sq_1 [c:UInt32]\
+ \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
\n Projection: sq.c AS c [c:UInt32]\
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
@@ -340,9 +347,9 @@ mod tests {
\n Subquery: [c:UInt32]\
\n Projection: sq1.c [c:UInt32]\
\n TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]\
- \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
- \n SubqueryAlias: __sq_1 [c:UInt32]\
+ \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
\n Projection: sq2.c AS c [c:UInt32]\
\n TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]";
@@ -365,13 +372,13 @@ mod tests {
.build()?;
let expected = "Projection: test.b [b:UInt32]\
- \n LeftSemi Join: test.b = __sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.b = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
- \n SubqueryAlias: __sq_1 [a:UInt32]\
+ \n SubqueryAlias: __correlated_sq_1 [a:UInt32]\
\n Projection: sq.a AS a [a:UInt32]\
- \n LeftSemi Join: sq.a = __sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: sq.a = __correlated_sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\
- \n SubqueryAlias: __sq_2 [c:UInt32]\
+ \n SubqueryAlias: __correlated_sq_2 [c:UInt32]\
\n Projection: sq_nested.c AS c [c:UInt32]\
\n TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]";
@@ -401,9 +408,9 @@ mod tests {
\n TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\
\n SubqueryAlias: wrapped [b:UInt32, c:UInt32]\
\n Projection: test.b, test.c [b:UInt32, c:UInt32]\
- \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
- \n SubqueryAlias: __sq_1 [c:UInt32]\
+ \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
\n Projection: sq_inner.c AS c [c:UInt32]\
\n TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]";
@@ -436,13 +443,13 @@ mod tests {
debug!("plan to optimize:\n{}", plan.display_indent());
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
- \n LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]\
- \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+ \n LeftSemi Join: customer.c_custkey = __correlated_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]\
+ \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [o_custkey:Int64]\
+ \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
\n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
- \n SubqueryAlias: __sq_2 [o_custkey:Int64]\n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
+ \n SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]\n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
assert_optimized_plan_eq_display_indent(
Arc::new(DecorrelateWhereIn::new()),
@@ -479,13 +486,13 @@ mod tests {
.build()?;
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
- \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+ \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [o_custkey:Int64]\
+ \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
\n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
- \n LeftSemi Join: orders.o_orderkey = __sq_2.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
+ \n LeftSemi Join: orders.o_orderkey = __correlated_sq_2.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
- \n SubqueryAlias: __sq_2 [l_orderkey:Int64]\
+ \n SubqueryAlias: __correlated_sq_2 [l_orderkey:Int64]\
\n Projection: lineitem.l_orderkey AS l_orderkey [l_orderkey:Int64]\
\n TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]";
@@ -517,9 +524,9 @@ mod tests {
.build()?;
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
- \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+ \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [o_custkey:Int64]\
+ \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
\n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
\n Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -549,9 +556,9 @@ mod tests {
// Query will fail, but we can still transform the plan
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
- \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+ \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [o_custkey:Int64]\
+ \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
\n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
\n Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -580,9 +587,9 @@ mod tests {
.build()?;
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
- \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+ \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [o_custkey:Int64]\
+ \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
\n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
\n Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -611,9 +618,9 @@ mod tests {
.build()?;
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
- \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey Filter: customer.c_custkey != orders.o_custkey [c_custkey:Int64, c_name:Utf8]\
+ \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey Filter: customer.c_custkey != orders.o_custkey [c_custkey:Int64, c_name:Utf8]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [o_custkey:Int64]\
+ \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
\n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -793,9 +800,9 @@ mod tests {
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
\n Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]\
- \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+ \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [o_custkey:Int64]\
+ \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
\n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -858,9 +865,9 @@ mod tests {
.build()?;
let expected = "Projection: test.b [b:UInt32]\
- \n LeftSemi Join: test.c = __sq_1.c, test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.c = __correlated_sq_1.c, test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
- \n SubqueryAlias: __sq_1 [c:UInt32, a:UInt32]\
+ \n SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]\
\n Projection: sq.c AS c, sq.a AS a [c:UInt32, a:UInt32]\
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
@@ -882,9 +889,9 @@ mod tests {
.build()?;
let expected = "Projection: test.b [b:UInt32]\
- \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
- \n SubqueryAlias: __sq_1 [c:UInt32]\
+ \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
\n Projection: sq.c AS c [c:UInt32]\
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
@@ -906,9 +913,9 @@ mod tests {
.build()?;
let expected = "Projection: test.b [b:UInt32]\
- \n LeftAnti Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftAnti Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
- \n SubqueryAlias: __sq_1 [c:UInt32]\
+ \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
\n Projection: sq.c AS c [c:UInt32]\
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index 27e6dff08..cd743fcda 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+pub mod alias;
pub mod common_subexpr_eliminate;
pub mod decorrelate_where_exists;
pub mod decorrelate_where_in;
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 5d2dd45af..7383b311c 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -41,7 +41,6 @@ use chrono::{DateTime, Utc};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::logical_plan::LogicalPlan;
use log::{debug, trace, warn};
-use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Instant;
@@ -83,11 +82,6 @@ pub trait OptimizerConfig {
/// How many times to attempt to optimize the plan
fn max_passes(&self) -> u8;
-
- /// Return a unique ID
- ///
- /// This is useful for assigning unique names to aliases
- fn next_id(&self) -> usize;
}
/// A standalone [`OptimizerConfig`] that can be used independently
@@ -97,8 +91,6 @@ pub struct OptimizerContext {
/// Query execution start time that can be used to rewrite
/// expressions such as `now()` to use a literal value instead
query_execution_start_time: DateTime<Utc>,
- /// id generator for optimizer passes
- next_id: AtomicUsize,
/// Option to skip rules that produce errors
skip_failing_rules: bool,
/// Specify whether to enable the filter_null_keys rule
@@ -112,7 +104,6 @@ impl OptimizerContext {
pub fn new() -> Self {
Self {
query_execution_start_time: Utc::now(),
- next_id: AtomicUsize::new(1),
skip_failing_rules: true,
filter_null_keys: true,
max_passes: 3,
@@ -172,12 +163,6 @@ impl OptimizerConfig for OptimizerContext {
fn max_passes(&self) -> u8 {
self.max_passes
}
-
- fn next_id(&self) -> usize {
- use std::sync::atomic::Ordering;
- // Can use relaxed ordering as not used for synchronisation
- self.next_id.fetch_add(1, Ordering::Relaxed)
- }
}
/// A rule-based optimizer.
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index c0ea975f3..51c4142af 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::alias::AliasGenerator;
use crate::optimizer::ApplyOrder;
use crate::utils::{
conjunction, exprs_to_join_cols, find_join_exprs, only_or_err, split_conjunction,
@@ -30,12 +31,14 @@ use std::sync::Arc;
/// Optimizer rule for rewriting subquery filters to joins
#[derive(Default)]
-pub struct ScalarSubqueryToJoin {}
+pub struct ScalarSubqueryToJoin {
+    alias: Arc<AliasGenerator>, // produces `__scalar_sq_N` aliases; NOTE(review): DecorrelateWhereIn stores a plain AliasGenerator, the Arc here looks unnecessary — confirm
+}
impl ScalarSubqueryToJoin {
#[allow(missing_docs)]
pub fn new() -> Self {
- Self {}
+ Self::default()
}
/// Finds expressions that have a scalar subquery in them (and recurses when found)
@@ -110,7 +113,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
let mut cur_input = filter.input.as_ref().clone();
for subquery in subqueries {
if let Some(optimized_subquery) =
- optimize_scalar(&subquery, &cur_input, &other_exprs, config)?
+ optimize_scalar(&subquery, &cur_input, &other_exprs, &self.alias)?
{
cur_input = optimized_subquery;
} else {
@@ -173,7 +176,7 @@ fn optimize_scalar(
query_info: &SubqueryInfo,
filter_input: &LogicalPlan,
outer_others: &[Expr],
- config: &dyn OptimizerConfig,
+ alias: &AliasGenerator,
) -> Result<Option<LogicalPlan>> {
let subquery = query_info.query.subquery.as_ref();
debug!(
@@ -242,7 +245,7 @@ fn optimize_scalar(
}
// Only operate if one column is present and the other closed upon from outside scope
- let subqry_alias = format!("__sq_{}", config.next_id());
+ let subqry_alias = alias.next("__scalar_sq");
let group_by: Vec<_> = subqry_cols
.iter()
.map(|it| Expr::Column(it.clone()))
@@ -386,16 +389,16 @@ mod tests {
.build()?;
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
- \n Filter: Int32(1) < __sq_2.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\
- \n Inner Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\
- \n Filter: Int32(1) < __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
- \n Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+ \n Filter: Int32(1) < __scalar_sq_2.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\
+ \n Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\
+ \n Filter: Int32(1) < __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+ \n Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\
+ \n SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Int64;N]\
\n Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\
\n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
- \n SubqueryAlias: __sq_2 [o_custkey:Int64, __value:Int64;N]\
+ \n SubqueryAlias: __scalar_sq_2 [o_custkey:Int64, __value:Int64;N]\
\n Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\
\n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -439,16 +442,16 @@ mod tests {
.build()?;
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
- \n Filter: customer.c_acctbal < __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\
- \n Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\
+ \n Filter: customer.c_acctbal < __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\
+ \n Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Float64;N]\
+ \n SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Float64;N]\
\n Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value [o_custkey:Int64, __value:Float64;N]\
\n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]] [o_custkey:Int64, SUM(orders.o_totalprice):Float64;N]\
- \n Filter: orders.o_totalprice < __sq_2.__value [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\
- \n Inner Join: orders.o_orderkey = __sq_2.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\
+ \n Filter: orders.o_totalprice < __scalar_sq_2.__value [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\
+ \n Inner Join: orders.o_orderkey = __scalar_sq_2.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
- \n SubqueryAlias: __sq_2 [l_orderkey:Int64, __value:Float64;N]\
+ \n SubqueryAlias: __scalar_sq_2 [l_orderkey:Int64, __value:Float64;N]\
\n Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS __value [l_orderkey:Int64, __value:Float64;N]\
\n Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]] [l_orderkey:Int64, SUM(lineitem.l_extendedprice):Float64;N]\
\n TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]";
@@ -481,9 +484,9 @@ mod tests {
.build()?;
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
- \n Inner Join: customer.c_custkey = __sq_1.o_custkey, customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+ \n Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey, customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\
+ \n SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Int64;N]\
\n Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\
\n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
\n Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
@@ -515,10 +518,10 @@ mod tests {
// it will optimize, but fail for the same reason the unoptimized query would
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
- \n Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+ \n Filter: customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
\n CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [__value:Int64;N]\
+ \n SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\
\n Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
\n Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
\n Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
@@ -548,10 +551,10 @@ mod tests {
.build()?;
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
- \n Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+ \n Filter: customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
\n CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [__value:Int64;N]\
+ \n SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\
\n Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
\n Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
\n Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
@@ -739,10 +742,10 @@ mod tests {
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
\n Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
- \n Filter: customer.c_custkey >= __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
- \n Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+ \n Filter: customer.c_custkey >= __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+ \n Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\
+ \n SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Int64;N]\
\n Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\
\n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -776,9 +779,9 @@ mod tests {
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
\n Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
- \n Inner Join: customer.c_custkey = __sq_1.o_custkey, customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+ \n Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey, customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\
+ \n SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Int64;N]\
\n Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\
\n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -845,10 +848,10 @@ mod tests {
.build()?;
let expected = "Projection: test.c [c:UInt32]\
- \n Filter: test.c < __sq_1.__value [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\
- \n Inner Join: test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\
+ \n Filter: test.c < __scalar_sq_1.__value [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\
+ \n Inner Join: test.a = __scalar_sq_1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
- \n SubqueryAlias: __sq_1 [a:UInt32, __value:UInt32;N]\
+ \n SubqueryAlias: __scalar_sq_1 [a:UInt32, __value:UInt32;N]\
\n Projection: sq.a, MIN(sq.c) AS __value [a:UInt32, __value:UInt32;N]\
\n Aggregate: groupBy=[[sq.a]], aggr=[[MIN(sq.c)]] [a:UInt32, MIN(sq.c):UInt32;N]\
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
@@ -877,10 +880,10 @@ mod tests {
.build()?;
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
- \n Filter: customer.c_custkey < __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+ \n Filter: customer.c_custkey < __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
\n CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [__value:Int64;N]\
+ \n SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\
\n Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
\n Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -908,10 +911,10 @@ mod tests {
.build()?;
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
- \n Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+ \n Filter: customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
\n CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
- \n SubqueryAlias: __sq_1 [__value:Int64;N]\
+ \n SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\
\n Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
\n Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs
index 3a09af2f2..7d5c12181 100644
--- a/datafusion/optimizer/tests/integration-test.rs
+++ b/datafusion/optimizer/tests/integration-test.rs
@@ -64,10 +64,10 @@ fn subquery_filter_with_cast() -> Result<()> {
)";
let plan = test_sql(sql)?;
let expected = "Projection: test.col_int32\
- \n Filter: CAST(test.col_int32 AS Float64) > __sq_1.__value\
+ \n Filter: CAST(test.col_int32 AS Float64) > __scalar_sq_1.__value\
\n CrossJoin:\
\n TableScan: test projection=[col_int32]\
- \n SubqueryAlias: __sq_1\
+ \n SubqueryAlias: __scalar_sq_1\
\n Projection: AVG(test.col_int32) AS __value\
\n Aggregate: groupBy=[[]], aggr=[[AVG(test.col_int32)]]\
\n Filter: test.col_utf8 >= Utf8(\"2002-05-08\") AND test.col_utf8 <= Utf8(\"2002-05-13\")\