You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink in the original page) was not preserved in this plain-text rendering.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/04/20 07:20:49 UTC
[arrow-datafusion] branch main updated: Decimal multiply kernel should not cause precision loss (#5980)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e81f54b278 Decimal multiply kernel should not cause precision loss (#5980)
e81f54b278 is described below
commit e81f54b2786a8ebdbf6dfc1f47ddbed85338be55
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Thu Apr 20 00:20:43 2023 -0700
Decimal multiply kernel should not cause precision loss (#5980)
* Init
* More
* More
* More
* Fix
* fix
* More
* Fix
* Add query comment
* Update expected plans
* Fix
* Fix clippy
* Fix
* Fix
* Fix
* Fix
* Enable verify_q6 test
---
benchmarks/expected-plans/q1.txt | 46 ++--
benchmarks/expected-plans/q10.txt | 126 ++++-----
benchmarks/expected-plans/q11.txt | 4 +-
benchmarks/expected-plans/q14.txt | 60 ++--
benchmarks/expected-plans/q15.txt | 132 ++++-----
benchmarks/expected-plans/q19.txt | 2 +-
benchmarks/expected-plans/q3.txt | 110 ++++----
benchmarks/expected-plans/q5.txt | 170 ++++++------
benchmarks/expected-plans/q7.txt | 4 +-
benchmarks/expected-plans/q8.txt | 8 +-
benchmarks/expected-plans/q9.txt | 158 +++++------
benchmarks/queries/q8.sql | 8 +-
benchmarks/src/bin/tpch.rs | 1 -
.../tests/sqllogictests/src/engines/conversion.rs | 5 +-
.../src/engines/datafusion/normalize.rs | 5 +-
.../core/tests/sqllogictests/test_files/tpch.slt | 6 +-
datafusion/expr/src/type_coercion/binary.rs | 305 +++++++++++++++++----
datafusion/expr/src/type_coercion/mod.rs | 5 +
datafusion/optimizer/src/analyzer/type_coercion.rs | 29 +-
datafusion/physical-expr/src/expressions/binary.rs | 201 ++++++++++----
.../src/expressions/binary/kernels_arrow.rs | 212 ++++++++++----
21 files changed, 1005 insertions(+), 592 deletions(-)
diff --git a/benchmarks/expected-plans/q1.txt b/benchmarks/expected-plans/q1.txt
index e02c1402a3..c45329cb11 100644
--- a/benchmarks/expected-plans/q1.txt
+++ b/benchmarks/expected-plans/q1.txt
@@ -1,23 +1,23 @@
-+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| plan_type | plan [...]
-+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| logical_plan | Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST [...]
-| | Projection: lineitem.l_returnflag, lineitem.l_linestatus, SUM(lineitem.l_quantity) AS sum_qty, SUM(lineitem.l_extendedprice) AS sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(UInt8(1)) AS co [...]
-| | Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) [...]
-| | Projection: CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(line [...]
-| | Filter: lineitem.l_shipdate <= Date32("10471") [...]
-| | TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate] [...]
-| physical_plan | SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST] [...]
-| | SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST] [...]
-| | ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, SUM(lineitem.l_extendedprice)@3 as sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, AVG(lineitem.l_quantity)@6 as avg_qty, AVG(lineitem.l_extendedprice)@7 as avg_price, AVG(lineitem.l_di [...]
-| | AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))] [...]
-| | CoalesceBatchesExec: target_batch_size=8192 [...]
-| | RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2), input_partitions=2 [...]
-| | AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))] [...]
-| | ProjectionExec: expr=[CAST(l_extendedprice@1 AS Decimal128(38, 4)) * CAST(Some(100),23,2 - CAST(l_discount@2 AS Decimal128(23, 2)) AS Decimal128(38, 4)) as CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.l_d [...]
-| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 [...]
-| | CoalesceBatchesExec: target_batch_size=8192 [...]
-| | FilterExec: l_shipdate@6 <= 10471 [...]
-| | MemoryExec: partitions=0, partition_sizes=[] [...]
-| | [...]
-+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
\ No newline at end of file
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| plan_type | plan [...]
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| logical_plan | Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST [...]
+| | Projection: lineitem.l_returnflag, lineitem.l_linestatus, SUM(lineitem.l_quantity) AS sum_qty, SUM(lineitem.l_extendedprice) AS sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(UInt8(1)) AS co [...]
+| | Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), [...]
+| | Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus [...]
+| | Filter: lineitem.l_shipdate <= Date32("10471") [...]
+| | TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate] [...]
+| physical_plan | SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST] [...]
+| | SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST] [...]
+| | ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, SUM(lineitem.l_extendedprice)@3 as sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, AVG(lineitem.l_quantity)@6 as avg_qty, AVG(lineitem.l_extendedprice)@7 as avg_price, AVG(lineitem.l_di [...]
+| | AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))] [...]
+| | CoalesceBatchesExec: target_batch_size=8192 [...]
+| | RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2), input_partitions=2 [...]
+| | AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))] [...]
+| | ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus] [...]
+| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 [...]
+| | CoalesceBatchesExec: target_batch_size=8192 [...]
+| | FilterExec: l_shipdate@6 <= 10471 [...]
+| | MemoryExec: partitions=0, partition_sizes=[] [...]
+| | [...]
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q10.txt b/benchmarks/expected-plans/q10.txt
index 794ab50938..3be2e9592e 100644
--- a/benchmarks/expected-plans/q10.txt
+++ b/benchmarks/expected-plans/q10.txt
@@ -1,63 +1,63 @@
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type | plan |
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan | Sort: revenue DESC NULLS FIRST |
-| | Projection: customer.c_custkey, customer.c_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue, customer.c_acctbal, nation.n_name, customer.c_address, customer.c_phone, customer.c_comment |
-| | Aggregate: groupBy=[[customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
-| | Projection: customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name |
-| | Inner Join: customer.c_nationkey = nation.n_nationkey |
-| | Projection: customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment, lineitem.l_extendedprice, lineitem.l_discount |
-| | Inner Join: orders.o_orderkey = lineitem.l_orderkey |
-| | Projection: customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment, orders.o_orderkey |
-| | Inner Join: customer.c_custkey = orders.o_custkey |
-| | TableScan: customer projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_comment] |
-| | Projection: orders.o_orderkey, orders.o_custkey |
-| | Filter: orders.o_orderdate >= Date32("8674") AND orders.o_orderdate < Date32("8766") |
-| | TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate] |
-| | Projection: lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount |
-| | Filter: lineitem.l_returnflag = Utf8("R") |
-| | TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_returnflag] |
-| | TableScan: nation projection=[n_nationkey, n_name] |
-| physical_plan | SortPreservingMergeExec: [revenue@2 DESC] |
-| | SortExec: expr=[revenue@2 DESC] |
-| | ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@7 as revenue, c_acctbal@2 as c_acctbal, n_name@4 as n_name, c_address@5 as c_address, c_phone@3 as c_phone, c_comment@6 as c_comment] |
-| | AggregateExec: mode=FinalPartitioned, gby=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_acctbal@2 as c_acctbal, c_phone@3 as c_phone, n_name@4 as n_name, c_address@5 as c_address, c_comment@6 as c_comment], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }, Column { name: "c_name", index: 1 }, Column { name: "c_acctbal", index: 2 }, Column { name: "c_phone", index: 3 }, Column { name: "n_name", index: 4 }, Column { name: "c_address", index: 5 }, Column { name: "c_comment", index: 6 }], 2), input_partitions=2 |
-| | AggregateExec: mode=Partial, gby=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_acctbal@4 as c_acctbal, c_phone@3 as c_phone, n_name@8 as n_name, c_address@2 as c_address, c_comment@5 as c_comment], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
-| | ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, l_extendedprice@7 as l_extendedprice, l_discount@8 as l_discount, n_name@10 as n_name] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_nationkey", index: 3 }, Column { name: "n_nationkey", index: 0 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "c_nationkey", index: 3 }], 2), input_partitions=2 |
-| | ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_nationkey@3 as c_nationkey, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, l_extendedprice@9 as l_extendedprice, l_discount@10 as l_discount] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 7 }, Column { name: "l_orderkey", index: 0 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 7 }], 2), input_partitions=2 |
-| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2 |
-| | ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_nationkey@3 as c_nationkey, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, o_orderkey@7 as o_orderkey] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 2), input_partitions=0 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 2), input_partitions=2 |
-| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
-| | ProjectionExec: expr=[o_orderkey@0 as o_orderkey, o_custkey@1 as o_custkey] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | FilterExec: o_orderdate@2 >= 8674 AND o_orderdate@2 < 8766 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=2 |
-| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
-| | ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | FilterExec: l_returnflag@3 = R |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=0 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | |
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type | plan |
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan | Sort: revenue DESC NULLS FIRST |
+| | Projection: customer.c_custkey, customer.c_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue, customer.c_acctbal, nation.n_name, customer.c_address, customer.c_phone, customer.c_comment |
+| | Aggregate: groupBy=[[customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment]], aggr=[[SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
+| | Projection: customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name |
+| | Inner Join: customer.c_nationkey = nation.n_nationkey |
+| | Projection: customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment, lineitem.l_extendedprice, lineitem.l_discount |
+| | Inner Join: orders.o_orderkey = lineitem.l_orderkey |
+| | Projection: customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment, orders.o_orderkey |
+| | Inner Join: customer.c_custkey = orders.o_custkey |
+| | TableScan: customer projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_comment] |
+| | Projection: orders.o_orderkey, orders.o_custkey |
+| | Filter: orders.o_orderdate >= Date32("8674") AND orders.o_orderdate < Date32("8766") |
+| | TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate] |
+| | Projection: lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount |
+| | Filter: lineitem.l_returnflag = Utf8("R") |
+| | TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_returnflag] |
+| | TableScan: nation projection=[n_nationkey, n_name] |
+| physical_plan | SortPreservingMergeExec: [revenue@2 DESC] |
+| | SortExec: expr=[revenue@2 DESC] |
+| | ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@7 as revenue, c_acctbal@2 as c_acctbal, n_name@4 as n_name, c_address@5 as c_address, c_phone@3 as c_phone, c_comment@6 as c_comment] |
+| | AggregateExec: mode=FinalPartitioned, gby=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_acctbal@2 as c_acctbal, c_phone@3 as c_phone, n_name@4 as n_name, c_address@5 as c_address, c_comment@6 as c_comment], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }, Column { name: "c_name", index: 1 }, Column { name: "c_acctbal", index: 2 }, Column { name: "c_phone", index: 3 }, Column { name: "n_name", index: 4 }, Column { name: "c_address", index: 5 }, Column { name: "c_comment", index: 6 }], 2), input_partitions=2 |
+| | AggregateExec: mode=Partial, gby=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_acctbal@4 as c_acctbal, c_phone@3 as c_phone, n_name@8 as n_name, c_address@2 as c_address, c_comment@5 as c_comment], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
+| | ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, l_extendedprice@7 as l_extendedprice, l_discount@8 as l_discount, n_name@10 as n_name] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_nationkey", index: 3 }, Column { name: "n_nationkey", index: 0 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "c_nationkey", index: 3 }], 2), input_partitions=2 |
+| | ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_nationkey@3 as c_nationkey, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, l_extendedprice@9 as l_extendedprice, l_discount@10 as l_discount] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 7 }, Column { name: "l_orderkey", index: 0 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 7 }], 2), input_partitions=2 |
+| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2 |
+| | ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_nationkey@3 as c_nationkey, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, o_orderkey@7 as o_orderkey] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 2), input_partitions=0 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 2), input_partitions=2 |
+| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
+| | ProjectionExec: expr=[o_orderkey@0 as o_orderkey, o_custkey@1 as o_custkey] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | FilterExec: o_orderdate@2 >= 8674 AND o_orderdate@2 < 8766 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=2 |
+| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
+| | ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | FilterExec: l_returnflag@3 = R |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=0 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | |
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q11.txt b/benchmarks/expected-plans/q11.txt
index 2f87f7c98e..0a732897c3 100644
--- a/benchmarks/expected-plans/q11.txt
+++ b/benchmarks/expected-plans/q11.txt
@@ -5,7 +5,7 @@
| | Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value |
| | Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > __scalar_sq_1.__value |
| | CrossJoin: |
-| | Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]] |
+| | Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(partsupp.ps_supplycost * CAST(partsupp.ps_availqty AS Decimal128(10, 0)))]] |
| | Projection: partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost |
| | Inner Join: supplier.s_nationkey = nation.n_nationkey |
| | Projection: partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey |
@@ -17,7 +17,7 @@
| | TableScan: nation projection=[n_nationkey, n_name] |
| | SubqueryAlias: __scalar_sq_1 |
| | Projection: CAST(CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS Decimal128(38, 15)) AS __value |
-| | Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]] |
+| | Aggregate: groupBy=[[]], aggr=[[SUM(partsupp.ps_supplycost * CAST(partsupp.ps_availqty AS Decimal128(10, 0)))]] |
| | Projection: partsupp.ps_availqty, partsupp.ps_supplycost |
| | Inner Join: supplier.s_nationkey = nation.n_nationkey |
| | Projection: partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey |
diff --git a/benchmarks/expected-plans/q14.txt b/benchmarks/expected-plans/q14.txt
index b1b8a423d0..43eb0ea7f5 100644
--- a/benchmarks/expected-plans/q14.txt
+++ b/benchmarks/expected-plans/q14.txt
@@ -1,30 +1,30 @@
-+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| plan_type | plan [...]
-+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| logical_plan | Projection: Float64(100) * CAST(SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END) AS Float64) / CAST(SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS Float64) AS promo_revenue [...]
-| | Aggregate: groupBy=[[]], aggr=[[SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2))CAST(lineitem.l_discount AS Decimal128(23, 2))lineitem.l [...]
-| | Projection: CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineit [...]
-| | Inner Join: lineitem.l_partkey = part.p_partkey [...]
-| | Projection: lineitem.l_partkey, lineitem.l_extendedprice, lineitem.l_discount [...]
-| | Filter: lineitem.l_shipdate >= Date32("9374") AND lineitem.l_shipdate < Date32("9404") [...]
-| | TableScan: lineitem projection=[l_partkey, l_extendedprice, l_discount, l_shipdate] [...]
-| | TableScan: part projection=[p_partkey, p_type] [...]
-| physical_plan | ProjectionExec: expr=[100 * CAST(SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END)@0 AS Float64) / CAST(SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 AS Float64) as promo_revenue] [...]
-| | AggregateExec: mode=Final, gby=[], aggr=[SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] [...]
-| | CoalescePartitionsExec [...]
-| | AggregateExec: mode=Partial, gby=[], aggr=[SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] [...]
-| | ProjectionExec: expr=[CAST(l_extendedprice@1 AS Decimal128(38, 4)) * CAST(Some(100),23,2 - CAST(l_discount@2 AS Decimal128(23, 2)) AS Decimal128(38, 4)) as CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.l_discoun [...]
-| | CoalesceBatchesExec: target_batch_size=8192 [...]
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })] [...]
-| | CoalesceBatchesExec: target_batch_size=8192 [...]
-| | RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=2 [...]
-| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 [...]
-| | ProjectionExec: expr=[l_partkey@0 as l_partkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount] [...]
-| | CoalesceBatchesExec: target_batch_size=8192 [...]
-| | FilterExec: l_shipdate@3 >= 9374 AND l_shipdate@3 < 9404 [...]
-| | MemoryExec: partitions=0, partition_sizes=[] [...]
-| | CoalesceBatchesExec: target_batch_size=8192 [...]
-| | RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=0 [...]
-| | MemoryExec: partitions=0, partition_sizes=[] [...]
-| | [...]
-+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
\ No newline at end of file
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| plan_type | plan [...]
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| logical_plan | Projection: Float64(100) * CAST(SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END) AS Float64) / CAST(SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS Float64) AS promo_revenue [...]
+| | Aggregate: groupBy=[[]], aggr=[[SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount ELSE Decimal128(Some(0),38,4) END) AS SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * Int64( [...]
+| | Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice, part.p_type [...]
+| | Inner Join: lineitem.l_partkey = part.p_partkey [...]
+| | Projection: lineitem.l_partkey, lineitem.l_extendedprice, lineitem.l_discount [...]
+| | Filter: lineitem.l_shipdate >= Date32("9374") AND lineitem.l_shipdate < Date32("9404") [...]
+| | TableScan: lineitem projection=[l_partkey, l_extendedprice, l_discount, l_shipdate] [...]
+| | TableScan: part projection=[p_partkey, p_type] [...]
+| physical_plan | ProjectionExec: expr=[100 * CAST(SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END)@0 AS Float64) / CAST(SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 AS Float64) as promo_revenue] [...]
+| | AggregateExec: mode=Final, gby=[], aggr=[SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] [...]
+| | CoalescePartitionsExec [...]
+| | AggregateExec: mode=Partial, gby=[], aggr=[SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] [...]
+| | ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice, p_type@4 as p_type] [...]
+| | CoalesceBatchesExec: target_batch_size=8192 [...]
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })] [...]
+| | CoalesceBatchesExec: target_batch_size=8192 [...]
+| | RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=2 [...]
+| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 [...]
+| | ProjectionExec: expr=[l_partkey@0 as l_partkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount] [...]
+| | CoalesceBatchesExec: target_batch_size=8192 [...]
+| | FilterExec: l_shipdate@3 >= 9374 AND l_shipdate@3 < 9404 [...]
+| | MemoryExec: partitions=0, partition_sizes=[] [...]
+| | CoalesceBatchesExec: target_batch_size=8192 [...]
+| | RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=0 [...]
+| | MemoryExec: partitions=0, partition_sizes=[] [...]
+| | [...]
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q15.txt b/benchmarks/expected-plans/q15.txt
index 50cfe85418..208f4c6690 100644
--- a/benchmarks/expected-plans/q15.txt
+++ b/benchmarks/expected-plans/q15.txt
@@ -1,66 +1,66 @@
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type | plan |
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan | Sort: supplier.s_suppkey ASC NULLS LAST |
-| | Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, revenue0.total_revenue |
-| | Inner Join: revenue0.total_revenue = __scalar_sq_1.__value |
-| | Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, revenue0.total_revenue |
-| | Inner Join: supplier.s_suppkey = revenue0.supplier_no |
-| | TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone] |
-| | SubqueryAlias: revenue0 |
-| | Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue |
-| | Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
-| | Projection: lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount |
-| | Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") |
-| | TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] |
-| | SubqueryAlias: __scalar_sq_1 |
-| | Projection: MAX(revenue0.total_revenue) AS __value |
-| | Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]] |
-| | SubqueryAlias: revenue0 |
-| | Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue |
-| | Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
-| | Projection: lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount |
-| | Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") |
-| | TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] |
-| physical_plan | SortPreservingMergeExec: [s_suppkey@0 ASC NULLS LAST] |
-| | SortExec: expr=[s_suppkey@0 ASC NULLS LAST] |
-| | ProjectionExec: expr=[s_suppkey@0 as s_suppkey, s_name@1 as s_name, s_address@2 as s_address, s_phone@3 as s_phone, total_revenue@4 as total_revenue] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "total_revenue", index: 4 }, Column { name: "__value", index: 0 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "total_revenue", index: 4 }], 2), input_partitions=2 |
-| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2 |
-| | ProjectionExec: expr=[s_suppkey@0 as s_suppkey, s_name@1 as s_name, s_address@2 as s_address, s_phone@3 as s_phone, total_revenue@5 as total_revenue] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_suppkey", index: 0 }, Column { name: "supplier_no", index: 0 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2), input_partitions=0 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | ProjectionExec: expr=[l_suppkey@0 as supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as total_revenue] |
-| | AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 0 }], 2), input_partitions=2 |
-| | AggregateExec: mode=Partial, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
-| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
-| | ProjectionExec: expr=[l_suppkey@0 as l_suppkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | FilterExec: l_shipdate@3 >= 9496 AND l_shipdate@3 < 9587 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "__value", index: 0 }], 2), input_partitions=1 |
-| | ProjectionExec: expr=[MAX(revenue0.total_revenue)@0 as __value] |
-| | AggregateExec: mode=Final, gby=[], aggr=[MAX(revenue0.total_revenue)] |
-| | CoalescePartitionsExec |
-| | AggregateExec: mode=Partial, gby=[], aggr=[MAX(revenue0.total_revenue)] |
-| | ProjectionExec: expr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as total_revenue] |
-| | AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 0 }], 2), input_partitions=2 |
-| | AggregateExec: mode=Partial, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
-| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
-| | ProjectionExec: expr=[l_suppkey@0 as l_suppkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | FilterExec: l_shipdate@3 >= 9496 AND l_shipdate@3 < 9587 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | |
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type | plan |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan | Sort: supplier.s_suppkey ASC NULLS LAST |
+| | Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, revenue0.total_revenue |
+| | Inner Join: revenue0.total_revenue = __scalar_sq_1.__value |
+| | Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, revenue0.total_revenue |
+| | Inner Join: supplier.s_suppkey = revenue0.supplier_no |
+| | TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone] |
+| | SubqueryAlias: revenue0 |
+| | Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue |
+| | Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
+| | Projection: lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount |
+| | Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") |
+| | TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] |
+| | SubqueryAlias: __scalar_sq_1 |
+| | Projection: MAX(revenue0.total_revenue) AS __value |
+| | Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]] |
+| | SubqueryAlias: revenue0 |
+| | Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue |
+| | Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
+| | Projection: lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount |
+| | Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") |
+| | TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] |
+| physical_plan | SortPreservingMergeExec: [s_suppkey@0 ASC NULLS LAST] |
+| | SortExec: expr=[s_suppkey@0 ASC NULLS LAST] |
+| | ProjectionExec: expr=[s_suppkey@0 as s_suppkey, s_name@1 as s_name, s_address@2 as s_address, s_phone@3 as s_phone, total_revenue@4 as total_revenue] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "total_revenue", index: 4 }, Column { name: "__value", index: 0 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "total_revenue", index: 4 }], 2), input_partitions=2 |
+| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2 |
+| | ProjectionExec: expr=[s_suppkey@0 as s_suppkey, s_name@1 as s_name, s_address@2 as s_address, s_phone@3 as s_phone, total_revenue@5 as total_revenue] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_suppkey", index: 0 }, Column { name: "supplier_no", index: 0 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2), input_partitions=0 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | ProjectionExec: expr=[l_suppkey@0 as supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as total_revenue] |
+| | AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 0 }], 2), input_partitions=2 |
+| | AggregateExec: mode=Partial, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
+| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
+| | ProjectionExec: expr=[l_suppkey@0 as l_suppkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | FilterExec: l_shipdate@3 >= 9496 AND l_shipdate@3 < 9587 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "__value", index: 0 }], 2), input_partitions=1 |
+| | ProjectionExec: expr=[MAX(revenue0.total_revenue)@0 as __value] |
+| | AggregateExec: mode=Final, gby=[], aggr=[MAX(revenue0.total_revenue)] |
+| | CoalescePartitionsExec |
+| | AggregateExec: mode=Partial, gby=[], aggr=[MAX(revenue0.total_revenue)] |
+| | ProjectionExec: expr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as total_revenue] |
+| | AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 0 }], 2), input_partitions=2 |
+| | AggregateExec: mode=Partial, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
+| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
+| | ProjectionExec: expr=[l_suppkey@0 as l_suppkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | FilterExec: l_shipdate@3 >= 9496 AND l_shipdate@3 < 9587 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q19.txt b/benchmarks/expected-plans/q19.txt
index 2e3ccbde94..77c889d38f 100644
--- a/benchmarks/expected-plans/q19.txt
+++ b/benchmarks/expected-plans/q19.txt
@@ -2,7 +2,7 @@
| plan_type | plan [...]
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
| logical_plan | Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue [...]
-| | Aggregate: groupBy=[[]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] [...]
+| | Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] [...]
| | Projection: lineitem.l_extendedprice, lineitem.l_discount [...]
| | Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand = Utf8("Brand#12") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#23") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND lineitem.l_quan [...]
| | Projection: lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount [...]
diff --git a/benchmarks/expected-plans/q3.txt b/benchmarks/expected-plans/q3.txt
index 36ddcc8fdd..0152235a48 100644
--- a/benchmarks/expected-plans/q3.txt
+++ b/benchmarks/expected-plans/q3.txt
@@ -1,55 +1,55 @@
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type | plan |
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan | Sort: revenue DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST |
-| | Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue, orders.o_orderdate, orders.o_shippriority |
-| | Aggregate: groupBy=[[lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
-| | Projection: orders.o_orderdate, orders.o_shippriority, lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount |
-| | Inner Join: orders.o_orderkey = lineitem.l_orderkey |
-| | Projection: orders.o_orderkey, orders.o_orderdate, orders.o_shippriority |
-| | Inner Join: customer.c_custkey = orders.o_custkey |
-| | Projection: customer.c_custkey |
-| | Filter: customer.c_mktsegment = Utf8("BUILDING") |
-| | TableScan: customer projection=[c_custkey, c_mktsegment] |
-| | Filter: orders.o_orderdate < Date32("9204") |
-| | TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate, o_shippriority] |
-| | Projection: lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount |
-| | Filter: lineitem.l_shipdate > Date32("9204") |
-| | TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_shipdate] |
-| physical_plan | SortPreservingMergeExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST] |
-| | SortExec: expr=[revenue@1 DESC,o_orderdate@2 ASC NULLS LAST] |
-| | ProjectionExec: expr=[l_orderkey@0 as l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority] |
-| | AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 }, Column { name: "o_shippriority", index: 2 }], 2), input_partitions=2 |
-| | AggregateExec: mode=Partial, gby=[l_orderkey@2 as l_orderkey, o_orderdate@0 as o_orderdate, o_shippriority@1 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
-| | ProjectionExec: expr=[o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority, l_orderkey@3 as l_orderkey, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 0 }, Column { name: "l_orderkey", index: 0 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 0 }], 2), input_partitions=2 |
-| | ProjectionExec: expr=[o_orderkey@1 as o_orderkey, o_orderdate@3 as o_orderdate, o_shippriority@4 as o_shippriority] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 2), input_partitions=2 |
-| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
-| | ProjectionExec: expr=[c_custkey@0 as c_custkey] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | FilterExec: c_mktsegment@1 = BUILDING |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 2), input_partitions=2 |
-| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | FilterExec: o_orderdate@2 < 9204 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=2 |
-| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
-| | ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | FilterExec: l_shipdate@3 > 9204 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | |
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type | plan |
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan | Sort: revenue DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST |
+| | Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue, orders.o_orderdate, orders.o_shippriority |
+| | Aggregate: groupBy=[[lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority]], aggr=[[SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
+| | Projection: orders.o_orderdate, orders.o_shippriority, lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount |
+| | Inner Join: orders.o_orderkey = lineitem.l_orderkey |
+| | Projection: orders.o_orderkey, orders.o_orderdate, orders.o_shippriority |
+| | Inner Join: customer.c_custkey = orders.o_custkey |
+| | Projection: customer.c_custkey |
+| | Filter: customer.c_mktsegment = Utf8("BUILDING") |
+| | TableScan: customer projection=[c_custkey, c_mktsegment] |
+| | Filter: orders.o_orderdate < Date32("9204") |
+| | TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate, o_shippriority] |
+| | Projection: lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount |
+| | Filter: lineitem.l_shipdate > Date32("9204") |
+| | TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_shipdate] |
+| physical_plan | SortPreservingMergeExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST] |
+| | SortExec: expr=[revenue@1 DESC,o_orderdate@2 ASC NULLS LAST] |
+| | ProjectionExec: expr=[l_orderkey@0 as l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority] |
+| | AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 }, Column { name: "o_shippriority", index: 2 }], 2), input_partitions=2 |
+| | AggregateExec: mode=Partial, gby=[l_orderkey@2 as l_orderkey, o_orderdate@0 as o_orderdate, o_shippriority@1 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
+| | ProjectionExec: expr=[o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority, l_orderkey@3 as l_orderkey, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 0 }, Column { name: "l_orderkey", index: 0 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 0 }], 2), input_partitions=2 |
+| | ProjectionExec: expr=[o_orderkey@1 as o_orderkey, o_orderdate@3 as o_orderdate, o_shippriority@4 as o_shippriority] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 2), input_partitions=2 |
+| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
+| | ProjectionExec: expr=[c_custkey@0 as c_custkey] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | FilterExec: c_mktsegment@1 = BUILDING |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 2), input_partitions=2 |
+| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | FilterExec: o_orderdate@2 < 9204 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=2 |
+| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
+| | ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | FilterExec: l_shipdate@3 > 9204 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | |
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q5.txt b/benchmarks/expected-plans/q5.txt
index de6854eecd..9388a385d9 100644
--- a/benchmarks/expected-plans/q5.txt
+++ b/benchmarks/expected-plans/q5.txt
@@ -1,85 +1,85 @@
-+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type | plan |
-+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan | Sort: revenue DESC NULLS FIRST |
-| | Projection: nation.n_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue |
-| | Aggregate: groupBy=[[nation.n_name]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
-| | Projection: lineitem.l_extendedprice, lineitem.l_discount, nation.n_name |
-| | Inner Join: nation.n_regionkey = region.r_regionkey |
-| | Projection: lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, nation.n_regionkey |
-| | Inner Join: supplier.s_nationkey = nation.n_nationkey |
-| | Projection: lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey |
-| | Inner Join: lineitem.l_suppkey = supplier.s_suppkey, customer.c_nationkey = supplier.s_nationkey |
-| | Projection: customer.c_nationkey, lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount |
-| | Inner Join: orders.o_orderkey = lineitem.l_orderkey |
-| | Projection: customer.c_nationkey, orders.o_orderkey |
-| | Inner Join: customer.c_custkey = orders.o_custkey |
-| | TableScan: customer projection=[c_custkey, c_nationkey] |
-| | Projection: orders.o_orderkey, orders.o_custkey |
-| | Filter: orders.o_orderdate >= Date32("8766") AND orders.o_orderdate < Date32("9131") |
-| | TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate] |
-| | TableScan: lineitem projection=[l_orderkey, l_suppkey, l_extendedprice, l_discount] |
-| | TableScan: supplier projection=[s_suppkey, s_nationkey] |
-| | TableScan: nation projection=[n_nationkey, n_name, n_regionkey] |
-| | Projection: region.r_regionkey |
-| | Filter: region.r_name = Utf8("ASIA") |
-| | TableScan: region projection=[r_regionkey, r_name] |
-| physical_plan | SortPreservingMergeExec: [revenue@1 DESC] |
-| | SortExec: expr=[revenue@1 DESC] |
-| | ProjectionExec: expr=[n_name@0 as n_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as revenue] |
-| | AggregateExec: mode=FinalPartitioned, gby=[n_name@0 as n_name], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "n_name", index: 0 }], 2), input_partitions=2 |
-| | AggregateExec: mode=Partial, gby=[n_name@2 as n_name], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
-| | ProjectionExec: expr=[l_extendedprice@0 as l_extendedprice, l_discount@1 as l_discount, n_name@2 as n_name] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "n_regionkey", index: 3 }, Column { name: "r_regionkey", index: 0 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "n_regionkey", index: 3 }], 2), input_partitions=2 |
-| | ProjectionExec: expr=[l_extendedprice@0 as l_extendedprice, l_discount@1 as l_discount, n_name@4 as n_name, n_regionkey@5 as n_regionkey] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_nationkey", index: 2 }, Column { name: "n_nationkey", index: 0 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "s_nationkey", index: 2 }], 2), input_partitions=2 |
-| | ProjectionExec: expr=[l_extendedprice@2 as l_extendedprice, l_discount@3 as l_discount, s_nationkey@5 as s_nationkey] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_suppkey", index: 1 }, Column { name: "s_suppkey", index: 0 }), (Column { name: "c_nationkey", index: 0 }, Column { name: "s_nationkey", index: 1 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 1 }, Column { name: "c_nationkey", index: 0 }], 2), input_partitions=2 |
-| | ProjectionExec: expr=[c_nationkey@0 as c_nationkey, l_suppkey@3 as l_suppkey, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 1 }, Column { name: "l_orderkey", index: 0 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 1 }], 2), input_partitions=2 |
-| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2 |
-| | ProjectionExec: expr=[c_nationkey@1 as c_nationkey, o_orderkey@2 as o_orderkey] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 2), input_partitions=0 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 2), input_partitions=2 |
-| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
-| | ProjectionExec: expr=[o_orderkey@0 as o_orderkey, o_custkey@1 as o_custkey] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | FilterExec: o_orderdate@2 >= 8766 AND o_orderdate@2 < 9131 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=0 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }, Column { name: "s_nationkey", index: 1 }], 2), input_partitions=0 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=0 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "r_regionkey", index: 0 }], 2), input_partitions=2 |
-| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
-| | ProjectionExec: expr=[r_regionkey@0 as r_regionkey] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | FilterExec: r_name@1 = ASIA |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | |
-+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type | plan |
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan | Sort: revenue DESC NULLS FIRST |
+| | Projection: nation.n_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue |
+| | Aggregate: groupBy=[[nation.n_name]], aggr=[[SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
+| | Projection: lineitem.l_extendedprice, lineitem.l_discount, nation.n_name |
+| | Inner Join: nation.n_regionkey = region.r_regionkey |
+| | Projection: lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, nation.n_regionkey |
+| | Inner Join: supplier.s_nationkey = nation.n_nationkey |
+| | Projection: lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey |
+| | Inner Join: lineitem.l_suppkey = supplier.s_suppkey, customer.c_nationkey = supplier.s_nationkey |
+| | Projection: customer.c_nationkey, lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount |
+| | Inner Join: orders.o_orderkey = lineitem.l_orderkey |
+| | Projection: customer.c_nationkey, orders.o_orderkey |
+| | Inner Join: customer.c_custkey = orders.o_custkey |
+| | TableScan: customer projection=[c_custkey, c_nationkey] |
+| | Projection: orders.o_orderkey, orders.o_custkey |
+| | Filter: orders.o_orderdate >= Date32("8766") AND orders.o_orderdate < Date32("9131") |
+| | TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate] |
+| | TableScan: lineitem projection=[l_orderkey, l_suppkey, l_extendedprice, l_discount] |
+| | TableScan: supplier projection=[s_suppkey, s_nationkey] |
+| | TableScan: nation projection=[n_nationkey, n_name, n_regionkey] |
+| | Projection: region.r_regionkey |
+| | Filter: region.r_name = Utf8("ASIA") |
+| | TableScan: region projection=[r_regionkey, r_name] |
+| physical_plan | SortPreservingMergeExec: [revenue@1 DESC] |
+| | SortExec: expr=[revenue@1 DESC] |
+| | ProjectionExec: expr=[n_name@0 as n_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as revenue] |
+| | AggregateExec: mode=FinalPartitioned, gby=[n_name@0 as n_name], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "n_name", index: 0 }], 2), input_partitions=2 |
+| | AggregateExec: mode=Partial, gby=[n_name@2 as n_name], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] |
+| | ProjectionExec: expr=[l_extendedprice@0 as l_extendedprice, l_discount@1 as l_discount, n_name@2 as n_name] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "n_regionkey", index: 3 }, Column { name: "r_regionkey", index: 0 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "n_regionkey", index: 3 }], 2), input_partitions=2 |
+| | ProjectionExec: expr=[l_extendedprice@0 as l_extendedprice, l_discount@1 as l_discount, n_name@4 as n_name, n_regionkey@5 as n_regionkey] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_nationkey", index: 2 }, Column { name: "n_nationkey", index: 0 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "s_nationkey", index: 2 }], 2), input_partitions=2 |
+| | ProjectionExec: expr=[l_extendedprice@2 as l_extendedprice, l_discount@3 as l_discount, s_nationkey@5 as s_nationkey] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_suppkey", index: 1 }, Column { name: "s_suppkey", index: 0 }), (Column { name: "c_nationkey", index: 0 }, Column { name: "s_nationkey", index: 1 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 1 }, Column { name: "c_nationkey", index: 0 }], 2), input_partitions=2 |
+| | ProjectionExec: expr=[c_nationkey@0 as c_nationkey, l_suppkey@3 as l_suppkey, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 1 }, Column { name: "l_orderkey", index: 0 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 1 }], 2), input_partitions=2 |
+| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2 |
+| | ProjectionExec: expr=[c_nationkey@1 as c_nationkey, o_orderkey@2 as o_orderkey] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 2), input_partitions=0 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 2), input_partitions=2 |
+| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
+| | ProjectionExec: expr=[o_orderkey@0 as o_orderkey, o_custkey@1 as o_custkey] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | FilterExec: o_orderdate@2 >= 8766 AND o_orderdate@2 < 9131 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=0 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }, Column { name: "s_nationkey", index: 1 }], 2), input_partitions=0 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=0 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "r_regionkey", index: 0 }], 2), input_partitions=2 |
+| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
+| | ProjectionExec: expr=[r_regionkey@0 as r_regionkey] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | FilterExec: r_name@1 = ASIA |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | |
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q7.txt b/benchmarks/expected-plans/q7.txt
index 619ac28d0a..572ee28ff0 100644
--- a/benchmarks/expected-plans/q7.txt
+++ b/benchmarks/expected-plans/q7.txt
@@ -5,7 +5,7 @@
| | Projection: shipping.supp_nation, shipping.cust_nation, shipping.l_year, SUM(shipping.volume) AS revenue [...]
| | Aggregate: groupBy=[[shipping.supp_nation, shipping.cust_nation, shipping.l_year]], aggr=[[SUM(shipping.volume)]] [...]
| | SubqueryAlias: shipping [...]
-| | Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation, datepart(Utf8("YEAR"), lineitem.l_shipdate) AS l_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS volume [...]
+| | Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation, datepart(Utf8("YEAR"), lineitem.l_shipdate) AS l_year, lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS volume [...]
| | Inner Join: customer.c_nationkey = n2.n_nationkey Filter: n1.n_name = Utf8("FRANCE") AND n2.n_name = Utf8("GERMANY") OR n1.n_name = Utf8("GERMANY") AND n2.n_name = Utf8("FRANCE") [...]
| | Projection: lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, customer.c_nationkey, n1.n_name [...]
| | Inner Join: supplier.s_nationkey = n1.n_nationkey [...]
@@ -33,7 +33,7 @@
| | CoalesceBatchesExec: target_batch_size=8192 [...]
| | RepartitionExec: partitioning=Hash([Column { name: "supp_nation", index: 0 }, Column { name: "cust_nation", index: 1 }, Column { name: "l_year", index: 2 }], 2), input_partitions=2 [...]
| | AggregateExec: mode=Partial, gby=[supp_nation@0 as supp_nation, cust_nation@1 as cust_nation, l_year@2 as l_year], aggr=[SUM(shipping.volume)] [...]
-| | ProjectionExec: expr=[n_name@4 as supp_nation, n_name@6 as cust_nation, datepart(YEAR, l_shipdate@2) as l_year, CAST(l_extendedprice@0 AS Decimal128(38, 4)) * CAST(Some(100),23,2 - CAST(l_discount@1 AS Decimal128(23, 2)) AS Decimal128(38, 4)) as volume] [...]
+| | ProjectionExec: expr=[n_name@4 as supp_nation, n_name@6 as cust_nation, datepart(YEAR, l_shipdate@2) as l_year, l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as volume] [...]
| | CoalesceBatchesExec: target_batch_size=8192 [...]
| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_nationkey", index: 3 }, Column { name: "n_nationkey", index: 0 })], filter=BinaryExpr { left: BinaryExpr { left: BinaryExpr { left: Column { name: "n_name", index: 0 }, op: Eq, right: Literal { value: Utf8("FRANCE") } }, op: And, right: BinaryExpr { left: Column { name: "n_name", index: 1 }, op: Eq, right: Literal { value: Utf8("GERMANY") } } }, op: Or, right: BinaryExpr { left: Bi [...]
| | CoalesceBatchesExec: target_batch_size=8192 [...]
diff --git a/benchmarks/expected-plans/q8.txt b/benchmarks/expected-plans/q8.txt
index d196bcc214..b356b9ded3 100644
--- a/benchmarks/expected-plans/q8.txt
+++ b/benchmarks/expected-plans/q8.txt
@@ -2,10 +2,10 @@
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: all_nations.o_year ASC NULLS LAST |
-| | Projection: all_nations.o_year, SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END) / SUM(all_nations.volume) AS mkt_share |
+| | Projection: all_nations.o_year, CAST(CAST(SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END) AS Decimal128(12, 2)) / CAST(SUM(all_nations.volume) AS Decimal128(12, 2)) AS Decimal128(15, 2)) AS mkt_share |
| | Aggregate: groupBy=[[all_nations.o_year]], aggr=[[SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Decimal128(Some(0),38,4) END) AS SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), SUM(all_nations.volume)]] |
| | SubqueryAlias: all_nations |
-| | Projection: datepart(Utf8("YEAR"), orders.o_orderdate) AS o_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS volume, n2.n_name AS nation |
+| | Projection: datepart(Utf8("YEAR"), orders.o_orderdate) AS o_year, lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS volume, n2.n_name AS nation |
| | Inner Join: n1.n_regionkey = region.r_regionkey |
| | Projection: lineitem.l_extendedprice, lineitem.l_discount, orders.o_orderdate, n1.n_regionkey, n2.n_name |
| | Inner Join: supplier.s_nationkey = n2.n_nationkey |
@@ -36,12 +36,12 @@
| | TableScan: region projection=[r_regionkey, r_name] |
| physical_plan | SortPreservingMergeExec: [o_year@0 ASC NULLS LAST] |
| | SortExec: expr=[o_year@0 ASC NULLS LAST] |
-| | ProjectionExec: expr=[o_year@0 as o_year, SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END)@1 / SUM(all_nations.volume)@2 as mkt_share] |
+| | ProjectionExec: expr=[o_year@0 as o_year, CAST(CAST(SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END)@1 AS Decimal128(12, 2)) / CAST(SUM(all_nations.volume)@2 AS Decimal128(12, 2)) AS Decimal128(15, 2)) as mkt_share] |
| | AggregateExec: mode=FinalPartitioned, gby=[o_year@0 as o_year], aggr=[SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), SUM(all_nations.volume)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([Column { name: "o_year", index: 0 }], 2), input_partitions=2 |
| | AggregateExec: mode=Partial, gby=[o_year@0 as o_year], aggr=[SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), SUM(all_nations.volume)] |
-| | ProjectionExec: expr=[datepart(YEAR, o_orderdate@2) as o_year, CAST(l_extendedprice@0 AS Decimal128(38, 4)) * CAST(Some(100),23,2 - CAST(l_discount@1 AS Decimal128(23, 2)) AS Decimal128(38, 4)) as volume, n_name@4 as nation] |
+| | ProjectionExec: expr=[datepart(YEAR, o_orderdate@2) as o_year, l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as volume, n_name@4 as nation] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "n_regionkey", index: 3 }, Column { name: "r_regionkey", index: 0 })] |
| | CoalesceBatchesExec: target_batch_size=8192 |
diff --git a/benchmarks/expected-plans/q9.txt b/benchmarks/expected-plans/q9.txt
index 414ecf804a..24a5e7f16e 100644
--- a/benchmarks/expected-plans/q9.txt
+++ b/benchmarks/expected-plans/q9.txt
@@ -1,79 +1,79 @@
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type | plan |
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan | Sort: profit.nation ASC NULLS LAST, profit.o_year DESC NULLS FIRST |
-| | Projection: profit.nation, profit.o_year, SUM(profit.amount) AS sum_profit |
-| | Aggregate: groupBy=[[profit.nation, profit.o_year]], aggr=[[SUM(profit.amount)]] |
-| | SubqueryAlias: profit |
-| | Projection: nation.n_name AS nation, datepart(Utf8("YEAR"), orders.o_orderdate) AS o_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) - CAST(partsupp.ps_supplycost * lineitem.l_quantity AS Decimal128(38, 4)) AS amount |
-| | Inner Join: supplier.s_nationkey = nation.n_nationkey |
-| | Projection: lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey, partsupp.ps_supplycost, orders.o_orderdate |
-| | Inner Join: lineitem.l_orderkey = orders.o_orderkey |
-| | Projection: lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey, partsupp.ps_supplycost |
-| | Inner Join: lineitem.l_suppkey = partsupp.ps_suppkey, lineitem.l_partkey = partsupp.ps_partkey |
-| | Projection: lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey |
-| | Inner Join: lineitem.l_suppkey = supplier.s_suppkey |
-| | Projection: lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount |
-| | Inner Join: part.p_partkey = lineitem.l_partkey |
-| | Projection: part.p_partkey |
-| | Filter: part.p_name LIKE Utf8("%green%") |
-| | TableScan: part projection=[p_partkey, p_name] |
-| | TableScan: lineitem projection=[l_orderkey, l_partkey, l_suppkey, l_quantity, l_extendedprice, l_discount] |
-| | TableScan: supplier projection=[s_suppkey, s_nationkey] |
-| | TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost] |
-| | TableScan: orders projection=[o_orderkey, o_orderdate] |
-| | TableScan: nation projection=[n_nationkey, n_name] |
-| physical_plan | SortPreservingMergeExec: [nation@0 ASC NULLS LAST,o_year@1 DESC] |
-| | SortExec: expr=[nation@0 ASC NULLS LAST,o_year@1 DESC] |
-| | ProjectionExec: expr=[nation@0 as nation, o_year@1 as o_year, SUM(profit.amount)@2 as sum_profit] |
-| | AggregateExec: mode=FinalPartitioned, gby=[nation@0 as nation, o_year@1 as o_year], aggr=[SUM(profit.amount)] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "nation", index: 0 }, Column { name: "o_year", index: 1 }], 2), input_partitions=2 |
-| | AggregateExec: mode=Partial, gby=[nation@0 as nation, o_year@1 as o_year], aggr=[SUM(profit.amount)] |
-| | ProjectionExec: expr=[n_name@7 as nation, datepart(YEAR, o_orderdate@5) as o_year, CAST(l_extendedprice@1 AS Decimal128(38, 4)) * CAST(Some(100),23,2 - CAST(l_discount@2 AS Decimal128(23, 2)) AS Decimal128(38, 4)) - CAST(ps_supplycost@4 * l_quantity@0 AS Decimal128(38, 4)) as amount] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_nationkey", index: 3 }, Column { name: "n_nationkey", index: 0 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "s_nationkey", index: 3 }], 2), input_partitions=2 |
-| | ProjectionExec: expr=[l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, l_discount@3 as l_discount, s_nationkey@4 as s_nationkey, ps_supplycost@5 as ps_supplycost, o_orderdate@7 as o_orderdate] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=2 |
-| | ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_quantity@3 as l_quantity, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount, s_nationkey@6 as s_nationkey, ps_supplycost@9 as ps_supplycost] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_suppkey", index: 2 }, Column { name: "ps_suppkey", index: 1 }), (Column { name: "l_partkey", index: 1 }, Column { name: "ps_partkey", index: 0 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 2 }, Column { name: "l_partkey", index: 1 }], 2), input_partitions=2 |
-| | ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_partkey@1 as l_partkey, l_suppkey@2 as l_suppkey, l_quantity@3 as l_quantity, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount, s_nationkey@7 as s_nationkey] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_suppkey", index: 2 }, Column { name: "s_suppkey", index: 0 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 2 }], 2), input_partitions=2 |
-| | ProjectionExec: expr=[l_orderkey@1 as l_orderkey, l_partkey@2 as l_partkey, l_suppkey@3 as l_suppkey, l_quantity@4 as l_quantity, l_extendedprice@5 as l_extendedprice, l_discount@6 as l_discount] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "p_partkey", index: 0 }, Column { name: "l_partkey", index: 1 })] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=2 |
-| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
-| | ProjectionExec: expr=[p_partkey@0 as p_partkey] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | FilterExec: p_name@1 LIKE %green% |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 1 }], 2), input_partitions=0 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2), input_partitions=0 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "ps_suppkey", index: 1 }, Column { name: "ps_partkey", index: 0 }], 2), input_partitions=0 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 0 }], 2), input_partitions=0 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=0 |
-| | MemoryExec: partitions=0, partition_sizes=[] |
-| | |
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type | plan |
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan | Sort: profit.nation ASC NULLS LAST, profit.o_year DESC NULLS FIRST |
+| | Projection: profit.nation, profit.o_year, SUM(profit.amount) AS sum_profit |
+| | Aggregate: groupBy=[[profit.nation, profit.o_year]], aggr=[[SUM(profit.amount)]] |
+| | SubqueryAlias: profit |
+| | Projection: nation.n_name AS nation, datepart(Utf8("YEAR"), orders.o_orderdate) AS o_year, lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) - partsupp.ps_supplycost * lineitem.l_quantity AS amount |
+| | Inner Join: supplier.s_nationkey = nation.n_nationkey |
+| | Projection: lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey, partsupp.ps_supplycost, orders.o_orderdate |
+| | Inner Join: lineitem.l_orderkey = orders.o_orderkey |
+| | Projection: lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey, partsupp.ps_supplycost |
+| | Inner Join: lineitem.l_suppkey = partsupp.ps_suppkey, lineitem.l_partkey = partsupp.ps_partkey |
+| | Projection: lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey |
+| | Inner Join: lineitem.l_suppkey = supplier.s_suppkey |
+| | Projection: lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount |
+| | Inner Join: part.p_partkey = lineitem.l_partkey |
+| | Projection: part.p_partkey |
+| | Filter: part.p_name LIKE Utf8("%green%") |
+| | TableScan: part projection=[p_partkey, p_name] |
+| | TableScan: lineitem projection=[l_orderkey, l_partkey, l_suppkey, l_quantity, l_extendedprice, l_discount] |
+| | TableScan: supplier projection=[s_suppkey, s_nationkey] |
+| | TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost] |
+| | TableScan: orders projection=[o_orderkey, o_orderdate] |
+| | TableScan: nation projection=[n_nationkey, n_name] |
+| physical_plan | SortPreservingMergeExec: [nation@0 ASC NULLS LAST,o_year@1 DESC] |
+| | SortExec: expr=[nation@0 ASC NULLS LAST,o_year@1 DESC] |
+| | ProjectionExec: expr=[nation@0 as nation, o_year@1 as o_year, SUM(profit.amount)@2 as sum_profit] |
+| | AggregateExec: mode=FinalPartitioned, gby=[nation@0 as nation, o_year@1 as o_year], aggr=[SUM(profit.amount)] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "nation", index: 0 }, Column { name: "o_year", index: 1 }], 2), input_partitions=2 |
+| | AggregateExec: mode=Partial, gby=[nation@0 as nation, o_year@1 as o_year], aggr=[SUM(profit.amount)] |
+| | ProjectionExec: expr=[n_name@7 as nation, datepart(YEAR, o_orderdate@5) as o_year, l_extendedprice@1 * (Some(1),20,0 - l_discount@2) - ps_supplycost@4 * l_quantity@0 as amount] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_nationkey", index: 3 }, Column { name: "n_nationkey", index: 0 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "s_nationkey", index: 3 }], 2), input_partitions=2 |
+| | ProjectionExec: expr=[l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, l_discount@3 as l_discount, s_nationkey@4 as s_nationkey, ps_supplycost@5 as ps_supplycost, o_orderdate@7 as o_orderdate] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=2 |
+| | ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_quantity@3 as l_quantity, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount, s_nationkey@6 as s_nationkey, ps_supplycost@9 as ps_supplycost] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_suppkey", index: 2 }, Column { name: "ps_suppkey", index: 1 }), (Column { name: "l_partkey", index: 1 }, Column { name: "ps_partkey", index: 0 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 2 }, Column { name: "l_partkey", index: 1 }], 2), input_partitions=2 |
+| | ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_partkey@1 as l_partkey, l_suppkey@2 as l_suppkey, l_quantity@3 as l_quantity, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount, s_nationkey@7 as s_nationkey] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_suppkey", index: 2 }, Column { name: "s_suppkey", index: 0 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 2 }], 2), input_partitions=2 |
+| | ProjectionExec: expr=[l_orderkey@1 as l_orderkey, l_partkey@2 as l_partkey, l_suppkey@3 as l_suppkey, l_quantity@4 as l_quantity, l_extendedprice@5 as l_extendedprice, l_discount@6 as l_discount] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "p_partkey", index: 0 }, Column { name: "l_partkey", index: 1 })] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=2 |
+| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
+| | ProjectionExec: expr=[p_partkey@0 as p_partkey] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | FilterExec: p_name@1 LIKE %green% |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 1 }], 2), input_partitions=0 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2), input_partitions=0 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "ps_suppkey", index: 1 }, Column { name: "ps_partkey", index: 0 }], 2), input_partitions=0 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 0 }], 2), input_partitions=0 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=0 |
+| | MemoryExec: partitions=0, partition_sizes=[] |
+| | |
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
diff --git a/benchmarks/queries/q8.sql b/benchmarks/queries/q8.sql
index 6ddb2a6747..4f34dca6a0 100644
--- a/benchmarks/queries/q8.sql
+++ b/benchmarks/queries/q8.sql
@@ -1,9 +1,9 @@
select
o_year,
- sum(case
- when nation = 'BRAZIL' then volume
- else 0
- end) / sum(volume) as mkt_share
+ cast(cast(sum(case
+ when nation = 'BRAZIL' then volume
+ else 0
+ end) as decimal(12,2)) / cast(sum(volume) as decimal(12,2)) as decimal(15,2)) as mkt_share
from
(
select
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 1c50478522..36da520cb7 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -908,7 +908,6 @@ mod ci {
verify_query(5).await
}
- #[ignore] // https://github.com/apache/arrow-datafusion/issues/4024
#[tokio::test]
async fn verify_q6() -> Result<()> {
verify_query(6).await
diff --git a/datafusion/core/tests/sqllogictests/src/engines/conversion.rs b/datafusion/core/tests/sqllogictests/src/engines/conversion.rs
index 0d013c47b3..c069c2d4a4 100644
--- a/datafusion/core/tests/sqllogictests/src/engines/conversion.rs
+++ b/datafusion/core/tests/sqllogictests/src/engines/conversion.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use arrow::datatypes::{Decimal128Type, DecimalType};
use bigdecimal::BigDecimal;
use half::f16;
use rust_decimal::prelude::*;
@@ -74,9 +75,9 @@ pub fn f64_to_str(value: f64) -> String {
}
}
-pub fn i128_to_str(value: i128, scale: u32) -> String {
+pub fn i128_to_str(value: i128, precision: &u8, scale: &i8) -> String {
big_decimal_to_str(
- BigDecimal::from_str(&Decimal::from_i128_with_scale(value, scale).to_string())
+ BigDecimal::from_str(&Decimal128Type::format_decimal(value, *precision, *scale))
.unwrap(),
)
}
diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs
index 1f4855730c..20cd2331a4 100644
--- a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs
+++ b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs
@@ -200,10 +200,9 @@ pub fn cell_to_string(col: &ArrayRef, row: usize) -> Result<String> {
DataType::Float64 => {
Ok(f64_to_str(get_row_value!(array::Float64Array, col, row)))
}
- DataType::Decimal128(_, scale) => {
+ DataType::Decimal128(precision, scale) => {
let value = get_row_value!(array::Decimal128Array, col, row);
- let decimal_scale = u32::try_from((*scale).max(0)).unwrap();
- Ok(i128_to_str(value, decimal_scale))
+ Ok(i128_to_str(value, precision, scale))
}
DataType::LargeUtf8 => Ok(varchar_to_str(get_row_value!(
array::LargeStringArray,
diff --git a/datafusion/core/tests/sqllogictests/test_files/tpch.slt b/datafusion/core/tests/sqllogictests/test_files/tpch.slt
index ee9b8d92d5..619fc8e0f6 100644
--- a/datafusion/core/tests/sqllogictests/test_files/tpch.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/tpch.slt
@@ -118,6 +118,10 @@ CREATE EXTERNAL TABLE IF NOT EXISTS supplier (
# q1
+# The decimal calculation `sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge` actually causes
+# overflow on decimal multiplication. Because the kernel DataFusion uses doesn't allow precision loss, we need
+# a cast to decimal(12,2) to avoid the overflow. We can get rid of it once we can use the new multiply kernel from
+# arrow-rs, which allows precision loss.
query TTRRRRRRRI
select
l_returnflag,
@@ -125,7 +129,7 @@ select
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
- sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
+ sum(cast(l_extendedprice as decimal(12,2)) * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
diff --git a/datafusion/expr/src/type_coercion/binary.rs b/datafusion/expr/src/type_coercion/binary.rs
index 7f767d9368..4983dd25d2 100644
--- a/datafusion/expr/src/type_coercion/binary.rs
+++ b/datafusion/expr/src/type_coercion/binary.rs
@@ -17,7 +17,7 @@
//! Coercion rules for matching argument types for binary operators
-use crate::type_coercion::{is_date, is_interval, is_numeric, is_timestamp};
+use crate::type_coercion::{is_date, is_decimal, is_interval, is_numeric, is_timestamp};
use crate::Operator;
use arrow::compute::can_cast_types;
use arrow::datatypes::{
@@ -35,9 +35,40 @@ pub fn binary_operator_data_type(
op: &Operator,
rhs_type: &DataType,
) -> Result<DataType> {
- // validate that it is possible to perform the operation on incoming types.
- // (or the return datatype cannot be inferred)
- let result_type = coerce_types(lhs_type, op, rhs_type)?;
+ let result_type = if !any_decimal(lhs_type, rhs_type) {
+ // validate that it is possible to perform the operation on incoming types.
+ // (or the return datatype cannot be inferred)
+ coerce_types(lhs_type, op, rhs_type)?
+ } else {
+ let (coerced_lhs_type, coerced_rhs_type) =
+ math_decimal_coercion(lhs_type, rhs_type);
+ let lhs_type = if let Some(lhs_type) = coerced_lhs_type {
+ lhs_type
+ } else {
+ lhs_type.clone()
+ };
+ let rhs_type = if let Some(rhs_type) = coerced_rhs_type {
+ rhs_type
+ } else {
+ rhs_type.clone()
+ };
+
+ match op {
+ Operator::Plus
+ | Operator::Minus
+ | Operator::Divide
+ | Operator::Multiply
+ | Operator::Modulo => decimal_op_mathematics_type(op, &lhs_type, &rhs_type)
+ .or_else(|| coerce_types(&lhs_type, op, &rhs_type).ok())
+ .ok_or_else(|| {
+ DataFusionError::Internal(format!(
+ "Could not get return type for {:?} between {:?} and {:?}",
+ op, lhs_type, rhs_type
+ ))
+ })?,
+ _ => coerce_types(&lhs_type, op, &rhs_type)?,
+ }
+ };
match op {
// operators that return a boolean
@@ -135,7 +166,7 @@ pub fn coerce_types(
| Operator::Minus
| Operator::Modulo
| Operator::Divide
- | Operator::Multiply => mathematics_numerical_coercion(op, lhs_type, rhs_type),
+ | Operator::Multiply => mathematics_numerical_coercion(lhs_type, rhs_type),
Operator::RegexMatch
| Operator::RegexIMatch
| Operator::RegexNotMatch
@@ -155,6 +186,46 @@ pub fn coerce_types(
}
}
+/// Coercion rules for mathematics operators between decimal and non-decimal types.
+pub fn math_decimal_coercion(
+ lhs_type: &DataType,
+ rhs_type: &DataType,
+) -> (Option<DataType>, Option<DataType>) {
+ use arrow::datatypes::DataType::*;
+
+ if both_decimal(lhs_type, rhs_type) {
+ return (None, None);
+ }
+
+ match (lhs_type, rhs_type) {
+ (Null, dec_type @ Decimal128(_, _)) => (Some(dec_type.clone()), None),
+ (dec_type @ Decimal128(_, _), Null) => (None, Some(dec_type.clone())),
+ (Dictionary(key_type, value_type), _) => {
+ let (value_type, rhs_type) = math_decimal_coercion(value_type, rhs_type);
+ let lhs_type = value_type
+ .map(|value_type| Dictionary(key_type.clone(), Box::new(value_type)));
+ (lhs_type, rhs_type)
+ }
+ (_, Dictionary(key_type, value_type)) => {
+ let (lhs_type, value_type) = math_decimal_coercion(lhs_type, value_type);
+ let rhs_type = value_type
+ .map(|value_type| Dictionary(key_type.clone(), Box::new(value_type)));
+ (lhs_type, rhs_type)
+ }
+ (Decimal128(_, _), Float32 | Float64) => (Some(Float64), Some(Float64)),
+ (Float32 | Float64, Decimal128(_, _)) => (Some(Float64), Some(Float64)),
+ (Decimal128(_, _), _) => {
+ let converted_decimal_type = coerce_numeric_type_to_decimal(rhs_type);
+ (None, converted_decimal_type)
+ }
+ (_, Decimal128(_, _)) => {
+ let converted_decimal_type = coerce_numeric_type_to_decimal(lhs_type);
+ (converted_decimal_type, None)
+ }
+ _ => (None, None),
+ }
+}
+
/// Returns the output type of applying bitwise operations such as
/// `&`, `|`, or `xor`to arguments of `lhs_type` and `rhs_type`.
fn bitwise_coercion(left_type: &DataType, right_type: &DataType) -> Option<DataType> {
@@ -428,7 +499,6 @@ fn coerce_numeric_type_to_decimal(numeric_type: &DataType) -> Option<DataType> {
/// Returns the output type of applying mathematics operations such as
/// `+` to arguments of `lhs_type` and `rhs_type`.
fn mathematics_numerical_coercion(
- mathematics_op: &Operator,
lhs_type: &DataType,
rhs_type: &DataType,
) -> Option<DataType> {
@@ -452,47 +522,16 @@ fn mathematics_numerical_coercion(
// these are ordered from most informative to least informative so
// that the coercion removes the least amount of information
match (lhs_type, rhs_type) {
- (Decimal128(_, _), Decimal128(_, _)) => {
- coercion_decimal_mathematics_type(mathematics_op, lhs_type, rhs_type)
- }
- (Null, dec_type @ Decimal128(_, _)) | (dec_type @ Decimal128(_, _), Null) => {
- Some(dec_type.clone())
- }
(Dictionary(_, lhs_value_type), Dictionary(_, rhs_value_type)) => {
- mathematics_numerical_coercion(mathematics_op, lhs_value_type, rhs_value_type)
+ mathematics_numerical_coercion(lhs_value_type, rhs_value_type)
}
(Dictionary(key_type, value_type), _) => {
- let value_type =
- mathematics_numerical_coercion(mathematics_op, value_type, rhs_type);
+ let value_type = mathematics_numerical_coercion(value_type, rhs_type);
value_type
.map(|value_type| Dictionary(key_type.clone(), Box::new(value_type)))
}
(_, Dictionary(_, value_type)) => {
- mathematics_numerical_coercion(mathematics_op, lhs_type, value_type)
- }
- (Decimal128(_, _), Float32 | Float64) => Some(Float64),
- (Float32 | Float64, Decimal128(_, _)) => Some(Float64),
- (Decimal128(_, _), _) => {
- let converted_decimal_type = coerce_numeric_type_to_decimal(rhs_type);
- match converted_decimal_type {
- None => None,
- Some(right_decimal_type) => coercion_decimal_mathematics_type(
- mathematics_op,
- lhs_type,
- &right_decimal_type,
- ),
- }
- }
- (_, Decimal128(_, _)) => {
- let converted_decimal_type = coerce_numeric_type_to_decimal(lhs_type);
- match converted_decimal_type {
- None => None,
- Some(left_decimal_type) => coercion_decimal_mathematics_type(
- mathematics_op,
- &left_decimal_type,
- rhs_type,
- ),
- }
+ mathematics_numerical_coercion(lhs_type, value_type)
}
(Float64, _) | (_, Float64) => Some(Float64),
(_, Float32) | (Float32, _) => Some(Float32),
@@ -515,7 +554,38 @@ fn create_decimal_type(precision: u8, scale: i8) -> DataType {
)
}
-fn coercion_decimal_mathematics_type(
+/// Returns the coerced type of applying mathematics operations on decimal types.
+/// Both sides of the mathematics operation will be coerced to the same type. Note
+/// that we don't coerce the decimal operands in the analysis phase, but do it in the
+/// execution phase, because this coercion is not idempotent.
+pub fn coercion_decimal_mathematics_type(
+ mathematics_op: &Operator,
+ left_decimal_type: &DataType,
+ right_decimal_type: &DataType,
+) -> Option<DataType> {
+ use arrow::datatypes::DataType::*;
+ match (left_decimal_type, right_decimal_type) {
+ // The promotion rule from spark
+ // https://github.com/apache/spark/blob/c20af535803a7250fef047c2bf0fe30be242369d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala#L35
+ (Decimal128(_, _), Decimal128(_, _)) => match mathematics_op {
+ Operator::Plus | Operator::Minus => decimal_op_mathematics_type(
+ mathematics_op,
+ left_decimal_type,
+ right_decimal_type,
+ ),
+ Operator::Multiply | Operator::Divide | Operator::Modulo => {
+ get_wider_decimal_type(left_decimal_type, right_decimal_type)
+ }
+ _ => None,
+ },
+ _ => None,
+ }
+}
+
+/// Returns the output type of applying mathematics operations on decimal types.
+/// The rule is taken from Spark. Note that this differs from the coerced type applied
+/// to both sides of the arithmetic operation.
+pub fn decimal_op_mathematics_type(
mathematics_op: &Operator,
left_decimal_type: &DataType,
right_decimal_type: &DataType,
@@ -559,6 +629,27 @@ fn coercion_decimal_mathematics_type(
_ => None,
}
}
+ (Dictionary(_, lhs_value_type), Dictionary(_, rhs_value_type)) => {
+ decimal_op_mathematics_type(
+ mathematics_op,
+ lhs_value_type.as_ref(),
+ rhs_value_type.as_ref(),
+ )
+ }
+ (Dictionary(key_type, value_type), _) => {
+ let value_type = decimal_op_mathematics_type(
+ mathematics_op,
+ value_type.as_ref(),
+ right_decimal_type,
+ );
+ value_type
+ .map(|value_type| Dictionary(key_type.clone(), Box::new(value_type)))
+ }
+ (_, Dictionary(_, value_type)) => decimal_op_mathematics_type(
+ mathematics_op,
+ left_decimal_type,
+ value_type.as_ref(),
+ ),
_ => None,
}
}
@@ -582,6 +673,35 @@ fn both_numeric_or_null_and_numeric(lhs_type: &DataType, rhs_type: &DataType) ->
}
}
+/// Determine if one of lhs and rhs is decimal and the other is either decimal or NULL
+fn both_decimal(lhs_type: &DataType, rhs_type: &DataType) -> bool {
+ match (lhs_type, rhs_type) {
+ (_, DataType::Null) => is_decimal(lhs_type),
+ (DataType::Null, _) => is_decimal(rhs_type),
+ (DataType::Decimal128(_, _), DataType::Decimal128(_, _)) => true,
+ (DataType::Dictionary(_, value_type), _) => {
+ is_decimal(value_type) && is_decimal(rhs_type)
+ }
+ (_, DataType::Dictionary(_, value_type)) => {
+ is_decimal(lhs_type) && is_decimal(value_type)
+ }
+ _ => false,
+ }
+}
+
+/// Determine if at least one of lhs and rhs is decimal
+pub fn any_decimal(lhs_type: &DataType, rhs_type: &DataType) -> bool {
+ match (lhs_type, rhs_type) {
+ (DataType::Dictionary(_, value_type), _) => {
+ is_decimal(value_type) || is_decimal(rhs_type)
+ }
+ (_, DataType::Dictionary(_, value_type)) => {
+ is_decimal(lhs_type) || is_decimal(value_type)
+ }
+ (_, _) => is_decimal(lhs_type) || is_decimal(rhs_type),
+ }
+}
+
/// Coercion rules for Dictionaries: the type that both lhs and rhs
/// can be casted to for the purpose of a computation.
///
@@ -885,6 +1005,9 @@ mod tests {
&left_decimal_type,
&right_decimal_type,
);
+ assert_eq!(DataType::Decimal128(20, 4), result.unwrap());
+ let result =
+ decimal_op_mathematics_type(&op, &left_decimal_type, &right_decimal_type);
assert_eq!(DataType::Decimal128(31, 7), result.unwrap());
let op = Operator::Divide;
let result = coercion_decimal_mathematics_type(
@@ -892,6 +1015,9 @@ mod tests {
&left_decimal_type,
&right_decimal_type,
);
+ assert_eq!(DataType::Decimal128(20, 4), result.unwrap());
+ let result =
+ decimal_op_mathematics_type(&op, &left_decimal_type, &right_decimal_type);
assert_eq!(DataType::Decimal128(35, 24), result.unwrap());
let op = Operator::Modulo;
let result = coercion_decimal_mathematics_type(
@@ -899,6 +1025,9 @@ mod tests {
&left_decimal_type,
&right_decimal_type,
);
+ assert_eq!(DataType::Decimal128(20, 4), result.unwrap());
+ let result =
+ decimal_op_mathematics_type(&op, &left_decimal_type, &right_decimal_type);
assert_eq!(DataType::Decimal128(11, 4), result.unwrap());
}
@@ -1154,45 +1283,101 @@ mod tests {
Operator::Multiply,
DataType::Float64
);
- // decimal
- // bug: https://github.com/apache/arrow-datafusion/issues/3387 will be fixed in the next pr
- // test_coercion_binary_rule!(
- // DataType::Decimal128(10, 2),
- // DataType::Decimal128(10, 2),
- // Operator::Plus,
- // DataType::Decimal128(11, 2)
- // );
- test_coercion_binary_rule!(
+ // TODO add other data type
+ Ok(())
+ }
+
+ fn test_math_decimal_coercion_rule(
+ lhs_type: DataType,
+ rhs_type: DataType,
+ mathematics_op: Operator,
+ expected_lhs_type: Option<DataType>,
+ expected_rhs_type: Option<DataType>,
+ expected_coerced_type: DataType,
+ expected_output_type: DataType,
+ ) {
+ // The coerced types for lhs and rhs, if any of them is not decimal
+ let (l, r) = math_decimal_coercion(&lhs_type, &rhs_type);
+ assert_eq!(l, expected_lhs_type);
+ assert_eq!(r, expected_rhs_type);
+
+ let lhs_type = l.unwrap_or(lhs_type);
+ let rhs_type = r.unwrap_or(rhs_type);
+
+ // The coerced type of decimal math expression, applied during expression evaluation
+ let coerced_type =
+ coercion_decimal_mathematics_type(&mathematics_op, &lhs_type, &rhs_type)
+ .unwrap();
+ assert_eq!(coerced_type, expected_coerced_type);
+
+ // The output type of decimal math expression
+ let output_type =
+ decimal_op_mathematics_type(&mathematics_op, &lhs_type, &rhs_type).unwrap();
+ assert_eq!(output_type, expected_output_type);
+ }
+
+ #[test]
+ fn test_coercion_arithmetic_decimal() -> Result<()> {
+ test_math_decimal_coercion_rule(
+ DataType::Decimal128(10, 2),
+ DataType::Decimal128(10, 2),
+ Operator::Plus,
+ None,
+ None,
+ DataType::Decimal128(11, 2),
+ DataType::Decimal128(11, 2),
+ );
+
+ test_math_decimal_coercion_rule(
DataType::Int32,
DataType::Decimal128(10, 2),
Operator::Plus,
- DataType::Decimal128(13, 2)
+ Some(DataType::Decimal128(10, 0)),
+ None,
+ DataType::Decimal128(13, 2),
+ DataType::Decimal128(13, 2),
);
- test_coercion_binary_rule!(
+
+ test_math_decimal_coercion_rule(
DataType::Int32,
DataType::Decimal128(10, 2),
Operator::Minus,
- DataType::Decimal128(13, 2)
+ Some(DataType::Decimal128(10, 0)),
+ None,
+ DataType::Decimal128(13, 2),
+ DataType::Decimal128(13, 2),
);
- test_coercion_binary_rule!(
+
+ test_math_decimal_coercion_rule(
DataType::Int32,
DataType::Decimal128(10, 2),
Operator::Multiply,
- DataType::Decimal128(21, 2)
+ Some(DataType::Decimal128(10, 0)),
+ None,
+ DataType::Decimal128(12, 2),
+ DataType::Decimal128(21, 2),
);
- test_coercion_binary_rule!(
+
+ test_math_decimal_coercion_rule(
DataType::Int32,
DataType::Decimal128(10, 2),
Operator::Divide,
- DataType::Decimal128(23, 11)
+ Some(DataType::Decimal128(10, 0)),
+ None,
+ DataType::Decimal128(12, 2),
+ DataType::Decimal128(23, 11),
);
- test_coercion_binary_rule!(
+
+ test_math_decimal_coercion_rule(
DataType::Int32,
DataType::Decimal128(10, 2),
Operator::Modulo,
- DataType::Decimal128(10, 2)
+ Some(DataType::Decimal128(10, 0)),
+ None,
+ DataType::Decimal128(12, 2),
+ DataType::Decimal128(10, 2),
);
- // TODO add other data type
+
Ok(())
}
diff --git a/datafusion/expr/src/type_coercion/mod.rs b/datafusion/expr/src/type_coercion/mod.rs
index 73b4a1f93b..4a1d695dea 100644
--- a/datafusion/expr/src/type_coercion/mod.rs
+++ b/datafusion/expr/src/type_coercion/mod.rs
@@ -85,3 +85,8 @@ pub fn is_date(dt: &DataType) -> bool {
pub fn is_utf8_or_large_utf8(dt: &DataType) -> bool {
matches!(dt, DataType::Utf8 | DataType::LargeUtf8)
}
+
+/// Determine whether the given data type `dt` is a `Decimal`.
+pub fn is_decimal(dt: &DataType) -> bool {
+ matches!(dt, DataType::Decimal128(_, _))
+}
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 9fb27036aa..7a346b5b9e 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -28,7 +28,7 @@ use datafusion_expr::expr::{self, Between, BinaryExpr, Case, Like, WindowFunctio
use datafusion_expr::expr_schema::cast_subquery;
use datafusion_expr::logical_plan::Subquery;
use datafusion_expr::type_coercion::binary::{
- coerce_types, comparison_coercion, like_coercion,
+ any_decimal, coerce_types, comparison_coercion, like_coercion, math_decimal_coercion,
};
use datafusion_expr::type_coercion::functions::data_types;
use datafusion_expr::type_coercion::other::{
@@ -266,6 +266,33 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
)))
}
}
+ // For numerical operations between decimals, we don't coerce the types.
+ // But if only one of the operands is decimal, we cast the other operand to decimal
+ // if the other operand is integer. If the other operand is float, we cast the
+ // decimal operand to float.
+ (lhs_type, rhs_type)
+ if op.is_numerical_operators()
+ && any_decimal(lhs_type, rhs_type) =>
+ {
+ let (coerced_lhs_type, coerced_rhs_type) =
+ math_decimal_coercion(lhs_type, rhs_type);
+ let new_left = if let Some(lhs_type) = coerced_lhs_type {
+ left.clone().cast_to(&lhs_type, &self.schema)?
+ } else {
+ left.as_ref().clone()
+ };
+ let new_right = if let Some(rhs_type) = coerced_rhs_type {
+ right.clone().cast_to(&rhs_type, &self.schema)?
+ } else {
+ right.as_ref().clone()
+ };
+ let expr = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(new_left),
+ op,
+ Box::new(new_right),
+ ));
+ Ok(expr)
+ }
_ => {
let coerced_type = coerce_types(&left_type, &op, &right_type)?;
let expr = Expr::BinaryExpr(BinaryExpr::new(
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 5ef4342435..b98a2dc420 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -50,7 +50,7 @@ use arrow::compute::kernels::comparison::{
eq_dyn_utf8_scalar, gt_dyn_utf8_scalar, gt_eq_dyn_utf8_scalar, lt_dyn_utf8_scalar,
lt_eq_dyn_utf8_scalar, neq_dyn_utf8_scalar,
};
-use arrow::compute::{try_unary, unary};
+use arrow::compute::{try_unary, unary, CastOptions};
use arrow::datatypes::*;
use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn};
@@ -61,7 +61,7 @@ use datafusion_common::scalar::{
op_ym_dt, op_ym_mdn, parse_timezones, seconds_add, seconds_sub, MILLISECOND_MODE,
NANOSECOND_MODE,
};
-use datafusion_expr::type_coercion::{is_timestamp, is_utf8_or_large_utf8};
+use datafusion_expr::type_coercion::{is_decimal, is_timestamp, is_utf8_or_large_utf8};
use kernels::{
bitwise_and, bitwise_and_scalar, bitwise_or, bitwise_or_scalar, bitwise_shift_left,
bitwise_shift_left_scalar, bitwise_shift_right, bitwise_shift_right_scalar,
@@ -82,6 +82,7 @@ use arrow::datatypes::{DataType, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use super::column::Column;
+use crate::expressions::cast_column;
use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
use crate::intervals::{apply_operator, Interval};
use crate::physical_expr::down_cast_any_ref;
@@ -95,7 +96,9 @@ use datafusion_common::cast::{
use datafusion_common::scalar::*;
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
-use datafusion_expr::type_coercion::binary::binary_operator_data_type;
+use datafusion_expr::type_coercion::binary::{
+ binary_operator_data_type, coercion_decimal_mathematics_type,
+};
use datafusion_expr::{ColumnarValue, Operator};
/// Binary expression
@@ -383,12 +386,14 @@ macro_rules! compute_primitive_op_dyn_scalar {
/// LEFT is Decimal or Dictionary array of decimal values, RIGHT is scalar value
/// OP_TYPE is the return type of scalar function
macro_rules! compute_primitive_decimal_op_dyn_scalar {
- ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{
+ ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr, $RET_TYPE:expr) => {{
// generate the scalar function name, such as add_decimal_dyn_scalar,
// from the $OP parameter (which could have a value of add) and the
// suffix _decimal_dyn_scalar
if let Some(value) = $RIGHT {
- Ok(paste::expr! {[<$OP _decimal_dyn_scalar>]}($LEFT, value)?)
+ Ok(paste::expr! {[<$OP _decimal_dyn_scalar>]}(
+ $LEFT, value, $RET_TYPE,
+ )?)
} else {
// when the $RIGHT is a NULL, generate a NULL array of $OP_TYPE
Ok(Arc::new(new_null_array($OP_TYPE, $LEFT.len())))
@@ -437,15 +442,15 @@ macro_rules! binary_string_array_op {
/// The binary_primitive_array_op macro only evaluates for primitive types
/// like integers and floats.
macro_rules! binary_primitive_array_op_dyn {
- ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
+ ($LEFT:expr, $RIGHT:expr, $OP:ident, $RET_TYPE:expr) => {{
match $LEFT.data_type() {
DataType::Decimal128(_, _) => {
- Ok(paste::expr! {[<$OP _decimal>]}(&$LEFT, &$RIGHT)?)
+ Ok(paste::expr! {[<$OP _decimal>]}(&$LEFT, &$RIGHT, $RET_TYPE)?)
}
DataType::Dictionary(_, value_type)
if matches!(value_type.as_ref(), &DataType::Decimal128(_, _)) =>
{
- Ok(paste::expr! {[<$OP _decimal>]}(&$LEFT, &$RIGHT)?)
+ Ok(paste::expr! {[<$OP _decimal>]}(&$LEFT, &$RIGHT, $RET_TYPE)?)
}
_ => Ok(Arc::new(
$OP(&$LEFT, &$RIGHT).map_err(|err| DataFusionError::ArrowError(err))?,
@@ -458,13 +463,13 @@ macro_rules! binary_primitive_array_op_dyn {
/// The binary_primitive_array_op_dyn_scalar macro only evaluates for primitive
/// types like integers and floats.
macro_rules! binary_primitive_array_op_dyn_scalar {
- ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
+ ($LEFT:expr, $RIGHT:expr, $OP:ident, $RET_TYPE:expr) => {{
// unwrap underlying (non dictionary) value
let right = unwrap_dict_value($RIGHT);
let op_type = $LEFT.data_type();
let result: Result<Arc<dyn Array>> = match right {
- ScalarValue::Decimal128(v, _, _) => compute_primitive_decimal_op_dyn_scalar!($LEFT, v, $OP, op_type),
+ ScalarValue::Decimal128(v, _, _) => compute_primitive_decimal_op_dyn_scalar!($LEFT, v, $OP, op_type, $RET_TYPE),
ScalarValue::Int8(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, Int8Type),
ScalarValue::Int16(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, Int16Type),
ScalarValue::Int32(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, Int32Type),
@@ -662,9 +667,13 @@ impl PhysicalExpr for BinaryExpr {
let left_data_type = left_value.data_type();
let right_data_type = right_value.data_type();
+ let schema = batch.schema();
+ let input_schema = schema.as_ref();
+
match (&left_value, &left_data_type, &right_value, &right_data_type) {
// Types are equal => valid
- (_, l, _, r) if l == r => {}
+ // Decimal types can be different but still valid as we coerce them to the same type later
+ (_, l, _, r) if l == r || (is_decimal(l) && is_decimal(r)) => {}
// Allow comparing a dictionary value with its corresponding scalar value
(
ColumnarValue::Array(_),
@@ -677,7 +686,8 @@ impl PhysicalExpr for BinaryExpr {
scalar_t,
ColumnarValue::Array(_),
DataType::Dictionary(_, dict_t),
- ) if dict_t.as_ref() == scalar_t => {}
+ ) if dict_t.as_ref() == scalar_t
+ || (is_decimal(dict_t.as_ref()) && is_decimal(scalar_t)) => {}
_ => {
return Err(DataFusionError::Internal(format!(
"Cannot evaluate binary expression {:?} with types {:?} and {:?}",
@@ -686,11 +696,29 @@ impl PhysicalExpr for BinaryExpr {
}
}
+ // Coerce decimal types to the same scale and precision
+ let coerced_type = coercion_decimal_mathematics_type(
+ &self.op,
+ &left_data_type,
+ &right_data_type,
+ );
+ let (left_value, right_value) = if let Some(coerced_type) = coerced_type {
+ let options = CastOptions { safe: true };
+ let left_value = cast_column(&left_value, &coerced_type, &options)?;
+ let right_value = cast_column(&right_value, &coerced_type, &options)?;
+ (left_value, right_value)
+ } else {
+ // No need to coerce if it is not decimal or not math operation
+ (left_value, right_value)
+ };
+
+ let result_type = self.data_type(input_schema)?;
+
// Attempt to use special kernels if one input is scalar and the other is an array
let scalar_result = match (&left_value, &right_value) {
(ColumnarValue::Array(array), ColumnarValue::Scalar(scalar)) => {
// if left is array and right is literal - use scalar operations
- self.evaluate_array_scalar(array, scalar.clone())?
+ self.evaluate_array_scalar(array, scalar.clone(), &result_type)?
}
(ColumnarValue::Scalar(scalar), ColumnarValue::Array(array)) => {
// if right is literal and left is array - reverse operator and parameters
@@ -708,8 +736,14 @@ impl PhysicalExpr for BinaryExpr {
left_value.into_array(batch.num_rows()),
right_value.into_array(batch.num_rows()),
);
- self.evaluate_with_resolved_args(left, &left_data_type, right, &right_data_type)
- .map(|a| ColumnarValue::Array(a))
+ self.evaluate_with_resolved_args(
+ left,
+ &left_data_type,
+ right,
+ &right_data_type,
+ &result_type,
+ )
+ .map(|a| ColumnarValue::Array(a))
}
fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
@@ -1025,6 +1059,7 @@ impl BinaryExpr {
&self,
array: &dyn Array,
scalar: ScalarValue,
+ result_type: &DataType,
) -> Result<Option<Result<ArrayRef>>> {
let bool_type = &DataType::Boolean;
let scalar_result = match &self.op {
@@ -1047,19 +1082,29 @@ impl BinaryExpr {
binary_array_op_dyn_scalar!(array, scalar, neq, bool_type)
}
Operator::Plus => {
- binary_primitive_array_op_dyn_scalar!(array, scalar, add)
+ binary_primitive_array_op_dyn_scalar!(array, scalar, add, result_type)
}
Operator::Minus => {
- binary_primitive_array_op_dyn_scalar!(array, scalar, subtract)
+ binary_primitive_array_op_dyn_scalar!(
+ array,
+ scalar,
+ subtract,
+ result_type
+ )
}
Operator::Multiply => {
- binary_primitive_array_op_dyn_scalar!(array, scalar, multiply)
+ binary_primitive_array_op_dyn_scalar!(
+ array,
+ scalar,
+ multiply,
+ result_type
+ )
}
Operator::Divide => {
- binary_primitive_array_op_dyn_scalar!(array, scalar, divide)
+ binary_primitive_array_op_dyn_scalar!(array, scalar, divide, result_type)
}
Operator::Modulo => {
- binary_primitive_array_op_dyn_scalar!(array, scalar, modulus)
+ binary_primitive_array_op_dyn_scalar!(array, scalar, modulus, result_type)
}
Operator::RegexMatch => binary_string_array_flag_op_scalar!(
array,
@@ -1140,6 +1185,7 @@ impl BinaryExpr {
left_data_type: &DataType,
right: Arc<dyn Array>,
right_data_type: &DataType,
+ result_type: &DataType,
) -> Result<ArrayRef> {
match &self.op {
Operator::Lt => lt_dyn(&left, &right),
@@ -1161,16 +1207,20 @@ impl BinaryExpr {
Operator::IsNotDistinctFrom => {
binary_array_op!(left, right, is_not_distinct_from)
}
- Operator::Plus => binary_primitive_array_op_dyn!(left, right, add_dyn),
- Operator::Minus => binary_primitive_array_op_dyn!(left, right, subtract_dyn),
+ Operator::Plus => {
+ binary_primitive_array_op_dyn!(left, right, add_dyn, result_type)
+ }
+ Operator::Minus => {
+ binary_primitive_array_op_dyn!(left, right, subtract_dyn, result_type)
+ }
Operator::Multiply => {
- binary_primitive_array_op_dyn!(left, right, multiply_dyn)
+ binary_primitive_array_op_dyn!(left, right, multiply_dyn, result_type)
}
Operator::Divide => {
- binary_primitive_array_op_dyn!(left, right, divide_dyn_opt)
+ binary_primitive_array_op_dyn!(left, right, divide_dyn_opt, result_type)
}
Operator::Modulo => {
- binary_primitive_array_op_dyn!(left, right, modulus_dyn)
+ binary_primitive_array_op_dyn!(left, right, modulus_dyn, result_type)
}
Operator::And => {
if left_data_type == &DataType::Boolean {
@@ -1236,7 +1286,7 @@ pub fn binary(
"The type of {lhs_type} {op:?} {rhs_type} of binary physical should be same"
)));
}
- if !lhs_type.eq(rhs_type) {
+ if !lhs_type.eq(rhs_type) && (!is_decimal(lhs_type) && !is_decimal(rhs_type)) {
return Err(DataFusionError::Internal(format!(
"The type of {lhs_type} {op:?} {rhs_type} of binary physical should be same"
)));
@@ -2050,7 +2100,7 @@ mod tests {
ArrowNumericType, Decimal128Type, Field, Int32Type, SchemaRef,
};
use datafusion_common::{ColumnStatistics, Result, Statistics};
- use datafusion_expr::type_coercion::binary::coerce_types;
+ use datafusion_expr::type_coercion::binary::{coerce_types, math_decimal_coercion};
// Create a binary expression without coercion. Used here when we do not want to coerce the expressions
// to valid types. Usage can result in an execution (after plan) error.
@@ -2606,7 +2656,7 @@ mod tests {
Arc::new(schema),
vec![Arc::new(a), Arc::new(b)],
Operator::Plus,
- create_decimal_array(&[Some(247), None, None, Some(247), Some(246)], 10, 0),
+ create_decimal_array(&[Some(247), None, None, Some(247), Some(246)], 11, 0),
)?;
Ok(())
@@ -2691,7 +2741,7 @@ mod tests {
let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
let decimal_array = create_decimal_array(
&[Some(value + 1), None, Some(value), Some(value + 2)],
- 10,
+ 11,
0,
);
let expected = DictionaryArray::try_new(&keys, &decimal_array)?;
@@ -2825,7 +2875,7 @@ mod tests {
Arc::new(schema),
vec![Arc::new(a), Arc::new(b)],
Operator::Minus,
- create_decimal_array(&[Some(-1), None, None, Some(1), Some(0)], 10, 0),
+ create_decimal_array(&[Some(-1), None, None, Some(1), Some(0)], 11, 0),
)?;
Ok(())
@@ -2910,7 +2960,7 @@ mod tests {
let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
let decimal_array = create_decimal_array(
&[Some(value - 1), None, Some(value - 2), Some(value)],
- 10,
+ 11,
0,
);
let expected = DictionaryArray::try_new(&keys, &decimal_array)?;
@@ -3038,7 +3088,7 @@ mod tests {
Operator::Multiply,
create_decimal_array(
&[Some(15252), None, None, Some(15252), Some(15129)],
- 10,
+ 21,
0,
),
)?;
@@ -3124,7 +3174,7 @@ mod tests {
let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
let decimal_array =
- create_decimal_array(&[Some(246), None, Some(244), Some(248)], 10, 0);
+ create_decimal_array(&[Some(246), None, Some(244), Some(248)], 21, 0);
let expected = DictionaryArray::try_new(&keys, &decimal_array)?;
apply_arithmetic_scalar(
@@ -3254,7 +3304,17 @@ mod tests {
Arc::new(schema),
vec![Arc::new(a), Arc::new(b)],
Operator::Divide,
- create_decimal_array(&[Some(0), None, None, Some(1), Some(1)], 10, 0),
+ create_decimal_array(
+ &[
+ Some(99193548387), // 0.99193548387
+ None,
+ None,
+ Some(100813008130), // 1.0081300813
+ Some(100000000000), // 1.0
+ ],
+ 21,
+ 11,
+ ),
)?;
Ok(())
@@ -3337,8 +3397,16 @@ mod tests {
let a = DictionaryArray::try_new(&keys, &decimal_array)?;
let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
- let decimal_array =
- create_decimal_array(&[Some(61), None, Some(61), Some(62)], 10, 0);
+ let decimal_array = create_decimal_array(
+ &[
+ Some(6150000000000),
+ None,
+ Some(6100000000000),
+ Some(6200000000000),
+ ],
+ 21,
+ 11,
+ );
let expected = DictionaryArray::try_new(&keys, &decimal_array)?;
apply_arithmetic_scalar(
@@ -4715,38 +4783,47 @@ mod tests {
Ok(())
}
- fn apply_arithmetic_op(
+ fn apply_decimal_arithmetic_op(
schema: &SchemaRef,
left: &ArrayRef,
right: &ArrayRef,
op: Operator,
expected: ArrayRef,
) -> Result<()> {
- let op_type = coerce_types(left.data_type(), &op, right.data_type())?;
- let left_expr = if left.data_type().eq(&op_type) {
- col("a", schema)?
+ let (lhs_op_type, rhs_op_type) =
+ math_decimal_coercion(left.data_type(), right.data_type());
+
+ let (left_expr, lhs_type) = if let Some(lhs_op_type) = lhs_op_type {
+ (
+ try_cast(col("a", schema)?, schema, lhs_op_type.clone())?,
+ lhs_op_type,
+ )
} else {
- try_cast(col("a", schema)?, schema, op_type.clone())?
+ (col("a", schema)?, left.data_type().clone())
};
- let right_expr = if right.data_type().eq(&op_type) {
- col("b", schema)?
+ let (right_expr, rhs_type) = if let Some(rhs_op_type) = rhs_op_type {
+ (
+ try_cast(col("b", schema)?, schema, rhs_op_type.clone())?,
+ rhs_op_type,
+ )
} else {
- try_cast(col("b", schema)?, schema, op_type.clone())?
+ (col("b", schema)?, right.data_type().clone())
};
let coerced_schema = Schema::new(vec![
Field::new(
schema.field(0).name(),
- op_type.clone(),
+ lhs_type,
schema.field(0).is_nullable(),
),
Field::new(
schema.field(1).name(),
- op_type,
+ rhs_type,
schema.field(1).is_nullable(),
),
]);
+
let arithmetic_op = binary_simple(left_expr, op, right_expr, &coerced_schema);
let data: Vec<ArrayRef> = vec![left.clone(), right.clone()];
let batch = RecordBatch::try_new(schema.clone(), data)?;
@@ -4786,7 +4863,7 @@ mod tests {
13,
2,
)) as ArrayRef;
- apply_arithmetic_op(
+ apply_decimal_arithmetic_op(
&schema,
&int32_array,
&decimal_array,
@@ -4797,18 +4874,18 @@ mod tests {
// subtract: decimal array subtract int32 array
let schema = Arc::new(Schema::new(vec![
- Field::new("b", DataType::Int32, true),
Field::new("a", DataType::Decimal128(10, 2), true),
+ Field::new("b", DataType::Int32, true),
]));
let expect = Arc::new(create_decimal_array(
&[Some(-12177), None, Some(-12178), Some(-12276)],
13,
2,
)) as ArrayRef;
- apply_arithmetic_op(
+ apply_decimal_arithmetic_op(
&schema,
- &int32_array,
&decimal_array,
+ &int32_array,
Operator::Minus,
expect,
)
@@ -4820,10 +4897,10 @@ mod tests {
21,
2,
)) as ArrayRef;
- apply_arithmetic_op(
+ apply_decimal_arithmetic_op(
&schema,
- &int32_array,
&decimal_array,
+ &int32_array,
Operator::Multiply,
expect,
)
@@ -4844,7 +4921,7 @@ mod tests {
23,
11,
)) as ArrayRef;
- apply_arithmetic_op(
+ apply_decimal_arithmetic_op(
&schema,
&int32_array,
&decimal_array,
@@ -4863,7 +4940,7 @@ mod tests {
10,
2,
)) as ArrayRef;
- apply_arithmetic_op(
+ apply_decimal_arithmetic_op(
&schema,
&int32_array,
&decimal_array,
@@ -4906,7 +4983,7 @@ mod tests {
Some(124.22),
Some(125.24),
])) as ArrayRef;
- apply_arithmetic_op(
+ apply_decimal_arithmetic_op(
&schema,
&float64_array,
&decimal_array,
@@ -4926,7 +5003,7 @@ mod tests {
Some(121.78),
Some(122.76),
])) as ArrayRef;
- apply_arithmetic_op(
+ apply_decimal_arithmetic_op(
&schema,
&float64_array,
&decimal_array,
@@ -4942,7 +5019,7 @@ mod tests {
Some(150.06),
Some(153.76),
])) as ArrayRef;
- apply_arithmetic_op(
+ apply_decimal_arithmetic_op(
&schema,
&float64_array,
&decimal_array,
@@ -4962,7 +5039,7 @@ mod tests {
Some(100.81967213114754),
Some(100.0),
])) as ArrayRef;
- apply_arithmetic_op(
+ apply_decimal_arithmetic_op(
&schema,
&float64_array,
&decimal_array,
@@ -4982,7 +5059,7 @@ mod tests {
Some(1.0000000000000027),
Some(8.881784197001252e-16),
])) as ArrayRef;
- apply_arithmetic_op(
+ apply_decimal_arithmetic_op(
&schema,
&float64_array,
&decimal_array,
@@ -5025,7 +5102,11 @@ mod tests {
schema,
vec![left_decimal_array, right_decimal_array],
Operator::Divide,
- create_decimal_array(&[Some(123456700), None], 25, 3),
+ create_decimal_array(
+ &[Some(12345670000000000000000000000000000), None],
+ 38,
+ 29,
+ ),
)?;
Ok(())
diff --git a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
index 836ea93450..8998738ba5 100644
--- a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
+++ b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
@@ -28,6 +28,8 @@ use arrow::{array::*, datatypes::ArrowNumericType, downcast_dictionary_array};
use arrow_schema::DataType;
use datafusion_common::cast::as_decimal128_array;
use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::type_coercion::binary::decimal_op_mathematics_type;
+use datafusion_expr::Operator;
use std::sync::Arc;
// Simple (low performance) kernels until optimized kernels are added to arrow
@@ -257,14 +259,22 @@ pub(crate) fn is_not_distinct_from_decimal(
.collect())
}
-pub(crate) fn add_dyn_decimal(left: &dyn Array, right: &dyn Array) -> Result<ArrayRef> {
- let (precision, scale) = get_precision_scale(left)?;
+pub(crate) fn add_dyn_decimal(
+ left: &dyn Array,
+ right: &dyn Array,
+ result_type: &DataType,
+) -> Result<ArrayRef> {
+ let (precision, scale) = get_precision_scale(result_type)?;
let array = add_dyn(left, right)?;
decimal_array_with_precision_scale(array, precision, scale)
}
-pub(crate) fn add_decimal_dyn_scalar(left: &dyn Array, right: i128) -> Result<ArrayRef> {
- let (precision, scale) = get_precision_scale(left)?;
+pub(crate) fn add_decimal_dyn_scalar(
+ left: &dyn Array,
+ right: i128,
+ result_type: &DataType,
+) -> Result<ArrayRef> {
+ let (precision, scale) = get_precision_scale(result_type)?;
let array = add_scalar_dyn::<Decimal128Type>(left, right)?;
decimal_array_with_precision_scale(array, precision, scale)
@@ -273,15 +283,16 @@ pub(crate) fn add_decimal_dyn_scalar(left: &dyn Array, right: i128) -> Result<Ar
pub(crate) fn subtract_decimal_dyn_scalar(
left: &dyn Array,
right: i128,
+ result_type: &DataType,
) -> Result<ArrayRef> {
- let (precision, scale) = get_precision_scale(left)?;
+ let (precision, scale) = get_precision_scale(result_type)?;
let array = subtract_scalar_dyn::<Decimal128Type>(left, right)?;
decimal_array_with_precision_scale(array, precision, scale)
}
-fn get_precision_scale(left: &dyn Array) -> Result<(u8, i8)> {
- match left.data_type() {
+fn get_precision_scale(data_type: &DataType) -> Result<(u8, i8)> {
+ match data_type {
DataType::Decimal128(precision, scale) => Ok((*precision, *scale)),
DataType::Dictionary(_, value_type) => match value_type.as_ref() {
DataType::Decimal128(precision, scale) => Ok((*precision, *scale)),
@@ -333,22 +344,35 @@ fn decimal_array_with_precision_scale(
pub(crate) fn multiply_decimal_dyn_scalar(
left: &dyn Array,
right: i128,
+ result_type: &DataType,
) -> Result<ArrayRef> {
- let (precision, scale) = get_precision_scale(left)?;
+ let (precision, scale) = get_precision_scale(result_type)?;
- let array = multiply_scalar_dyn::<Decimal128Type>(left, right)?;
+ let op_type = decimal_op_mathematics_type(
+ &Operator::Multiply,
+ left.data_type(),
+ left.data_type(),
+ )
+ .unwrap();
+ let (_, op_scale) = get_precision_scale(&op_type)?;
- let divide = 10_i128.pow(scale as u32);
- let array = divide_scalar_dyn::<Decimal128Type>(&array, divide)?;
+ let array = multiply_scalar_dyn::<Decimal128Type>(left, right)?;
- decimal_array_with_precision_scale(array, precision, scale)
+ if op_scale > scale {
+ let div = 10_i128.pow((op_scale - scale) as u32);
+ let array = divide_scalar_dyn::<Decimal128Type>(&array, div)?;
+ decimal_array_with_precision_scale(array, precision, scale)
+ } else {
+ decimal_array_with_precision_scale(array, precision, scale)
+ }
}
pub(crate) fn divide_decimal_dyn_scalar(
left: &dyn Array,
right: i128,
+ result_type: &DataType,
) -> Result<ArrayRef> {
- let (precision, scale) = get_precision_scale(left)?;
+ let (precision, scale) = get_precision_scale(result_type)?;
let mul = 10_i128.pow(scale as u32);
let array = multiply_scalar_dyn::<Decimal128Type>(left, mul)?;
@@ -360,8 +384,9 @@ pub(crate) fn divide_decimal_dyn_scalar(
pub(crate) fn subtract_dyn_decimal(
left: &dyn Array,
right: &dyn Array,
+ result_type: &DataType,
) -> Result<ArrayRef> {
- let (precision, scale) = get_precision_scale(left)?;
+ let (precision, scale) = get_precision_scale(result_type)?;
let array = subtract_dyn(left, right)?;
decimal_array_with_precision_scale(array, precision, scale)
}
@@ -369,24 +394,41 @@ pub(crate) fn subtract_dyn_decimal(
pub(crate) fn multiply_dyn_decimal(
left: &dyn Array,
right: &dyn Array,
+ result_type: &DataType,
) -> Result<ArrayRef> {
- let (precision, scale) = get_precision_scale(left)?;
+ let (precision, scale) = get_precision_scale(result_type)?;
+
+ let op_type = decimal_op_mathematics_type(
+ &Operator::Multiply,
+ left.data_type(),
+ left.data_type(),
+ )
+ .unwrap();
+ let (_, op_scale) = get_precision_scale(&op_type)?;
- let divide = 10_i128.pow(scale as u32);
let array = multiply_dyn(left, right)?;
- let array = divide_scalar_dyn::<Decimal128Type>(&array, divide)?;
- decimal_array_with_precision_scale(array, precision, scale)
+ if op_scale > scale {
+ let div = 10_i128.pow((op_scale - scale) as u32);
+ let array = divide_scalar_dyn::<Decimal128Type>(&array, div)?;
+ decimal_array_with_precision_scale(array, precision, scale)
+ } else {
+ decimal_array_with_precision_scale(array, precision, scale)
+ }
}
pub(crate) fn divide_dyn_opt_decimal(
left: &dyn Array,
right: &dyn Array,
+ result_type: &DataType,
) -> Result<ArrayRef> {
- let (precision, scale) = get_precision_scale(left)?;
+ let (precision, scale) = get_precision_scale(result_type)?;
let mul = 10_i128.pow(scale as u32);
let array = multiply_scalar_dyn::<Decimal128Type>(left, mul)?;
- let array = decimal_array_with_precision_scale(array, precision, scale)?;
+
+ // Restore to original precision and scale (metadata only)
+ let (org_precision, org_scale) = get_precision_scale(right.data_type())?;
+ let array = decimal_array_with_precision_scale(array, org_precision, org_scale)?;
let array = divide_dyn_opt(&array, right)?;
decimal_array_with_precision_scale(array, precision, scale)
}
@@ -394,8 +436,9 @@ pub(crate) fn divide_dyn_opt_decimal(
pub(crate) fn modulus_dyn_decimal(
left: &dyn Array,
right: &dyn Array,
+ result_type: &DataType,
) -> Result<ArrayRef> {
- let (precision, scale) = get_precision_scale(left)?;
+ let (precision, scale) = get_precision_scale(result_type)?;
let array = modulus_dyn(left, right)?;
decimal_array_with_precision_scale(array, precision, scale)
}
@@ -403,8 +446,9 @@ pub(crate) fn modulus_dyn_decimal(
pub(crate) fn modulus_decimal_dyn_scalar(
left: &dyn Array,
right: i128,
+ result_type: &DataType,
) -> Result<ArrayRef> {
- let (precision, scale) = get_precision_scale(left)?;
+ let (precision, scale) = get_precision_scale(result_type)?;
let array = modulus_scalar_dyn::<Decimal128Type>(left, right)?;
decimal_array_with_precision_scale(array, precision, scale)
@@ -503,36 +547,71 @@ mod tests {
3,
);
// add
- let result = add_dyn_decimal(&left_decimal_array, &right_decimal_array)?;
+ let result_type = decimal_op_mathematics_type(
+ &Operator::Plus,
+ left_decimal_array.data_type(),
+ right_decimal_array.data_type(),
+ )
+ .unwrap();
+ let result =
+ add_dyn_decimal(&left_decimal_array, &right_decimal_array, &result_type)?;
let result = as_decimal128_array(&result)?;
let expect =
- create_decimal_array(&[Some(246), None, Some(245), Some(247)], 25, 3);
+ create_decimal_array(&[Some(246), None, Some(245), Some(247)], 26, 3);
assert_eq!(&expect, result);
- let result = add_decimal_dyn_scalar(&left_decimal_array, 10)?;
+ let result = add_decimal_dyn_scalar(&left_decimal_array, 10, &result_type)?;
let result = as_decimal128_array(&result)?;
let expect =
- create_decimal_array(&[Some(133), None, Some(132), Some(134)], 25, 3);
+ create_decimal_array(&[Some(133), None, Some(132), Some(134)], 26, 3);
assert_eq!(&expect, result);
// subtract
- let result = subtract_dyn_decimal(&left_decimal_array, &right_decimal_array)?;
+ let result_type = decimal_op_mathematics_type(
+ &Operator::Minus,
+ left_decimal_array.data_type(),
+ right_decimal_array.data_type(),
+ )
+ .unwrap();
+ let result = subtract_dyn_decimal(
+ &left_decimal_array,
+ &right_decimal_array,
+ &result_type,
+ )?;
let result = as_decimal128_array(&result)?;
- let expect = create_decimal_array(&[Some(0), None, Some(-1), Some(1)], 25, 3);
+ let expect = create_decimal_array(&[Some(0), None, Some(-1), Some(1)], 26, 3);
assert_eq!(&expect, result);
- let result = subtract_decimal_dyn_scalar(&left_decimal_array, 10)?;
+ let result = subtract_decimal_dyn_scalar(&left_decimal_array, 10, &result_type)?;
let result = as_decimal128_array(&result)?;
let expect =
- create_decimal_array(&[Some(113), None, Some(112), Some(114)], 25, 3);
+ create_decimal_array(&[Some(113), None, Some(112), Some(114)], 26, 3);
assert_eq!(&expect, result);
// multiply
- let result = multiply_dyn_decimal(&left_decimal_array, &right_decimal_array)?;
+ let result_type = decimal_op_mathematics_type(
+ &Operator::Multiply,
+ left_decimal_array.data_type(),
+ right_decimal_array.data_type(),
+ )
+ .unwrap();
+ let result = multiply_dyn_decimal(
+ &left_decimal_array,
+ &right_decimal_array,
+ &result_type,
+ )?;
let result = as_decimal128_array(&result)?;
- let expect = create_decimal_array(&[Some(15), None, Some(15), Some(15)], 25, 3);
+ let expect =
+ create_decimal_array(&[Some(15129), None, Some(15006), Some(15252)], 38, 6);
assert_eq!(&expect, result);
- let result = multiply_decimal_dyn_scalar(&left_decimal_array, 10)?;
+ let result = multiply_decimal_dyn_scalar(&left_decimal_array, 10, &result_type)?;
let result = as_decimal128_array(&result)?;
- let expect = create_decimal_array(&[Some(1), None, Some(1), Some(1)], 25, 3);
+ let expect =
+ create_decimal_array(&[Some(1230), None, Some(1220), Some(1240)], 38, 6);
assert_eq!(&expect, result);
// divide
+ let result_type = decimal_op_mathematics_type(
+ &Operator::Divide,
+ left_decimal_array.data_type(),
+ right_decimal_array.data_type(),
+ )
+ .unwrap();
let left_decimal_array = create_decimal_array(
&[
Some(1234567),
@@ -549,34 +628,52 @@ mod tests {
25,
3,
);
- let result = divide_dyn_opt_decimal(&left_decimal_array, &right_decimal_array)?;
+ let result = divide_dyn_opt_decimal(
+ &left_decimal_array,
+ &right_decimal_array,
+ &result_type,
+ )?;
let result = as_decimal128_array(&result)?;
let expect = create_decimal_array(
- &[Some(123456700), None, Some(22446672), Some(-10037130), None],
- 25,
- 3,
+ &[
+ Some(12345670000000000000000000000000000),
+ None,
+ Some(2244667272727272727272727272727272),
+ Some(-1003713008130081300813008130081300),
+ None,
+ ],
+ 38,
+ 29,
);
assert_eq!(&expect, result);
- let result = divide_decimal_dyn_scalar(&left_decimal_array, 10)?;
+ let result = divide_decimal_dyn_scalar(&left_decimal_array, 10, &result_type)?;
let result = as_decimal128_array(&result)?;
let expect = create_decimal_array(
&[
- Some(123456700),
+ Some(12345670000000000000000000000000000),
None,
- Some(123456700),
- Some(123456700),
- Some(123456700),
+ Some(12345670000000000000000000000000000),
+ Some(12345670000000000000000000000000000),
+ Some(12345670000000000000000000000000000),
],
- 25,
- 3,
+ 38,
+ 29,
);
assert_eq!(&expect, result);
- let result = modulus_dyn_decimal(&left_decimal_array, &right_decimal_array)?;
+ // modulus
+ let result_type = decimal_op_mathematics_type(
+ &Operator::Modulo,
+ left_decimal_array.data_type(),
+ right_decimal_array.data_type(),
+ )
+ .unwrap();
+ let result =
+ modulus_dyn_decimal(&left_decimal_array, &right_decimal_array, &result_type)?;
let result = as_decimal128_array(&result)?;
let expect =
create_decimal_array(&[Some(7), None, Some(37), Some(16), None], 25, 3);
assert_eq!(&expect, result);
- let result = modulus_decimal_dyn_scalar(&left_decimal_array, 10)?;
+ let result = modulus_decimal_dyn_scalar(&left_decimal_array, 10, &result_type)?;
let result = as_decimal128_array(&result)?;
let expect =
create_decimal_array(&[Some(7), None, Some(7), Some(7), Some(7)], 25, 3);
@@ -590,12 +687,27 @@ mod tests {
let left_decimal_array = create_decimal_array(&[Some(101)], 10, 1);
let right_decimal_array = create_decimal_array(&[Some(0)], 1, 1);
- let err = divide_decimal_dyn_scalar(&left_decimal_array, 0).unwrap_err();
+ let result_type = decimal_op_mathematics_type(
+ &Operator::Divide,
+ left_decimal_array.data_type(),
+ right_decimal_array.data_type(),
+ )
+ .unwrap();
+ let err =
+ divide_decimal_dyn_scalar(&left_decimal_array, 0, &result_type).unwrap_err();
assert_eq!("Arrow error: Divide by zero error", err.to_string());
+ let result_type = decimal_op_mathematics_type(
+ &Operator::Modulo,
+ left_decimal_array.data_type(),
+ right_decimal_array.data_type(),
+ )
+ .unwrap();
let err =
- modulus_dyn_decimal(&left_decimal_array, &right_decimal_array).unwrap_err();
+ modulus_dyn_decimal(&left_decimal_array, &right_decimal_array, &result_type)
+ .unwrap_err();
assert_eq!("Arrow error: Divide by zero error", err.to_string());
- let err = modulus_decimal_dyn_scalar(&left_decimal_array, 0).unwrap_err();
+ let err =
+ modulus_decimal_dyn_scalar(&left_decimal_array, 0, &result_type).unwrap_err();
assert_eq!("Arrow error: Divide by zero error", err.to_string());
}