You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/04/20 07:20:49 UTC

[arrow-datafusion] branch main updated: Decimal multiply kernel should not cause precision loss (#5980)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e81f54b278 Decimal multiply kernel should not cause precision loss (#5980)
e81f54b278 is described below

commit e81f54b2786a8ebdbf6dfc1f47ddbed85338be55
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Thu Apr 20 00:20:43 2023 -0700

    Decimal multiply kernel should not cause precision loss (#5980)
    
    * Init
    
    * More
    
    * More
    
    * More
    
    * Fix
    
    * fix
    
    * More
    
    * Fix
    
    * Add query comment
    
    * Update expected plans
    
    * Fix
    
    * Fix clippy
    
    * Fix
    
    * Fix
    
    * Fix
    
    * Fix
    
    * Enable verify_q6 test
---
 benchmarks/expected-plans/q1.txt                   |  46 ++--
 benchmarks/expected-plans/q10.txt                  | 126 ++++-----
 benchmarks/expected-plans/q11.txt                  |   4 +-
 benchmarks/expected-plans/q14.txt                  |  60 ++--
 benchmarks/expected-plans/q15.txt                  | 132 ++++-----
 benchmarks/expected-plans/q19.txt                  |   2 +-
 benchmarks/expected-plans/q3.txt                   | 110 ++++----
 benchmarks/expected-plans/q5.txt                   | 170 ++++++------
 benchmarks/expected-plans/q7.txt                   |   4 +-
 benchmarks/expected-plans/q8.txt                   |   8 +-
 benchmarks/expected-plans/q9.txt                   | 158 +++++------
 benchmarks/queries/q8.sql                          |   8 +-
 benchmarks/src/bin/tpch.rs                         |   1 -
 .../tests/sqllogictests/src/engines/conversion.rs  |   5 +-
 .../src/engines/datafusion/normalize.rs            |   5 +-
 .../core/tests/sqllogictests/test_files/tpch.slt   |   6 +-
 datafusion/expr/src/type_coercion/binary.rs        | 305 +++++++++++++++++----
 datafusion/expr/src/type_coercion/mod.rs           |   5 +
 datafusion/optimizer/src/analyzer/type_coercion.rs |  29 +-
 datafusion/physical-expr/src/expressions/binary.rs | 201 ++++++++++----
 .../src/expressions/binary/kernels_arrow.rs        | 212 ++++++++++----
 21 files changed, 1005 insertions(+), 592 deletions(-)

diff --git a/benchmarks/expected-plans/q1.txt b/benchmarks/expected-plans/q1.txt
index e02c1402a3..c45329cb11 100644
--- a/benchmarks/expected-plans/q1.txt
+++ b/benchmarks/expected-plans/q1.txt
@@ -1,23 +1,23 @@
-+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       [...]
-+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| logical_plan  | Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST                                                                                                                                                                                                                                                                                                                                                                                                           [...]
-|               |   Projection: lineitem.l_returnflag, lineitem.l_linestatus, SUM(lineitem.l_quantity) AS sum_qty, SUM(lineitem.l_extendedprice) AS sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(UInt8(1)) AS co [...]
-|               |     Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) [...]
-|               |       Projection: CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(line [...]
-|               |         Filter: lineitem.l_shipdate <= Date32("10471")                                                                                                                                                                                                                                                                                                                                                                                                                                     [...]
-|               |           TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate]                                                                                                                                                                                                                                                                                                                                                          [...]
-| physical_plan | SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                     [...]
-|               |   SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                             [...]
-|               |     ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, SUM(lineitem.l_extendedprice)@3 as sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, AVG(lineitem.l_quantity)@6 as avg_qty, AVG(lineitem.l_extendedprice)@7 as avg_price, AVG(lineitem.l_di [...]
-|               |       AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]                                           [...]
-|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                        [...]
-|               |           RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2), input_partitions=2                                                                                                                                                                                                                                                                                                                                [...]
-|               |             AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]                                              [...]
-|               |               ProjectionExec: expr=[CAST(l_extendedprice@1 AS Decimal128(38, 4)) * CAST(Some(100),23,2 - CAST(l_discount@2 AS Decimal128(23, 2)) AS Decimal128(38, 4)) as CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.l_d [...]
-|               |                 RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                                                                                                                                                                                                                                                       [...]
-|               |                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                              [...]
-|               |                     FilterExec: l_shipdate@6 <= 10471                                                                                                                                                                                                                                                                                                                                                                                                                                      [...]
-|               |                       MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                                                                                                                                                                                         [...]
-|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            [...]
-+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
\ No newline at end of file
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       [...]
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| logical_plan  | Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST                                                                                                                                                                                                                                                                                                                                                                                                           [...]
+|               |   Projection: lineitem.l_returnflag, lineitem.l_linestatus, SUM(lineitem.l_quantity) AS sum_qty, SUM(lineitem.l_extendedprice) AS sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(UInt8(1)) AS co [...]
+|               |     Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), [...]
+|               |       Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus                                                            [...]
+|               |         Filter: lineitem.l_shipdate <= Date32("10471")                                                                                                                                                                                                                                                                                                                                                                                                                                     [...]
+|               |           TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate]                                                                                                                                                                                                                                                                                                                                                          [...]
+| physical_plan | SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                     [...]
+|               |   SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                             [...]
+|               |     ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, SUM(lineitem.l_extendedprice)@3 as sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, AVG(lineitem.l_quantity)@6 as avg_qty, AVG(lineitem.l_extendedprice)@7 as avg_price, AVG(lineitem.l_di [...]
+|               |       AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]                                           [...]
+|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                        [...]
+|               |           RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2), input_partitions=2                                                                                                                                                                                                                                                                                                                                [...]
+|               |             AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]                                              [...]
+|               |               ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]                     [...]
+|               |                 RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                                                                                                                                                                                                                                                       [...]
+|               |                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                              [...]
+|               |                     FilterExec: l_shipdate@6 <= 10471                                                                                                                                                                                                                                                                                                                                                                                                                                      [...]
+|               |                       MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                                                                                                                                                                                         [...]
+|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            [...]
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q10.txt b/benchmarks/expected-plans/q10.txt
index 794ab50938..3be2e9592e 100644
--- a/benchmarks/expected-plans/q10.txt
+++ b/benchmarks/expected-plans/q10.txt
@@ -1,63 +1,63 @@
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                      |
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan  | Sort: revenue DESC NULLS FIRST                                                                                                                                                                                                                                                                                                                                                                            |
-|               |   Projection: customer.c_custkey, customer.c_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue, customer.c_acctbal, nation.n_name, customer.c_address, customer.c_phone, customer.c_comment                                                                                                                                                                                 |
-|               |     Aggregate: groupBy=[[customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
-|               |       Projection: customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name                                                                                                                                                                                                         |
-|               |         Inner Join: customer.c_nationkey = nation.n_nationkey                                                                                                                                                                                                                                                                                                                                             |
-|               |           Projection: customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                                                                              |
-|               |             Inner Join: orders.o_orderkey = lineitem.l_orderkey                                                                                                                                                                                                                                                                                                                                           |
-|               |               Projection: customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment, orders.o_orderkey                                                                                                                                                                                                                      |
-|               |                 Inner Join: customer.c_custkey = orders.o_custkey                                                                                                                                                                                                                                                                                                                                         |
-|               |                   TableScan: customer projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_comment]                                                                                                                                                                                                                                                                               |
-|               |                   Projection: orders.o_orderkey, orders.o_custkey                                                                                                                                                                                                                                                                                                                                         |
-|               |                     Filter: orders.o_orderdate >= Date32("8674") AND orders.o_orderdate < Date32("8766")                                                                                                                                                                                                                                                                                                  |
-|               |                       TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate]                                                                                                                                                                                                                                                                                                                   |
-|               |               Projection: lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                                                                                                                                                                                              |
-|               |                 Filter: lineitem.l_returnflag = Utf8("R")                                                                                                                                                                                                                                                                                                                                                 |
-|               |                   TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_returnflag]                                                                                                                                                                                                                                                                                                  |
-|               |           TableScan: nation projection=[n_nationkey, n_name]                                                                                                                                                                                                                                                                                                                                              |
-| physical_plan | SortPreservingMergeExec: [revenue@2 DESC]                                                                                                                                                                                                                                                                                                                                                                 |
-|               |   SortExec: expr=[revenue@2 DESC]                                                                                                                                                                                                                                                                                                                                                                         |
-|               |     ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@7 as revenue, c_acctbal@2 as c_acctbal, n_name@4 as n_name, c_address@5 as c_address, c_phone@3 as c_phone, c_comment@6 as c_comment]                                                                                                                              |
-|               |       AggregateExec: mode=FinalPartitioned, gby=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_acctbal@2 as c_acctbal, c_phone@3 as c_phone, n_name@4 as n_name, c_address@5 as c_address, c_comment@6 as c_comment], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                             |
-|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                       |
-|               |           RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }, Column { name: "c_name", index: 1 }, Column { name: "c_acctbal", index: 2 }, Column { name: "c_phone", index: 3 }, Column { name: "n_name", index: 4 }, Column { name: "c_address", index: 5 }, Column { name: "c_comment", index: 6 }], 2), input_partitions=2                                                     |
-|               |             AggregateExec: mode=Partial, gby=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_acctbal@4 as c_acctbal, c_phone@3 as c_phone, n_name@8 as n_name, c_address@2 as c_address, c_comment@5 as c_comment], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                                |
-|               |               ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, l_extendedprice@7 as l_extendedprice, l_discount@8 as l_discount, n_name@10 as n_name]                                                                                                                              |
-|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                               |
-|               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_nationkey", index: 3 }, Column { name: "n_nationkey", index: 0 })]                                                                                                                                                                                                                                              |
-|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                           |
-|               |                       RepartitionExec: partitioning=Hash([Column { name: "c_nationkey", index: 3 }], 2), input_partitions=2                                                                                                                                                                                                                                                                               |
-|               |                         ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_nationkey@3 as c_nationkey, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, l_extendedprice@9 as l_extendedprice, l_discount@10 as l_discount]                                                                                                          |
-|               |                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                     |
-|               |                             HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 7 }, Column { name: "l_orderkey", index: 0 })]                                                                                                                                                                                                                                      |
-|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                 |
-|               |                                 RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 7 }], 2), input_partitions=2                                                                                                                                                                                                                                                                      |
-|               |                                   RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2                                                                                                                                                                                                                                                                                                    |
-|               |                                     ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_nationkey@3 as c_nationkey, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, o_orderkey@7 as o_orderkey]                                                                                                                                     |
-|               |                                       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                         |
-|               |                                         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })]                                                                                                                                                                                                                            |
-|               |                                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                     |
-|               |                                             RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 2), input_partitions=0                                                                                                                                                                                                                                                           |
-|               |                                               MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                                                                                |
-|               |                                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                     |
-|               |                                             RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 2), input_partitions=2                                                                                                                                                                                                                                                           |
-|               |                                               RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                                                                                                                                        |
-|               |                                                 ProjectionExec: expr=[o_orderkey@0 as o_orderkey, o_custkey@1 as o_custkey]                                                                                                                                                                                                                                                                               |
-|               |                                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                             |
-|               |                                                     FilterExec: o_orderdate@2 >= 8674 AND o_orderdate@2 < 8766                                                                                                                                                                                                                                                                                            |
-|               |                                                       MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                                                                        |
-|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                 |
-|               |                                 RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=2                                                                                                                                                                                                                                                                      |
-|               |                                   RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                                                                                                                                                    |
-|               |                                     ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]                                                                                                                                                                                                                                                   |
-|               |                                       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                         |
-|               |                                         FilterExec: l_returnflag@3 = R                                                                                                                                                                                                                                                                                                                                    |
-|               |                                           MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                                                                                    |
-|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                           |
-|               |                       RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=0                                                                                                                                                                                                                                                                               |
-|               |                         MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                                                                                                      |
-|               |                                                                                                                                                                                                                                                                                                                                                                                                           |
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                  |
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | Sort: revenue DESC NULLS FIRST                                                                                                                                                                                                                                                                                                                        |
+|               |   Projection: customer.c_custkey, customer.c_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue, customer.c_acctbal, nation.n_name, customer.c_address, customer.c_phone, customer.c_comment                                                                                                                             |
+|               |     Aggregate: groupBy=[[customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment]], aggr=[[SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]                              |
+|               |       Projection: customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name                                                                                                                                                     |
+|               |         Inner Join: customer.c_nationkey = nation.n_nationkey                                                                                                                                                                                                                                                                                         |
+|               |           Projection: customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                          |
+|               |             Inner Join: orders.o_orderkey = lineitem.l_orderkey                                                                                                                                                                                                                                                                                       |
+|               |               Projection: customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment, orders.o_orderkey                                                                                                                                                                  |
+|               |                 Inner Join: customer.c_custkey = orders.o_custkey                                                                                                                                                                                                                                                                                     |
+|               |                   TableScan: customer projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_comment]                                                                                                                                                                                                                           |
+|               |                   Projection: orders.o_orderkey, orders.o_custkey                                                                                                                                                                                                                                                                                     |
+|               |                     Filter: orders.o_orderdate >= Date32("8674") AND orders.o_orderdate < Date32("8766")                                                                                                                                                                                                                                              |
+|               |                       TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate]                                                                                                                                                                                                                                                               |
+|               |               Projection: lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                                                                                                                                          |
+|               |                 Filter: lineitem.l_returnflag = Utf8("R")                                                                                                                                                                                                                                                                                             |
+|               |                   TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_returnflag]                                                                                                                                                                                                                                              |
+|               |           TableScan: nation projection=[n_nationkey, n_name]                                                                                                                                                                                                                                                                                          |
+| physical_plan | SortPreservingMergeExec: [revenue@2 DESC]                                                                                                                                                                                                                                                                                                             |
+|               |   SortExec: expr=[revenue@2 DESC]                                                                                                                                                                                                                                                                                                                     |
+|               |     ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@7 as revenue, c_acctbal@2 as c_acctbal, n_name@4 as n_name, c_address@5 as c_address, c_phone@3 as c_phone, c_comment@6 as c_comment]                                                                          |
+|               |       AggregateExec: mode=FinalPartitioned, gby=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_acctbal@2 as c_acctbal, c_phone@3 as c_phone, n_name@4 as n_name, c_address@5 as c_address, c_comment@6 as c_comment], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                         |
+|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                   |
+|               |           RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }, Column { name: "c_name", index: 1 }, Column { name: "c_acctbal", index: 2 }, Column { name: "c_phone", index: 3 }, Column { name: "n_name", index: 4 }, Column { name: "c_address", index: 5 }, Column { name: "c_comment", index: 6 }], 2), input_partitions=2 |
+|               |             AggregateExec: mode=Partial, gby=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_acctbal@4 as c_acctbal, c_phone@3 as c_phone, n_name@8 as n_name, c_address@2 as c_address, c_comment@5 as c_comment], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                            |
+|               |               ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, l_extendedprice@7 as l_extendedprice, l_discount@8 as l_discount, n_name@10 as n_name]                                                                          |
+|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                           |
+|               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_nationkey", index: 3 }, Column { name: "n_nationkey", index: 0 })]                                                                                                                                                                                          |
+|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                       |
+|               |                       RepartitionExec: partitioning=Hash([Column { name: "c_nationkey", index: 3 }], 2), input_partitions=2                                                                                                                                                                                                                           |
+|               |                         ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_nationkey@3 as c_nationkey, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, l_extendedprice@9 as l_extendedprice, l_discount@10 as l_discount]                                                      |
+|               |                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                 |
+|               |                             HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 7 }, Column { name: "l_orderkey", index: 0 })]                                                                                                                                                                                  |
+|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                             |
+|               |                                 RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 7 }], 2), input_partitions=2                                                                                                                                                                                                                  |
+|               |                                   RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2                                                                                                                                                                                                                                                |
+|               |                                     ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_address@2 as c_address, c_nationkey@3 as c_nationkey, c_phone@4 as c_phone, c_acctbal@5 as c_acctbal, c_comment@6 as c_comment, o_orderkey@7 as o_orderkey]                                                                                 |
+|               |                                       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                     |
+|               |                                         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })]                                                                                                                                                                        |
+|               |                                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                 |
+|               |                                             RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 2), input_partitions=0                                                                                                                                                                                                       |
+|               |                                               MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                            |
+|               |                                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                 |
+|               |                                             RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 2), input_partitions=2                                                                                                                                                                                                       |
+|               |                                               RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                                                                                    |
+|               |                                                 ProjectionExec: expr=[o_orderkey@0 as o_orderkey, o_custkey@1 as o_custkey]                                                                                                                                                                                                                           |
+|               |                                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                         |
+|               |                                                     FilterExec: o_orderdate@2 >= 8674 AND o_orderdate@2 < 8766                                                                                                                                                                                                                                        |
+|               |                                                       MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                    |
+|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                             |
+|               |                                 RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=2                                                                                                                                                                                                                  |
+|               |                                   RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                                                                                                |
+|               |                                     ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]                                                                                                                                                                                               |
+|               |                                       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                     |
+|               |                                         FilterExec: l_returnflag@3 = R                                                                                                                                                                                                                                                                                |
+|               |                                           MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                                |
+|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                       |
+|               |                       RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=0                                                                                                                                                                                                                           |
+|               |                         MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                                                  |
+|               |                                                                                                                                                                                                                                                                                                                                                       |
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q11.txt b/benchmarks/expected-plans/q11.txt
index 2f87f7c98e..0a732897c3 100644
--- a/benchmarks/expected-plans/q11.txt
+++ b/benchmarks/expected-plans/q11.txt
@@ -5,7 +5,7 @@
 |               |   Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value                                                                                  |
 |               |     Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > __scalar_sq_1.__value                                                              |
 |               |       CrossJoin:                                                                                                                                                                |
-|               |         Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]           |
+|               |         Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(partsupp.ps_supplycost * CAST(partsupp.ps_availqty AS Decimal128(10, 0)))]]                                      |
 |               |           Projection: partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost                                                                                         |
 |               |             Inner Join: supplier.s_nationkey = nation.n_nationkey                                                                                                               |
 |               |               Projection: partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey                                                               |
@@ -17,7 +17,7 @@
 |               |                   TableScan: nation projection=[n_nationkey, n_name]                                                                                                            |
 |               |         SubqueryAlias: __scalar_sq_1                                                                                                                                            |
 |               |           Projection: CAST(CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS Decimal128(38, 15)) AS __value                              |
-|               |             Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]                          |
+|               |             Aggregate: groupBy=[[]], aggr=[[SUM(partsupp.ps_supplycost * CAST(partsupp.ps_availqty AS Decimal128(10, 0)))]]                                                     |
 |               |               Projection: partsupp.ps_availqty, partsupp.ps_supplycost                                                                                                          |
 |               |                 Inner Join: supplier.s_nationkey = nation.n_nationkey                                                                                                           |
 |               |                   Projection: partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey                                                                                |
diff --git a/benchmarks/expected-plans/q14.txt b/benchmarks/expected-plans/q14.txt
index b1b8a423d0..43eb0ea7f5 100644
--- a/benchmarks/expected-plans/q14.txt
+++ b/benchmarks/expected-plans/q14.txt
@@ -1,30 +1,30 @@
-+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       [...]
-+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| logical_plan  | Projection: Float64(100) * CAST(SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%")  THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END) AS Float64) / CAST(SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS Float64) AS promo_revenue                                                                                                                                                                                                            [...]
-|               |   Aggregate: groupBy=[[]], aggr=[[SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2))CAST(lineitem.l_discount AS Decimal128(23, 2))lineitem.l [...]
-|               |     Projection: CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineit [...]
-|               |       Inner Join: lineitem.l_partkey = part.p_partkey                                                                                                                                                                                                                                                                                                                                                                                                                                      [...]
-|               |         Projection: lineitem.l_partkey, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                                                                                                                                                                                                                                                                                      [...]
-|               |           Filter: lineitem.l_shipdate >= Date32("9374") AND lineitem.l_shipdate < Date32("9404")                                                                                                                                                                                                                                                                                                                                                                                           [...]
-|               |             TableScan: lineitem projection=[l_partkey, l_extendedprice, l_discount, l_shipdate]                                                                                                                                                                                                                                                                                                                                                                                            [...]
-|               |         TableScan: part projection=[p_partkey, p_type]                                                                                                                                                                                                                                                                                                                                                                                                                                     [...]
-| physical_plan | ProjectionExec: expr=[100 * CAST(SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%")  THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END)@0 AS Float64) / CAST(SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 AS Float64) as promo_revenue]                                                                                                                                                                                                      [...]
-|               |   AggregateExec: mode=Final, gby=[], aggr=[SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%")  THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                                                                                                                                                               [...]
-|               |     CoalescePartitionsExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                 [...]
-|               |       AggregateExec: mode=Partial, gby=[], aggr=[SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%")  THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                                                                                                                                                         [...]
-|               |         ProjectionExec: expr=[CAST(l_extendedprice@1 AS Decimal128(38, 4)) * CAST(Some(100),23,2 - CAST(l_discount@2 AS Decimal128(23, 2)) AS Decimal128(38, 4)) as CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.l_discoun [...]
-|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                      [...]
-|               |             HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })]                                                                                                                                                                                                                                                                                                                                         [...]
-|               |               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                  [...]
-|               |                 RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=2                                                                                                                                                                                                                                                                                                                                                                        [...]
-|               |                   RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                                                                                                                                                                                                                                                     [...]
-|               |                     ProjectionExec: expr=[l_partkey@0 as l_partkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]                                                                                                                                                                                                                                                                                                                                                      [...]
-|               |                       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                          [...]
-|               |                         FilterExec: l_shipdate@3 >= 9374 AND l_shipdate@3 < 9404                                                                                                                                                                                                                                                                                                                                                                                                           [...]
-|               |                           MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                                                                                                                                                                                     [...]
-|               |               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                  [...]
-|               |                 RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=0                                                                                                                                                                                                                                                                                                                                                                        [...]
-|               |                   MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                                                                                                                                                                                             [...]
-|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            [...]
-+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
\ No newline at end of file
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       [...]
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| logical_plan  | Projection: Float64(100) * CAST(SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%")  THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END) AS Float64) / CAST(SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS Float64) AS promo_revenue                                                                                                                                                                                                            [...]
+|               |   Aggregate: groupBy=[[]], aggr=[[SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount ELSE Decimal128(Some(0),38,4) END) AS SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%")  THEN lineitem.l_extendedprice * Int64( [...]
+|               |     Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice, part.p_type                                                                                                                                                                                   [...]
+|               |       Inner Join: lineitem.l_partkey = part.p_partkey                                                                                                                                                                                                                                                                                                                                                                                                                                      [...]
+|               |         Projection: lineitem.l_partkey, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                                                                                                                                                                                                                                                                                      [...]
+|               |           Filter: lineitem.l_shipdate >= Date32("9374") AND lineitem.l_shipdate < Date32("9404")                                                                                                                                                                                                                                                                                                                                                                                           [...]
+|               |             TableScan: lineitem projection=[l_partkey, l_extendedprice, l_discount, l_shipdate]                                                                                                                                                                                                                                                                                                                                                                                            [...]
+|               |         TableScan: part projection=[p_partkey, p_type]                                                                                                                                                                                                                                                                                                                                                                                                                                     [...]
+| physical_plan | ProjectionExec: expr=[100 * CAST(SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%")  THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END)@0 AS Float64) / CAST(SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 AS Float64) as promo_revenue]                                                                                                                                                                                                      [...]
+|               |   AggregateExec: mode=Final, gby=[], aggr=[SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%")  THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                                                                                                                                                               [...]
+|               |     CoalescePartitionsExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                 [...]
+|               |       AggregateExec: mode=Partial, gby=[], aggr=[SUM(CASE WHEN part.p_type LIKE Utf8("PROMO%")  THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                                                                                                                                                         [...]
+|               |         ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice, p_type@4 as p_type]                                                                                                                                                                                       [...]
+|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                      [...]
+|               |             HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })]                                                                                                                                                                                                                                                                                                                                         [...]
+|               |               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                  [...]
+|               |                 RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=2                                                                                                                                                                                                                                                                                                                                                                        [...]
+|               |                   RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                                                                                                                                                                                                                                                     [...]
+|               |                     ProjectionExec: expr=[l_partkey@0 as l_partkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]                                                                                                                                                                                                                                                                                                                                                      [...]
+|               |                       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                          [...]
+|               |                         FilterExec: l_shipdate@3 >= 9374 AND l_shipdate@3 < 9404                                                                                                                                                                                                                                                                                                                                                                                                           [...]
+|               |                           MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                                                                                                                                                                                     [...]
+|               |               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                  [...]
+|               |                 RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=0                                                                                                                                                                                                                                                                                                                                                                        [...]
+|               |                   MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                                                                                                                                                                                             [...]
+|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            [...]
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q15.txt b/benchmarks/expected-plans/q15.txt
index 50cfe85418..208f4c6690 100644
--- a/benchmarks/expected-plans/q15.txt
+++ b/benchmarks/expected-plans/q15.txt
@@ -1,66 +1,66 @@
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type     | plan                                                                                                                                                                                                                                                                                                    |
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan  | Sort: supplier.s_suppkey ASC NULLS LAST                                                                                                                                                                                                                                                                 |
-|               |   Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, revenue0.total_revenue                                                                                                                                                                                         |
-|               |     Inner Join: revenue0.total_revenue = __scalar_sq_1.__value                                                                                                                                                                                                                                          |
-|               |       Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, revenue0.total_revenue                                                                                                                                                                                     |
-|               |         Inner Join: supplier.s_suppkey = revenue0.supplier_no                                                                                                                                                                                                                                           |
-|               |           TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone]                                                                                                                                                                                                                        |
-|               |           SubqueryAlias: revenue0                                                                                                                                                                                                                                                                       |
-|               |             Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue                                                                                                                                                              |
-|               |               Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]   |
-|               |                 Projection: lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                                                                                           |
-|               |                   Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")                                                                                                                                                                                                |
-|               |                     TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]                                                                                                                                                                                                 |
-|               |       SubqueryAlias: __scalar_sq_1                                                                                                                                                                                                                                                                      |
-|               |         Projection: MAX(revenue0.total_revenue) AS __value                                                                                                                                                                                                                                              |
-|               |           Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]]                                                                                                                                                                                                                                 |
-|               |             SubqueryAlias: revenue0                                                                                                                                                                                                                                                                     |
-|               |               Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue                                                                                                                                                                                               |
-|               |                 Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
-|               |                   Projection: lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                                                                                         |
-|               |                     Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")                                                                                                                                                                                              |
-|               |                       TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]                                                                                                                                                                                               |
-| physical_plan | SortPreservingMergeExec: [s_suppkey@0 ASC NULLS LAST]                                                                                                                                                                                                                                                   |
-|               |   SortExec: expr=[s_suppkey@0 ASC NULLS LAST]                                                                                                                                                                                                                                                           |
-|               |     ProjectionExec: expr=[s_suppkey@0 as s_suppkey, s_name@1 as s_name, s_address@2 as s_address, s_phone@3 as s_phone, total_revenue@4 as total_revenue]                                                                                                                                               |
-|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                       |
-|               |         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "total_revenue", index: 4 }, Column { name: "__value", index: 0 })]                                                                                                                                                        |
-|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                   |
-|               |             RepartitionExec: partitioning=Hash([Column { name: "total_revenue", index: 4 }], 2), input_partitions=2                                                                                                                                                                                     |
-|               |               RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2                                                                                                                                                                                                                      |
-|               |                 ProjectionExec: expr=[s_suppkey@0 as s_suppkey, s_name@1 as s_name, s_address@2 as s_address, s_phone@3 as s_phone, total_revenue@5 as total_revenue]                                                                                                                                   |
-|               |                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                           |
-|               |                     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_suppkey", index: 0 }, Column { name: "supplier_no", index: 0 })]                                                                                                                                            |
-|               |                       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                       |
-|               |                         RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2), input_partitions=0                                                                                                                                                                             |
-|               |                           MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                  |
-|               |                       ProjectionExec: expr=[l_suppkey@0 as supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as total_revenue]                                                                                                                                              |
-|               |                         AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                                                     |
-|               |                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                   |
-|               |                             RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 0 }], 2), input_partitions=2                                                                                                                                                                         |
-|               |                               AggregateExec: mode=Partial, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                                                        |
-|               |                                 RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                                                    |
-|               |                                   ProjectionExec: expr=[l_suppkey@0 as l_suppkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]                                                                                                                                                     |
-|               |                                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                         |
-|               |                                       FilterExec: l_shipdate@3 >= 9496 AND l_shipdate@3 < 9587                                                                                                                                                                                                          |
-|               |                                         MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                    |
-|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                   |
-|               |             RepartitionExec: partitioning=Hash([Column { name: "__value", index: 0 }], 2), input_partitions=1                                                                                                                                                                                           |
-|               |               ProjectionExec: expr=[MAX(revenue0.total_revenue)@0 as __value]                                                                                                                                                                                                                           |
-|               |                 AggregateExec: mode=Final, gby=[], aggr=[MAX(revenue0.total_revenue)]                                                                                                                                                                                                                   |
-|               |                   CoalescePartitionsExec                                                                                                                                                                                                                                                                |
-|               |                     AggregateExec: mode=Partial, gby=[], aggr=[MAX(revenue0.total_revenue)]                                                                                                                                                                                                             |
-|               |                       ProjectionExec: expr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as total_revenue]                                                                                                                                                                          |
-|               |                         AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                                                     |
-|               |                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                   |
-|               |                             RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 0 }], 2), input_partitions=2                                                                                                                                                                         |
-|               |                               AggregateExec: mode=Partial, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                                                        |
-|               |                                 RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                                                    |
-|               |                                   ProjectionExec: expr=[l_suppkey@0 as l_suppkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]                                                                                                                                                     |
-|               |                                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                         |
-|               |                                       FilterExec: l_shipdate@3 >= 9496 AND l_shipdate@3 < 9587                                                                                                                                                                                                          |
-|               |                                         MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                    |
-|               |                                                                                                                                                                                                                                                                                                         |
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                   |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | Sort: supplier.s_suppkey ASC NULLS LAST                                                                                                                                                                                |
+|               |   Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, revenue0.total_revenue                                                                                                        |
+|               |     Inner Join: revenue0.total_revenue = __scalar_sq_1.__value                                                                                                                                                         |
+|               |       Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, revenue0.total_revenue                                                                                                    |
+|               |         Inner Join: supplier.s_suppkey = revenue0.supplier_no                                                                                                                                                          |
+|               |           TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone]                                                                                                                                       |
+|               |           SubqueryAlias: revenue0                                                                                                                                                                                      |
+|               |             Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue                                                                             |
+|               |               Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]   |
+|               |                 Projection: lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                          |
+|               |                   Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")                                                                                                               |
+|               |                     TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]                                                                                                                |
+|               |       SubqueryAlias: __scalar_sq_1                                                                                                                                                                                     |
+|               |         Projection: MAX(revenue0.total_revenue) AS __value                                                                                                                                                             |
+|               |           Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]]                                                                                                                                                |
+|               |             SubqueryAlias: revenue0                                                                                                                                                                                    |
+|               |               Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue                                                                                                              |
+|               |                 Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
+|               |                   Projection: lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                        |
+|               |                     Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")                                                                                                             |
+|               |                       TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]                                                                                                              |
+| physical_plan | SortPreservingMergeExec: [s_suppkey@0 ASC NULLS LAST]                                                                                                                                                                  |
+|               |   SortExec: expr=[s_suppkey@0 ASC NULLS LAST]                                                                                                                                                                          |
+|               |     ProjectionExec: expr=[s_suppkey@0 as s_suppkey, s_name@1 as s_name, s_address@2 as s_address, s_phone@3 as s_phone, total_revenue@4 as total_revenue]                                                              |
+|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                      |
+|               |         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "total_revenue", index: 4 }, Column { name: "__value", index: 0 })]                                                                       |
+|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                  |
+|               |             RepartitionExec: partitioning=Hash([Column { name: "total_revenue", index: 4 }], 2), input_partitions=2                                                                                                    |
+|               |               RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2                                                                                                                                     |
+|               |                 ProjectionExec: expr=[s_suppkey@0 as s_suppkey, s_name@1 as s_name, s_address@2 as s_address, s_phone@3 as s_phone, total_revenue@5 as total_revenue]                                                  |
+|               |                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                          |
+|               |                     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_suppkey", index: 0 }, Column { name: "supplier_no", index: 0 })]                                                           |
+|               |                       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                      |
+|               |                         RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2), input_partitions=0                                                                                            |
+|               |                           MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                 |
+|               |                       ProjectionExec: expr=[l_suppkey@0 as supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as total_revenue]                                                             |
+|               |                         AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                    |
+|               |                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                  |
+|               |                             RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 0 }], 2), input_partitions=2                                                                                        |
+|               |                               AggregateExec: mode=Partial, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                       |
+|               |                                 RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                   |
+|               |                                   ProjectionExec: expr=[l_suppkey@0 as l_suppkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]                                                                    |
+|               |                                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                        |
+|               |                                       FilterExec: l_shipdate@3 >= 9496 AND l_shipdate@3 < 9587                                                                                                                         |
+|               |                                         MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                   |
+|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                  |
+|               |             RepartitionExec: partitioning=Hash([Column { name: "__value", index: 0 }], 2), input_partitions=1                                                                                                          |
+|               |               ProjectionExec: expr=[MAX(revenue0.total_revenue)@0 as __value]                                                                                                                                          |
+|               |                 AggregateExec: mode=Final, gby=[], aggr=[MAX(revenue0.total_revenue)]                                                                                                                                  |
+|               |                   CoalescePartitionsExec                                                                                                                                                                               |
+|               |                     AggregateExec: mode=Partial, gby=[], aggr=[MAX(revenue0.total_revenue)]                                                                                                                            |
+|               |                       ProjectionExec: expr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as total_revenue]                                                                                         |
+|               |                         AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                    |
+|               |                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                  |
+|               |                             RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 0 }], 2), input_partitions=2                                                                                        |
+|               |                               AggregateExec: mode=Partial, gby=[l_suppkey@0 as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                       |
+|               |                                 RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                   |
+|               |                                   ProjectionExec: expr=[l_suppkey@0 as l_suppkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]                                                                    |
+|               |                                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                        |
+|               |                                       FilterExec: l_shipdate@3 >= 9496 AND l_shipdate@3 < 9587                                                                                                                         |
+|               |                                         MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                   |
+|               |                                                                                                                                                                                                                        |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q19.txt b/benchmarks/expected-plans/q19.txt
index 2e3ccbde94..77c889d38f 100644
--- a/benchmarks/expected-plans/q19.txt
+++ b/benchmarks/expected-plans/q19.txt
@@ -2,7 +2,7 @@
 | plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       [...]
 +---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
 | logical_plan  | Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue                                                                                                                                                                                                                                                                                                                                                                                                      [...]
-|               |   Aggregate: groupBy=[[]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]                                                                                                                                                                                                                    [...]
+|               |   Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]                                                                                                                                                                                                                                                                                                     [...]
 |               |     Projection: lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                                                                                                                                                                                                                                                                                                              [...]
 |               |       Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand = Utf8("Brand#12") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#23") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND lineitem.l_quan [...]
 |               |         Projection: lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                                                                                                                                                                                                                                                                 [...]
diff --git a/benchmarks/expected-plans/q3.txt b/benchmarks/expected-plans/q3.txt
index 36ddcc8fdd..0152235a48 100644
--- a/benchmarks/expected-plans/q3.txt
+++ b/benchmarks/expected-plans/q3.txt
@@ -1,55 +1,55 @@
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type     | plan                                                                                                                                                                                                                                                                                                                                    |
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan  | Sort: revenue DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST                                                                                                                                                                                                                                                                       |
-|               |   Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue, orders.o_orderdate, orders.o_shippriority                                                                                                                                                                                 |
-|               |     Aggregate: groupBy=[[lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
-|               |       Projection: orders.o_orderdate, orders.o_shippriority, lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                                                                                         |
-|               |         Inner Join: orders.o_orderkey = lineitem.l_orderkey                                                                                                                                                                                                                                                                             |
-|               |           Projection: orders.o_orderkey, orders.o_orderdate, orders.o_shippriority                                                                                                                                                                                                                                                      |
-|               |             Inner Join: customer.c_custkey = orders.o_custkey                                                                                                                                                                                                                                                                           |
-|               |               Projection: customer.c_custkey                                                                                                                                                                                                                                                                                            |
-|               |                 Filter: customer.c_mktsegment = Utf8("BUILDING")                                                                                                                                                                                                                                                                        |
-|               |                   TableScan: customer projection=[c_custkey, c_mktsegment]                                                                                                                                                                                                                                                              |
-|               |               Filter: orders.o_orderdate < Date32("9204")                                                                                                                                                                                                                                                                               |
-|               |                 TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate, o_shippriority]                                                                                                                                                                                                                                       |
-|               |           Projection: lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                                                                                                                                |
-|               |             Filter: lineitem.l_shipdate > Date32("9204")                                                                                                                                                                                                                                                                                |
-|               |               TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_shipdate]                                                                                                                                                                                                                                      |
-| physical_plan | SortPreservingMergeExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]                                                                                                                                                                                                                                                                  |
-|               |   SortExec: expr=[revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]                                                                                                                                                                                                                                                                          |
-|               |     ProjectionExec: expr=[l_orderkey@0 as l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]                                                                                                                                    |
-|               |       AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                                   |
-|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                     |
-|               |           RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 }, Column { name: "o_shippriority", index: 2 }], 2), input_partitions=2                                                                                                                                   |
-|               |             AggregateExec: mode=Partial, gby=[l_orderkey@2 as l_orderkey, o_orderdate@0 as o_orderdate, o_shippriority@1 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                                      |
-|               |               ProjectionExec: expr=[o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority, l_orderkey@3 as l_orderkey, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount]                                                                                                                                     |
-|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                             |
-|               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 0 }, Column { name: "l_orderkey", index: 0 })]                                                                                                                                                                              |
-|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                         |
-|               |                       RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 0 }], 2), input_partitions=2                                                                                                                                                                                                              |
-|               |                         ProjectionExec: expr=[o_orderkey@1 as o_orderkey, o_orderdate@3 as o_orderdate, o_shippriority@4 as o_shippriority]                                                                                                                                                                                             |
-|               |                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                   |
-|               |                             HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })]                                                                                                                                                                      |
-|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                               |
-|               |                                 RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 2), input_partitions=2                                                                                                                                                                                                     |
-|               |                                   RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                                                                                  |
-|               |                                     ProjectionExec: expr=[c_custkey@0 as c_custkey]                                                                                                                                                                                                                                                     |
-|               |                                       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                       |
-|               |                                         FilterExec: c_mktsegment@1 = BUILDING                                                                                                                                                                                                                                                           |
-|               |                                           MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                  |
-|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                               |
-|               |                                 RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 2), input_partitions=2                                                                                                                                                                                                     |
-|               |                                   RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                                                                                  |
-|               |                                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                         |
-|               |                                       FilterExec: o_orderdate@2 < 9204                                                                                                                                                                                                                                                                  |
-|               |                                         MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                    |
-|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                         |
-|               |                       RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=2                                                                                                                                                                                                              |
-|               |                         RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                                                                                            |
-|               |                           ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]                                                                                                                                                                                           |
-|               |                             CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                 |
-|               |                               FilterExec: l_shipdate@3 > 9204                                                                                                                                                                                                                                                                           |
-|               |                                 MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                            |
-|               |                                                                                                                                                                                                                                                                                                                                         |
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                                   |
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | Sort: revenue DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST                                                                                                                                                                                      |
+|               |   Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue, orders.o_orderdate, orders.o_shippriority                                                                                                |
+|               |     Aggregate: groupBy=[[lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority]], aggr=[[SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
+|               |       Projection: orders.o_orderdate, orders.o_shippriority, lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                        |
+|               |         Inner Join: orders.o_orderkey = lineitem.l_orderkey                                                                                                                                                                                            |
+|               |           Projection: orders.o_orderkey, orders.o_orderdate, orders.o_shippriority                                                                                                                                                                     |
+|               |             Inner Join: customer.c_custkey = orders.o_custkey                                                                                                                                                                                          |
+|               |               Projection: customer.c_custkey                                                                                                                                                                                                           |
+|               |                 Filter: customer.c_mktsegment = Utf8("BUILDING")                                                                                                                                                                                       |
+|               |                   TableScan: customer projection=[c_custkey, c_mktsegment]                                                                                                                                                                             |
+|               |               Filter: orders.o_orderdate < Date32("9204")                                                                                                                                                                                              |
+|               |                 TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate, o_shippriority]                                                                                                                                                      |
+|               |           Projection: lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                                               |
+|               |             Filter: lineitem.l_shipdate > Date32("9204")                                                                                                                                                                                               |
+|               |               TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_shipdate]                                                                                                                                                     |
+| physical_plan | SortPreservingMergeExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]                                                                                                                                                                                 |
+|               |   SortExec: expr=[revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]                                                                                                                                                                                         |
+|               |     ProjectionExec: expr=[l_orderkey@0 as l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]                                                   |
+|               |       AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                  |
+|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                    |
+|               |           RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 }, Column { name: "o_shippriority", index: 2 }], 2), input_partitions=2                                                  |
+|               |             AggregateExec: mode=Partial, gby=[l_orderkey@2 as l_orderkey, o_orderdate@0 as o_orderdate, o_shippriority@1 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                     |
+|               |               ProjectionExec: expr=[o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority, l_orderkey@3 as l_orderkey, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount]                                                    |
+|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                            |
+|               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 0 }, Column { name: "l_orderkey", index: 0 })]                                                                                             |
+|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                        |
+|               |                       RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 0 }], 2), input_partitions=2                                                                                                                             |
+|               |                         ProjectionExec: expr=[o_orderkey@1 as o_orderkey, o_orderdate@3 as o_orderdate, o_shippriority@4 as o_shippriority]                                                                                                            |
+|               |                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                  |
+|               |                             HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })]                                                                                     |
+|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                              |
+|               |                                 RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 2), input_partitions=2                                                                                                                    |
+|               |                                   RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                 |
+|               |                                     ProjectionExec: expr=[c_custkey@0 as c_custkey]                                                                                                                                                                    |
+|               |                                       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                      |
+|               |                                         FilterExec: c_mktsegment@1 = BUILDING                                                                                                                                                                          |
+|               |                                           MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                 |
+|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                              |
+|               |                                 RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 2), input_partitions=2                                                                                                                    |
+|               |                                   RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                 |
+|               |                                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                        |
+|               |                                       FilterExec: o_orderdate@2 < 9204                                                                                                                                                                                 |
+|               |                                         MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                   |
+|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                        |
+|               |                       RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=2                                                                                                                             |
+|               |                         RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                           |
+|               |                           ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]                                                                                                          |
+|               |                             CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                |
+|               |                               FilterExec: l_shipdate@3 > 9204                                                                                                                                                                                          |
+|               |                                 MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                           |
+|               |                                                                                                                                                                                                                                                        |
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q5.txt b/benchmarks/expected-plans/q5.txt
index de6854eecd..9388a385d9 100644
--- a/benchmarks/expected-plans/q5.txt
+++ b/benchmarks/expected-plans/q5.txt
@@ -1,85 +1,85 @@
-+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type     | plan                                                                                                                                                                                                                                                                                   |
-+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan  | Sort: revenue DESC NULLS FIRST                                                                                                                                                                                                                                                         |
-|               |   Projection: nation.n_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue                                                                                                                                                                                 |
-|               |     Aggregate: groupBy=[[nation.n_name]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] |
-|               |       Projection: lineitem.l_extendedprice, lineitem.l_discount, nation.n_name                                                                                                                                                                                                         |
-|               |         Inner Join: nation.n_regionkey = region.r_regionkey                                                                                                                                                                                                                            |
-|               |           Projection: lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, nation.n_regionkey                                                                                                                                                                                 |
-|               |             Inner Join: supplier.s_nationkey = nation.n_nationkey                                                                                                                                                                                                                      |
-|               |               Projection: lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey                                                                                                                                                                                          |
-|               |                 Inner Join: lineitem.l_suppkey = supplier.s_suppkey, customer.c_nationkey = supplier.s_nationkey                                                                                                                                                                       |
-|               |                   Projection: customer.c_nationkey, lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                                                  |
-|               |                     Inner Join: orders.o_orderkey = lineitem.l_orderkey                                                                                                                                                                                                                |
-|               |                       Projection: customer.c_nationkey, orders.o_orderkey                                                                                                                                                                                                              |
-|               |                         Inner Join: customer.c_custkey = orders.o_custkey                                                                                                                                                                                                              |
-|               |                           TableScan: customer projection=[c_custkey, c_nationkey]                                                                                                                                                                                                      |
-|               |                           Projection: orders.o_orderkey, orders.o_custkey                                                                                                                                                                                                              |
-|               |                             Filter: orders.o_orderdate >= Date32("8766") AND orders.o_orderdate < Date32("9131")                                                                                                                                                                       |
-|               |                               TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate]                                                                                                                                                                                        |
-|               |                       TableScan: lineitem projection=[l_orderkey, l_suppkey, l_extendedprice, l_discount]                                                                                                                                                                              |
-|               |                   TableScan: supplier projection=[s_suppkey, s_nationkey]                                                                                                                                                                                                              |
-|               |               TableScan: nation projection=[n_nationkey, n_name, n_regionkey]                                                                                                                                                                                                          |
-|               |           Projection: region.r_regionkey                                                                                                                                                                                                                                               |
-|               |             Filter: region.r_name = Utf8("ASIA")                                                                                                                                                                                                                                       |
-|               |               TableScan: region projection=[r_regionkey, r_name]                                                                                                                                                                                                                       |
-| physical_plan | SortPreservingMergeExec: [revenue@1 DESC]                                                                                                                                                                                                                                              |
-|               |   SortExec: expr=[revenue@1 DESC]                                                                                                                                                                                                                                                      |
-|               |     ProjectionExec: expr=[n_name@0 as n_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as revenue]                                                                                                                                                             |
-|               |       AggregateExec: mode=FinalPartitioned, gby=[n_name@0 as n_name], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                                                            |
-|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                    |
-|               |           RepartitionExec: partitioning=Hash([Column { name: "n_name", index: 0 }], 2), input_partitions=2                                                                                                                                                                             |
-|               |             AggregateExec: mode=Partial, gby=[n_name@2 as n_name], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                                                               |
-|               |               ProjectionExec: expr=[l_extendedprice@0 as l_extendedprice, l_discount@1 as l_discount, n_name@2 as n_name]                                                                                                                                                              |
-|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                            |
-|               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "n_regionkey", index: 3 }, Column { name: "r_regionkey", index: 0 })]                                                                                                                           |
-|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                        |
-|               |                       RepartitionExec: partitioning=Hash([Column { name: "n_regionkey", index: 3 }], 2), input_partitions=2                                                                                                                                                            |
-|               |                         ProjectionExec: expr=[l_extendedprice@0 as l_extendedprice, l_discount@1 as l_discount, n_name@4 as n_name, n_regionkey@5 as n_regionkey]                                                                                                                      |
-|               |                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                  |
-|               |                             HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_nationkey", index: 2 }, Column { name: "n_nationkey", index: 0 })]                                                                                                                 |
-|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                              |
-|               |                                 RepartitionExec: partitioning=Hash([Column { name: "s_nationkey", index: 2 }], 2), input_partitions=2                                                                                                                                                  |
-|               |                                   ProjectionExec: expr=[l_extendedprice@2 as l_extendedprice, l_discount@3 as l_discount, s_nationkey@5 as s_nationkey]                                                                                                                                |
-|               |                                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                        |
-|               |                                       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_suppkey", index: 1 }, Column { name: "s_suppkey", index: 0 }), (Column { name: "c_nationkey", index: 0 }, Column { name: "s_nationkey", index: 1 })]                     |
-|               |                                         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                    |
-|               |                                           RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 1 }, Column { name: "c_nationkey", index: 0 }], 2), input_partitions=2                                                                                                |
-|               |                                             ProjectionExec: expr=[c_nationkey@0 as c_nationkey, l_suppkey@3 as l_suppkey, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount]                                                                                            |
-|               |                                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                              |
-|               |                                                 HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 1 }, Column { name: "l_orderkey", index: 0 })]                                                                                               |
-|               |                                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                          |
-|               |                                                     RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 1 }], 2), input_partitions=2                                                                                                                               |
-|               |                                                       RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2                                                                                                                                                             |
-|               |                                                         ProjectionExec: expr=[c_nationkey@1 as c_nationkey, o_orderkey@2 as o_orderkey]                                                                                                                                                |
-|               |                                                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                  |
-|               |                                                             HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })]                                                                                     |
-|               |                                                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                              |
-|               |                                                                 RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 2), input_partitions=0                                                                                                                    |
-|               |                                                                   MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                         |
-|               |                                                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                              |
-|               |                                                                 RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 2), input_partitions=2                                                                                                                    |
-|               |                                                                   RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                 |
-|               |                                                                     ProjectionExec: expr=[o_orderkey@0 as o_orderkey, o_custkey@1 as o_custkey]                                                                                                                                        |
-|               |                                                                       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                      |
-|               |                                                                         FilterExec: o_orderdate@2 >= 8766 AND o_orderdate@2 < 9131                                                                                                                                                     |
-|               |                                                                           MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                 |
-|               |                                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                          |
-|               |                                                     RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=0                                                                                                                               |
-|               |                                                       MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                     |
-|               |                                         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                    |
-|               |                                           RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }, Column { name: "s_nationkey", index: 1 }], 2), input_partitions=0                                                                                                |
-|               |                                             MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                               |
-|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                              |
-|               |                                 RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=0                                                                                                                                                  |
-|               |                                   MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                         |
-|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                        |
-|               |                       RepartitionExec: partitioning=Hash([Column { name: "r_regionkey", index: 0 }], 2), input_partitions=2                                                                                                                                                            |
-|               |                         RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                                           |
-|               |                           ProjectionExec: expr=[r_regionkey@0 as r_regionkey]                                                                                                                                                                                                          |
-|               |                             CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                |
-|               |                               FilterExec: r_name@1 = ASIA                                                                                                                                                                                                                              |
-|               |                                 MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                           |
-|               |                                                                                                                                                                                                                                                                                        |
-+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                                               |
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | Sort: revenue DESC NULLS FIRST                                                                                                                                                                                                                                     |
+|               |   Projection: nation.n_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue                                                                                                                                                             |
+|               |     Aggregate: groupBy=[[nation.n_name]], aggr=[[SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]                                                              |
+|               |       Projection: lineitem.l_extendedprice, lineitem.l_discount, nation.n_name                                                                                                                                                                                     |
+|               |         Inner Join: nation.n_regionkey = region.r_regionkey                                                                                                                                                                                                        |
+|               |           Projection: lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, nation.n_regionkey                                                                                                                                                             |
+|               |             Inner Join: supplier.s_nationkey = nation.n_nationkey                                                                                                                                                                                                  |
+|               |               Projection: lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey                                                                                                                                                                      |
+|               |                 Inner Join: lineitem.l_suppkey = supplier.s_suppkey, customer.c_nationkey = supplier.s_nationkey                                                                                                                                                   |
+|               |                   Projection: customer.c_nationkey, lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                              |
+|               |                     Inner Join: orders.o_orderkey = lineitem.l_orderkey                                                                                                                                                                                            |
+|               |                       Projection: customer.c_nationkey, orders.o_orderkey                                                                                                                                                                                          |
+|               |                         Inner Join: customer.c_custkey = orders.o_custkey                                                                                                                                                                                          |
+|               |                           TableScan: customer projection=[c_custkey, c_nationkey]                                                                                                                                                                                  |
+|               |                           Projection: orders.o_orderkey, orders.o_custkey                                                                                                                                                                                          |
+|               |                             Filter: orders.o_orderdate >= Date32("8766") AND orders.o_orderdate < Date32("9131")                                                                                                                                                   |
+|               |                               TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate]                                                                                                                                                                    |
+|               |                       TableScan: lineitem projection=[l_orderkey, l_suppkey, l_extendedprice, l_discount]                                                                                                                                                          |
+|               |                   TableScan: supplier projection=[s_suppkey, s_nationkey]                                                                                                                                                                                          |
+|               |               TableScan: nation projection=[n_nationkey, n_name, n_regionkey]                                                                                                                                                                                      |
+|               |           Projection: region.r_regionkey                                                                                                                                                                                                                           |
+|               |             Filter: region.r_name = Utf8("ASIA")                                                                                                                                                                                                                   |
+|               |               TableScan: region projection=[r_regionkey, r_name]                                                                                                                                                                                                   |
+| physical_plan | SortPreservingMergeExec: [revenue@1 DESC]                                                                                                                                                                                                                          |
+|               |   SortExec: expr=[revenue@1 DESC]                                                                                                                                                                                                                                  |
+|               |     ProjectionExec: expr=[n_name@0 as n_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as revenue]                                                                                                                                         |
+|               |       AggregateExec: mode=FinalPartitioned, gby=[n_name@0 as n_name], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                                        |
+|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                |
+|               |           RepartitionExec: partitioning=Hash([Column { name: "n_name", index: 0 }], 2), input_partitions=2                                                                                                                                                         |
+|               |             AggregateExec: mode=Partial, gby=[n_name@2 as n_name], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]                                                                                                                           |
+|               |               ProjectionExec: expr=[l_extendedprice@0 as l_extendedprice, l_discount@1 as l_discount, n_name@2 as n_name]                                                                                                                                          |
+|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                        |
+|               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "n_regionkey", index: 3 }, Column { name: "r_regionkey", index: 0 })]                                                                                                       |
+|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                    |
+|               |                       RepartitionExec: partitioning=Hash([Column { name: "n_regionkey", index: 3 }], 2), input_partitions=2                                                                                                                                        |
+|               |                         ProjectionExec: expr=[l_extendedprice@0 as l_extendedprice, l_discount@1 as l_discount, n_name@4 as n_name, n_regionkey@5 as n_regionkey]                                                                                                  |
+|               |                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                              |
+|               |                             HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_nationkey", index: 2 }, Column { name: "n_nationkey", index: 0 })]                                                                                             |
+|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                          |
+|               |                                 RepartitionExec: partitioning=Hash([Column { name: "s_nationkey", index: 2 }], 2), input_partitions=2                                                                                                                              |
+|               |                                   ProjectionExec: expr=[l_extendedprice@2 as l_extendedprice, l_discount@3 as l_discount, s_nationkey@5 as s_nationkey]                                                                                                            |
+|               |                                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                    |
+|               |                                       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_suppkey", index: 1 }, Column { name: "s_suppkey", index: 0 }), (Column { name: "c_nationkey", index: 0 }, Column { name: "s_nationkey", index: 1 })] |
+|               |                                         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                |
+|               |                                           RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 1 }, Column { name: "c_nationkey", index: 0 }], 2), input_partitions=2                                                                            |
+|               |                                             ProjectionExec: expr=[c_nationkey@0 as c_nationkey, l_suppkey@3 as l_suppkey, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount]                                                                        |
+|               |                                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                          |
+|               |                                                 HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 1 }, Column { name: "l_orderkey", index: 0 })]                                                                           |
+|               |                                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                      |
+|               |                                                     RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 1 }], 2), input_partitions=2                                                                                                           |
+|               |                                                       RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2                                                                                                                                         |
+|               |                                                         ProjectionExec: expr=[c_nationkey@1 as c_nationkey, o_orderkey@2 as o_orderkey]                                                                                                                            |
+|               |                                                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                              |
+|               |                                                             HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })]                                                                 |
+|               |                                                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                          |
+|               |                                                                 RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 2), input_partitions=0                                                                                                |
+|               |                                                                   MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                     |
+|               |                                                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                          |
+|               |                                                                 RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 2), input_partitions=2                                                                                                |
+|               |                                                                   RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                             |
+|               |                                                                     ProjectionExec: expr=[o_orderkey@0 as o_orderkey, o_custkey@1 as o_custkey]                                                                                                                    |
+|               |                                                                       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                  |
+|               |                                                                         FilterExec: o_orderdate@2 >= 8766 AND o_orderdate@2 < 9131                                                                                                                                 |
+|               |                                                                           MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                             |
+|               |                                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                      |
+|               |                                                     RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=0                                                                                                           |
+|               |                                                       MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                 |
+|               |                                         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                |
+|               |                                           RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }, Column { name: "s_nationkey", index: 1 }], 2), input_partitions=0                                                                            |
+|               |                                             MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                           |
+|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                          |
+|               |                                 RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=0                                                                                                                              |
+|               |                                   MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                     |
+|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                    |
+|               |                       RepartitionExec: partitioning=Hash([Column { name: "r_regionkey", index: 0 }], 2), input_partitions=2                                                                                                                                        |
+|               |                         RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                       |
+|               |                           ProjectionExec: expr=[r_regionkey@0 as r_regionkey]                                                                                                                                                                                      |
+|               |                             CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                            |
+|               |                               FilterExec: r_name@1 = ASIA                                                                                                                                                                                                          |
+|               |                                 MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                       |
+|               |                                                                                                                                                                                                                                                                    |
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q7.txt b/benchmarks/expected-plans/q7.txt
index 619ac28d0a..572ee28ff0 100644
--- a/benchmarks/expected-plans/q7.txt
+++ b/benchmarks/expected-plans/q7.txt
@@ -5,7 +5,7 @@
 |               |   Projection: shipping.supp_nation, shipping.cust_nation, shipping.l_year, SUM(shipping.volume) AS revenue                                                                                                                                                                                                                                                                                                                                                                                 [...]
 |               |     Aggregate: groupBy=[[shipping.supp_nation, shipping.cust_nation, shipping.l_year]], aggr=[[SUM(shipping.volume)]]                                                                                                                                                                                                                                                                                                                                                                      [...]
 |               |       SubqueryAlias: shipping                                                                                                                                                                                                                                                                                                                                                                                                                                                              [...]
-|               |         Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation, datepart(Utf8("YEAR"), lineitem.l_shipdate) AS l_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS volume                                                                                                                                                                                      [...]
+|               |         Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation, datepart(Utf8("YEAR"), lineitem.l_shipdate) AS l_year, lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS volume                                                                                                                                                                                                                                                                       [...]
 |               |           Inner Join: customer.c_nationkey = n2.n_nationkey Filter: n1.n_name = Utf8("FRANCE") AND n2.n_name = Utf8("GERMANY") OR n1.n_name = Utf8("GERMANY") AND n2.n_name = Utf8("FRANCE")                                                                                                                                                                                                                                                                                               [...]
 |               |             Projection: lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, customer.c_nationkey, n1.n_name                                                                                                                                                                                                                                                                                                                                                                [...]
 |               |               Inner Join: supplier.s_nationkey = n1.n_nationkey                                                                                                                                                                                                                                                                                                                                                                                                                            [...]
@@ -33,7 +33,7 @@
 |               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                        [...]
 |               |           RepartitionExec: partitioning=Hash([Column { name: "supp_nation", index: 0 }, Column { name: "cust_nation", index: 1 }, Column { name: "l_year", index: 2 }], 2), input_partitions=2                                                                                                                                                                                                                                                                                             [...]
 |               |             AggregateExec: mode=Partial, gby=[supp_nation@0 as supp_nation, cust_nation@1 as cust_nation, l_year@2 as l_year], aggr=[SUM(shipping.volume)]                                                                                                                                                                                                                                                                                                                                 [...]
-|               |               ProjectionExec: expr=[n_name@4 as supp_nation, n_name@6 as cust_nation, datepart(YEAR, l_shipdate@2) as l_year, CAST(l_extendedprice@0 AS Decimal128(38, 4)) * CAST(Some(100),23,2 - CAST(l_discount@1 AS Decimal128(23, 2)) AS Decimal128(38, 4)) as volume]                                                                                                                                                                                                                [...]
+|               |               ProjectionExec: expr=[n_name@4 as supp_nation, n_name@6 as cust_nation, datepart(YEAR, l_shipdate@2) as l_year, l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as volume]                                                                                                                                                                                                                                                                                                 [...]
 |               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                [...]
 |               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_nationkey", index: 3 }, Column { name: "n_nationkey", index: 0 })], filter=BinaryExpr { left: BinaryExpr { left: BinaryExpr { left: Column { name: "n_name", index: 0 }, op: Eq, right: Literal { value: Utf8("FRANCE") } }, op: And, right: BinaryExpr { left: Column { name: "n_name", index: 1 }, op: Eq, right: Literal { value: Utf8("GERMANY") } } }, op: Or, right: BinaryExpr { left: Bi [...]
 |               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                            [...]
diff --git a/benchmarks/expected-plans/q8.txt b/benchmarks/expected-plans/q8.txt
index d196bcc214..b356b9ded3 100644
--- a/benchmarks/expected-plans/q8.txt
+++ b/benchmarks/expected-plans/q8.txt
@@ -2,10 +2,10 @@
 | plan_type     | plan                                                                                                                                                                                                                                                                                          |
 +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 | logical_plan  | Sort: all_nations.o_year ASC NULLS LAST                                                                                                                                                                                                                                                       |
-|               |   Projection: all_nations.o_year, SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END) / SUM(all_nations.volume) AS mkt_share                                                                                                                         |
+|               |   Projection: all_nations.o_year, CAST(CAST(SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END) AS Decimal128(12, 2)) / CAST(SUM(all_nations.volume) AS Decimal128(12, 2)) AS Decimal128(15, 2)) AS mkt_share                                        |
 |               |     Aggregate: groupBy=[[all_nations.o_year]], aggr=[[SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Decimal128(Some(0),38,4) END) AS SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), SUM(all_nations.volume)]] |
 |               |       SubqueryAlias: all_nations                                                                                                                                                                                                                                                              |
-|               |         Projection: datepart(Utf8("YEAR"), orders.o_orderdate) AS o_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS volume, n2.n_name AS nation                         |
+|               |         Projection: datepart(Utf8("YEAR"), orders.o_orderdate) AS o_year, lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS volume, n2.n_name AS nation                                                                                                          |
 |               |           Inner Join: n1.n_regionkey = region.r_regionkey                                                                                                                                                                                                                                     |
 |               |             Projection: lineitem.l_extendedprice, lineitem.l_discount, orders.o_orderdate, n1.n_regionkey, n2.n_name                                                                                                                                                                          |
 |               |               Inner Join: supplier.s_nationkey = n2.n_nationkey                                                                                                                                                                                                                               |
@@ -36,12 +36,12 @@
 |               |                 TableScan: region projection=[r_regionkey, r_name]                                                                                                                                                                                                                            |
 | physical_plan | SortPreservingMergeExec: [o_year@0 ASC NULLS LAST]                                                                                                                                                                                                                                            |
 |               |   SortExec: expr=[o_year@0 ASC NULLS LAST]                                                                                                                                                                                                                                                    |
-|               |     ProjectionExec: expr=[o_year@0 as o_year, SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END)@1 / SUM(all_nations.volume)@2 as mkt_share]                                                                                                        |
+|               |     ProjectionExec: expr=[o_year@0 as o_year, CAST(CAST(SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END)@1 AS Decimal128(12, 2)) / CAST(SUM(all_nations.volume)@2 AS Decimal128(12, 2)) AS Decimal128(15, 2)) as mkt_share]                       |
 |               |       AggregateExec: mode=FinalPartitioned, gby=[o_year@0 as o_year], aggr=[SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), SUM(all_nations.volume)]                                                                                            |
 |               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                           |
 |               |           RepartitionExec: partitioning=Hash([Column { name: "o_year", index: 0 }], 2), input_partitions=2                                                                                                                                                                                    |
 |               |             AggregateExec: mode=Partial, gby=[o_year@0 as o_year], aggr=[SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), SUM(all_nations.volume)]                                                                                               |
-|               |               ProjectionExec: expr=[datepart(YEAR, o_orderdate@2) as o_year, CAST(l_extendedprice@0 AS Decimal128(38, 4)) * CAST(Some(100),23,2 - CAST(l_discount@1 AS Decimal128(23, 2)) AS Decimal128(38, 4)) as volume, n_name@4 as nation]                                                |
+|               |               ProjectionExec: expr=[datepart(YEAR, o_orderdate@2) as o_year, l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as volume, n_name@4 as nation]                                                                                                                                 |
 |               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                   |
 |               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "n_regionkey", index: 3 }, Column { name: "r_regionkey", index: 0 })]                                                                                                                                  |
 |               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                               |
diff --git a/benchmarks/expected-plans/q9.txt b/benchmarks/expected-plans/q9.txt
index 414ecf804a..24a5e7f16e 100644
--- a/benchmarks/expected-plans/q9.txt
+++ b/benchmarks/expected-plans/q9.txt
@@ -1,79 +1,79 @@
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                |
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan  | Sort: profit.nation ASC NULLS LAST, profit.o_year DESC NULLS FIRST                                                                                                                                                                                                                                                                                  |
-|               |   Projection: profit.nation, profit.o_year, SUM(profit.amount) AS sum_profit                                                                                                                                                                                                                                                                        |
-|               |     Aggregate: groupBy=[[profit.nation, profit.o_year]], aggr=[[SUM(profit.amount)]]                                                                                                                                                                                                                                                                |
-|               |       SubqueryAlias: profit                                                                                                                                                                                                                                                                                                                         |
-|               |         Projection: nation.n_name AS nation, datepart(Utf8("YEAR"), orders.o_orderdate) AS o_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) - CAST(partsupp.ps_supplycost * lineitem.l_quantity AS Decimal128(38, 4)) AS amount |
-|               |           Inner Join: supplier.s_nationkey = nation.n_nationkey                                                                                                                                                                                                                                                                                     |
-|               |             Projection: lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey, partsupp.ps_supplycost, orders.o_orderdate                                                                                                                                                                                        |
-|               |               Inner Join: lineitem.l_orderkey = orders.o_orderkey                                                                                                                                                                                                                                                                                   |
-|               |                 Projection: lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey, partsupp.ps_supplycost                                                                                                                                                                                   |
-|               |                   Inner Join: lineitem.l_suppkey = partsupp.ps_suppkey, lineitem.l_partkey = partsupp.ps_partkey                                                                                                                                                                                                                                    |
-|               |                     Projection: lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey                                                                                                                                                               |
-|               |                       Inner Join: lineitem.l_suppkey = supplier.s_suppkey                                                                                                                                                                                                                                                                           |
-|               |                         Projection: lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount                                                                                                                                                                                 |
-|               |                           Inner Join: part.p_partkey = lineitem.l_partkey                                                                                                                                                                                                                                                                           |
-|               |                             Projection: part.p_partkey                                                                                                                                                                                                                                                                                              |
-|               |                               Filter: part.p_name LIKE Utf8("%green%")                                                                                                                                                                                                                                                                              |
-|               |                                 TableScan: part projection=[p_partkey, p_name]                                                                                                                                                                                                                                                                      |
-|               |                             TableScan: lineitem projection=[l_orderkey, l_partkey, l_suppkey, l_quantity, l_extendedprice, l_discount]                                                                                                                                                                                                              |
-|               |                         TableScan: supplier projection=[s_suppkey, s_nationkey]                                                                                                                                                                                                                                                                     |
-|               |                     TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]                                                                                                                                                                                                                                                          |
-|               |                 TableScan: orders projection=[o_orderkey, o_orderdate]                                                                                                                                                                                                                                                                              |
-|               |             TableScan: nation projection=[n_nationkey, n_name]                                                                                                                                                                                                                                                                                      |
-| physical_plan | SortPreservingMergeExec: [nation@0 ASC NULLS LAST,o_year@1 DESC]                                                                                                                                                                                                                                                                                    |
-|               |   SortExec: expr=[nation@0 ASC NULLS LAST,o_year@1 DESC]                                                                                                                                                                                                                                                                                            |
-|               |     ProjectionExec: expr=[nation@0 as nation, o_year@1 as o_year, SUM(profit.amount)@2 as sum_profit]                                                                                                                                                                                                                                               |
-|               |       AggregateExec: mode=FinalPartitioned, gby=[nation@0 as nation, o_year@1 as o_year], aggr=[SUM(profit.amount)]                                                                                                                                                                                                                                 |
-|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                 |
-|               |           RepartitionExec: partitioning=Hash([Column { name: "nation", index: 0 }, Column { name: "o_year", index: 1 }], 2), input_partitions=2                                                                                                                                                                                                     |
-|               |             AggregateExec: mode=Partial, gby=[nation@0 as nation, o_year@1 as o_year], aggr=[SUM(profit.amount)]                                                                                                                                                                                                                                    |
-|               |               ProjectionExec: expr=[n_name@7 as nation, datepart(YEAR, o_orderdate@5) as o_year, CAST(l_extendedprice@1 AS Decimal128(38, 4)) * CAST(Some(100),23,2 - CAST(l_discount@2 AS Decimal128(23, 2)) AS Decimal128(38, 4)) - CAST(ps_supplycost@4 * l_quantity@0 AS Decimal128(38, 4)) as amount]                                          |
-|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                         |
-|               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_nationkey", index: 3 }, Column { name: "n_nationkey", index: 0 })]                                                                                                                                                                                        |
-|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                     |
-|               |                       RepartitionExec: partitioning=Hash([Column { name: "s_nationkey", index: 3 }], 2), input_partitions=2                                                                                                                                                                                                                         |
-|               |                         ProjectionExec: expr=[l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, l_discount@3 as l_discount, s_nationkey@4 as s_nationkey, ps_supplycost@5 as ps_supplycost, o_orderdate@7 as o_orderdate]                                                                                                           |
-|               |                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                               |
-|               |                             HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })]                                                                                                                                                                                |
-|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                           |
-|               |                                 RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=2                                                                                                                                                                                                                |
-|               |                                   ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_quantity@3 as l_quantity, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount, s_nationkey@6 as s_nationkey, ps_supplycost@9 as ps_supplycost]                                                                                                   |
-|               |                                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                     |
-|               |                                       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_suppkey", index: 2 }, Column { name: "ps_suppkey", index: 1 }), (Column { name: "l_partkey", index: 1 }, Column { name: "ps_partkey", index: 0 })]                                                                                    |
-|               |                                         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                 |
-|               |                                           RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 2 }, Column { name: "l_partkey", index: 1 }], 2), input_partitions=2                                                                                                                                                               |
-|               |                                             ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_partkey@1 as l_partkey, l_suppkey@2 as l_suppkey, l_quantity@3 as l_quantity, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount, s_nationkey@7 as s_nationkey]                                                                       |
-|               |                                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                           |
-|               |                                                 HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_suppkey", index: 2 }, Column { name: "s_suppkey", index: 0 })]                                                                                                                                                              |
-|               |                                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                       |
-|               |                                                     RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 2 }], 2), input_partitions=2                                                                                                                                                                                             |
-|               |                                                       ProjectionExec: expr=[l_orderkey@1 as l_orderkey, l_partkey@2 as l_partkey, l_suppkey@3 as l_suppkey, l_quantity@4 as l_quantity, l_extendedprice@5 as l_extendedprice, l_discount@6 as l_discount]                                                                                           |
-|               |                                                         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                 |
-|               |                                                           HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "p_partkey", index: 0 }, Column { name: "l_partkey", index: 1 })]                                                                                                                                                    |
-|               |                                                             CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                             |
-|               |                                                               RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=2                                                                                                                                                                                   |
-|               |                                                                 RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                                                                                                |
-|               |                                                                   ProjectionExec: expr=[p_partkey@0 as p_partkey]                                                                                                                                                                                                                                   |
-|               |                                                                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                     |
-|               |                                                                       FilterExec: p_name@1 LIKE %green%                                                                                                                                                                                                                                             |
-|               |                                                                         MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                |
-|               |                                                             CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                             |
-|               |                                                               RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 1 }], 2), input_partitions=0                                                                                                                                                                                   |
-|               |                                                                 MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                        |
-|               |                                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                       |
-|               |                                                     RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2), input_partitions=0                                                                                                                                                                                             |
-|               |                                                       MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                  |
-|               |                                         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                 |
-|               |                                           RepartitionExec: partitioning=Hash([Column { name: "ps_suppkey", index: 1 }, Column { name: "ps_partkey", index: 0 }], 2), input_partitions=0                                                                                                                                                             |
-|               |                                             MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                            |
-|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                           |
-|               |                                 RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 0 }], 2), input_partitions=0                                                                                                                                                                                                                |
-|               |                                   MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                                      |
-|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                     |
-|               |                       RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=0                                                                                                                                                                                                                         |
-|               |                         MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                                                                                                |
-|               |                                                                                                                                                                                                                                                                                                                                                     |
-+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                                                          |
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | Sort: profit.nation ASC NULLS LAST, profit.o_year DESC NULLS FIRST                                                                                                                                                                                                            |
+|               |   Projection: profit.nation, profit.o_year, SUM(profit.amount) AS sum_profit                                                                                                                                                                                                  |
+|               |     Aggregate: groupBy=[[profit.nation, profit.o_year]], aggr=[[SUM(profit.amount)]]                                                                                                                                                                                          |
+|               |       SubqueryAlias: profit                                                                                                                                                                                                                                                   |
+|               |         Projection: nation.n_name AS nation, datepart(Utf8("YEAR"), orders.o_orderdate) AS o_year, lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) - partsupp.ps_supplycost * lineitem.l_quantity AS amount                                       |
+|               |           Inner Join: supplier.s_nationkey = nation.n_nationkey                                                                                                                                                                                                               |
+|               |             Projection: lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey, partsupp.ps_supplycost, orders.o_orderdate                                                                                                                  |
+|               |               Inner Join: lineitem.l_orderkey = orders.o_orderkey                                                                                                                                                                                                             |
+|               |                 Projection: lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey, partsupp.ps_supplycost                                                                                                             |
+|               |                   Inner Join: lineitem.l_suppkey = partsupp.ps_suppkey, lineitem.l_partkey = partsupp.ps_partkey                                                                                                                                                              |
+|               |                     Projection: lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey                                                                                         |
+|               |                       Inner Join: lineitem.l_suppkey = supplier.s_suppkey                                                                                                                                                                                                     |
+|               |                         Projection: lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount                                                                                                           |
+|               |                           Inner Join: part.p_partkey = lineitem.l_partkey                                                                                                                                                                                                     |
+|               |                             Projection: part.p_partkey                                                                                                                                                                                                                        |
+|               |                               Filter: part.p_name LIKE Utf8("%green%")                                                                                                                                                                                                        |
+|               |                                 TableScan: part projection=[p_partkey, p_name]                                                                                                                                                                                                |
+|               |                             TableScan: lineitem projection=[l_orderkey, l_partkey, l_suppkey, l_quantity, l_extendedprice, l_discount]                                                                                                                                        |
+|               |                         TableScan: supplier projection=[s_suppkey, s_nationkey]                                                                                                                                                                                               |
+|               |                     TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]                                                                                                                                                                                    |
+|               |                 TableScan: orders projection=[o_orderkey, o_orderdate]                                                                                                                                                                                                        |
+|               |             TableScan: nation projection=[n_nationkey, n_name]                                                                                                                                                                                                                |
+| physical_plan | SortPreservingMergeExec: [nation@0 ASC NULLS LAST,o_year@1 DESC]                                                                                                                                                                                                              |
+|               |   SortExec: expr=[nation@0 ASC NULLS LAST,o_year@1 DESC]                                                                                                                                                                                                                      |
+|               |     ProjectionExec: expr=[nation@0 as nation, o_year@1 as o_year, SUM(profit.amount)@2 as sum_profit]                                                                                                                                                                         |
+|               |       AggregateExec: mode=FinalPartitioned, gby=[nation@0 as nation, o_year@1 as o_year], aggr=[SUM(profit.amount)]                                                                                                                                                           |
+|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                           |
+|               |           RepartitionExec: partitioning=Hash([Column { name: "nation", index: 0 }, Column { name: "o_year", index: 1 }], 2), input_partitions=2                                                                                                                               |
+|               |             AggregateExec: mode=Partial, gby=[nation@0 as nation, o_year@1 as o_year], aggr=[SUM(profit.amount)]                                                                                                                                                              |
+|               |               ProjectionExec: expr=[n_name@7 as nation, datepart(YEAR, o_orderdate@5) as o_year, l_extendedprice@1 * (Some(1),20,0 - l_discount@2) - ps_supplycost@4 * l_quantity@0 as amount]                                                                                |
+|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                   |
+|               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_nationkey", index: 3 }, Column { name: "n_nationkey", index: 0 })]                                                                                                                  |
+|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                               |
+|               |                       RepartitionExec: partitioning=Hash([Column { name: "s_nationkey", index: 3 }], 2), input_partitions=2                                                                                                                                                   |
+|               |                         ProjectionExec: expr=[l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, l_discount@3 as l_discount, s_nationkey@4 as s_nationkey, ps_supplycost@5 as ps_supplycost, o_orderdate@7 as o_orderdate]                                     |
+|               |                           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                         |
+|               |                             HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })]                                                                                                          |
+|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                     |
+|               |                                 RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2), input_partitions=2                                                                                                                                          |
+|               |                                   ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_quantity@3 as l_quantity, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount, s_nationkey@6 as s_nationkey, ps_supplycost@9 as ps_supplycost]                             |
+|               |                                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                               |
+|               |                                       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_suppkey", index: 2 }, Column { name: "ps_suppkey", index: 1 }), (Column { name: "l_partkey", index: 1 }, Column { name: "ps_partkey", index: 0 })]              |
+|               |                                         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                           |
+|               |                                           RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 2 }, Column { name: "l_partkey", index: 1 }], 2), input_partitions=2                                                                                         |
+|               |                                             ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_partkey@1 as l_partkey, l_suppkey@2 as l_suppkey, l_quantity@3 as l_quantity, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount, s_nationkey@7 as s_nationkey] |
+|               |                                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                     |
+|               |                                                 HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_suppkey", index: 2 }, Column { name: "s_suppkey", index: 0 })]                                                                                        |
+|               |                                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                 |
+|               |                                                     RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 2 }], 2), input_partitions=2                                                                                                                       |
+|               |                                                       ProjectionExec: expr=[l_orderkey@1 as l_orderkey, l_partkey@2 as l_partkey, l_suppkey@3 as l_suppkey, l_quantity@4 as l_quantity, l_extendedprice@5 as l_extendedprice, l_discount@6 as l_discount]                     |
+|               |                                                         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                           |
+|               |                                                           HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "p_partkey", index: 0 }, Column { name: "l_partkey", index: 1 })]                                                                              |
+|               |                                                             CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                       |
+|               |                                                               RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=2                                                                                                             |
+|               |                                                                 RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0                                                                                                                                          |
+|               |                                                                   ProjectionExec: expr=[p_partkey@0 as p_partkey]                                                                                                                                                             |
+|               |                                                                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                               |
+|               |                                                                       FilterExec: p_name@1 LIKE %green%                                                                                                                                                                       |
+|               |                                                                         MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                          |
+|               |                                                             CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                       |
+|               |                                                               RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 1 }], 2), input_partitions=0                                                                                                             |
+|               |                                                                 MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                  |
+|               |                                                   CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                 |
+|               |                                                     RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2), input_partitions=0                                                                                                                       |
+|               |                                                       MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                            |
+|               |                                         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                           |
+|               |                                           RepartitionExec: partitioning=Hash([Column { name: "ps_suppkey", index: 1 }, Column { name: "ps_partkey", index: 0 }], 2), input_partitions=0                                                                                       |
+|               |                                             MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                      |
+|               |                               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                     |
+|               |                                 RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 0 }], 2), input_partitions=0                                                                                                                                          |
+|               |                                   MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                |
+|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                               |
+|               |                       RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=0                                                                                                                                                   |
+|               |                         MemoryExec: partitions=0, partition_sizes=[]                                                                                                                                                                                                          |
+|               |                                                                                                                                                                                                                                                                               |
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
diff --git a/benchmarks/queries/q8.sql b/benchmarks/queries/q8.sql
index 6ddb2a6747..4f34dca6a0 100644
--- a/benchmarks/queries/q8.sql
+++ b/benchmarks/queries/q8.sql
@@ -1,9 +1,9 @@
 select
     o_year,
-    sum(case
-            when nation = 'BRAZIL' then volume
-            else 0
-        end) / sum(volume) as mkt_share
+    cast(cast(sum(case
+                      when nation = 'BRAZIL' then volume
+                      else 0
+        end) as decimal(12,2)) / cast(sum(volume) as decimal(12,2)) as decimal(15,2)) as mkt_share
 from
     (
         select
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 1c50478522..36da520cb7 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -908,7 +908,6 @@ mod ci {
         verify_query(5).await
     }
 
-    #[ignore] // https://github.com/apache/arrow-datafusion/issues/4024
     #[tokio::test]
     async fn verify_q6() -> Result<()> {
         verify_query(6).await
diff --git a/datafusion/core/tests/sqllogictests/src/engines/conversion.rs b/datafusion/core/tests/sqllogictests/src/engines/conversion.rs
index 0d013c47b3..c069c2d4a4 100644
--- a/datafusion/core/tests/sqllogictests/src/engines/conversion.rs
+++ b/datafusion/core/tests/sqllogictests/src/engines/conversion.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::datatypes::{Decimal128Type, DecimalType};
 use bigdecimal::BigDecimal;
 use half::f16;
 use rust_decimal::prelude::*;
@@ -74,9 +75,9 @@ pub fn f64_to_str(value: f64) -> String {
     }
 }
 
-pub fn i128_to_str(value: i128, scale: u32) -> String {
+pub fn i128_to_str(value: i128, precision: &u8, scale: &i8) -> String {
     big_decimal_to_str(
-        BigDecimal::from_str(&Decimal::from_i128_with_scale(value, scale).to_string())
+        BigDecimal::from_str(&Decimal128Type::format_decimal(value, *precision, *scale))
             .unwrap(),
     )
 }
diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs
index 1f4855730c..20cd2331a4 100644
--- a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs
+++ b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs
@@ -200,10 +200,9 @@ pub fn cell_to_string(col: &ArrayRef, row: usize) -> Result<String> {
             DataType::Float64 => {
                 Ok(f64_to_str(get_row_value!(array::Float64Array, col, row)))
             }
-            DataType::Decimal128(_, scale) => {
+            DataType::Decimal128(precision, scale) => {
                 let value = get_row_value!(array::Decimal128Array, col, row);
-                let decimal_scale = u32::try_from((*scale).max(0)).unwrap();
-                Ok(i128_to_str(value, decimal_scale))
+                Ok(i128_to_str(value, precision, scale))
             }
             DataType::LargeUtf8 => Ok(varchar_to_str(get_row_value!(
                 array::LargeStringArray,
diff --git a/datafusion/core/tests/sqllogictests/test_files/tpch.slt b/datafusion/core/tests/sqllogictests/test_files/tpch.slt
index ee9b8d92d5..619fc8e0f6 100644
--- a/datafusion/core/tests/sqllogictests/test_files/tpch.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/tpch.slt
@@ -118,6 +118,10 @@ CREATE EXTERNAL TABLE IF NOT EXISTS supplier (
 
 
 # q1
+# The decimal calculation `sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge` actually causes
+# overflow on decimal multiplication. Because the kernel DataFusion uses doesn't allow precision loss, we need
+# a cast to decimal(12,2) to avoid the overflow. We can get rid of it once we can use the new multiply kernel from
+# arrow-rs, which allows precision loss.
 query TTRRRRRRRI
 select
     l_returnflag,
@@ -125,7 +129,7 @@ select
     sum(l_quantity) as sum_qty,
     sum(l_extendedprice) as sum_base_price,
     sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
-    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
+    sum(cast(l_extendedprice as decimal(12,2)) * (1 - l_discount) * (1 + l_tax)) as sum_charge,
     avg(l_quantity) as avg_qty,
     avg(l_extendedprice) as avg_price,
     avg(l_discount) as avg_disc,
diff --git a/datafusion/expr/src/type_coercion/binary.rs b/datafusion/expr/src/type_coercion/binary.rs
index 7f767d9368..4983dd25d2 100644
--- a/datafusion/expr/src/type_coercion/binary.rs
+++ b/datafusion/expr/src/type_coercion/binary.rs
@@ -17,7 +17,7 @@
 
 //! Coercion rules for matching argument types for binary operators
 
-use crate::type_coercion::{is_date, is_interval, is_numeric, is_timestamp};
+use crate::type_coercion::{is_date, is_decimal, is_interval, is_numeric, is_timestamp};
 use crate::Operator;
 use arrow::compute::can_cast_types;
 use arrow::datatypes::{
@@ -35,9 +35,40 @@ pub fn binary_operator_data_type(
     op: &Operator,
     rhs_type: &DataType,
 ) -> Result<DataType> {
-    // validate that it is possible to perform the operation on incoming types.
-    // (or the return datatype cannot be inferred)
-    let result_type = coerce_types(lhs_type, op, rhs_type)?;
+    let result_type = if !any_decimal(lhs_type, rhs_type) {
+        // validate that it is possible to perform the operation on incoming types.
+        // (or the return datatype cannot be inferred)
+        coerce_types(lhs_type, op, rhs_type)?
+    } else {
+        let (coerced_lhs_type, coerced_rhs_type) =
+            math_decimal_coercion(lhs_type, rhs_type);
+        let lhs_type = if let Some(lhs_type) = coerced_lhs_type {
+            lhs_type
+        } else {
+            lhs_type.clone()
+        };
+        let rhs_type = if let Some(rhs_type) = coerced_rhs_type {
+            rhs_type
+        } else {
+            rhs_type.clone()
+        };
+
+        match op {
+            Operator::Plus
+            | Operator::Minus
+            | Operator::Divide
+            | Operator::Multiply
+            | Operator::Modulo => decimal_op_mathematics_type(op, &lhs_type, &rhs_type)
+                .or_else(|| coerce_types(&lhs_type, op, &rhs_type).ok())
+                .ok_or_else(|| {
+                    DataFusionError::Internal(format!(
+                        "Could not get return type for {:?} between {:?} and {:?}",
+                        op, lhs_type, rhs_type
+                    ))
+                })?,
+            _ => coerce_types(&lhs_type, op, &rhs_type)?,
+        }
+    };
 
     match op {
         // operators that return a boolean
@@ -135,7 +166,7 @@ pub fn coerce_types(
         | Operator::Minus
         | Operator::Modulo
         | Operator::Divide
-        | Operator::Multiply => mathematics_numerical_coercion(op, lhs_type, rhs_type),
+        | Operator::Multiply => mathematics_numerical_coercion(lhs_type, rhs_type),
         Operator::RegexMatch
         | Operator::RegexIMatch
         | Operator::RegexNotMatch
@@ -155,6 +186,46 @@ pub fn coerce_types(
     }
 }
 
+/// Coercion rules for mathematics operators between decimal and non-decimal types.
+pub fn math_decimal_coercion(
+    lhs_type: &DataType,
+    rhs_type: &DataType,
+) -> (Option<DataType>, Option<DataType>) {
+    use arrow::datatypes::DataType::*;
+
+    if both_decimal(lhs_type, rhs_type) {
+        return (None, None);
+    }
+
+    match (lhs_type, rhs_type) {
+        (Null, dec_type @ Decimal128(_, _)) => (Some(dec_type.clone()), None),
+        (dec_type @ Decimal128(_, _), Null) => (None, Some(dec_type.clone())),
+        (Dictionary(key_type, value_type), _) => {
+            let (value_type, rhs_type) = math_decimal_coercion(value_type, rhs_type);
+            let lhs_type = value_type
+                .map(|value_type| Dictionary(key_type.clone(), Box::new(value_type)));
+            (lhs_type, rhs_type)
+        }
+        (_, Dictionary(key_type, value_type)) => {
+            let (lhs_type, value_type) = math_decimal_coercion(lhs_type, value_type);
+            let rhs_type = value_type
+                .map(|value_type| Dictionary(key_type.clone(), Box::new(value_type)));
+            (lhs_type, rhs_type)
+        }
+        (Decimal128(_, _), Float32 | Float64) => (Some(Float64), Some(Float64)),
+        (Float32 | Float64, Decimal128(_, _)) => (Some(Float64), Some(Float64)),
+        (Decimal128(_, _), _) => {
+            let converted_decimal_type = coerce_numeric_type_to_decimal(rhs_type);
+            (None, converted_decimal_type)
+        }
+        (_, Decimal128(_, _)) => {
+            let converted_decimal_type = coerce_numeric_type_to_decimal(lhs_type);
+            (converted_decimal_type, None)
+        }
+        _ => (None, None),
+    }
+}
+
 /// Returns the output type of applying bitwise operations such as
 /// `&`, `|`, or `xor`to arguments of `lhs_type` and `rhs_type`.
 fn bitwise_coercion(left_type: &DataType, right_type: &DataType) -> Option<DataType> {
@@ -428,7 +499,6 @@ fn coerce_numeric_type_to_decimal(numeric_type: &DataType) -> Option<DataType> {
 /// Returns the output type of applying mathematics operations such as
 /// `+` to arguments of `lhs_type` and `rhs_type`.
 fn mathematics_numerical_coercion(
-    mathematics_op: &Operator,
     lhs_type: &DataType,
     rhs_type: &DataType,
 ) -> Option<DataType> {
@@ -452,47 +522,16 @@ fn mathematics_numerical_coercion(
     // these are ordered from most informative to least informative so
     // that the coercion removes the least amount of information
     match (lhs_type, rhs_type) {
-        (Decimal128(_, _), Decimal128(_, _)) => {
-            coercion_decimal_mathematics_type(mathematics_op, lhs_type, rhs_type)
-        }
-        (Null, dec_type @ Decimal128(_, _)) | (dec_type @ Decimal128(_, _), Null) => {
-            Some(dec_type.clone())
-        }
         (Dictionary(_, lhs_value_type), Dictionary(_, rhs_value_type)) => {
-            mathematics_numerical_coercion(mathematics_op, lhs_value_type, rhs_value_type)
+            mathematics_numerical_coercion(lhs_value_type, rhs_value_type)
         }
         (Dictionary(key_type, value_type), _) => {
-            let value_type =
-                mathematics_numerical_coercion(mathematics_op, value_type, rhs_type);
+            let value_type = mathematics_numerical_coercion(value_type, rhs_type);
             value_type
                 .map(|value_type| Dictionary(key_type.clone(), Box::new(value_type)))
         }
         (_, Dictionary(_, value_type)) => {
-            mathematics_numerical_coercion(mathematics_op, lhs_type, value_type)
-        }
-        (Decimal128(_, _), Float32 | Float64) => Some(Float64),
-        (Float32 | Float64, Decimal128(_, _)) => Some(Float64),
-        (Decimal128(_, _), _) => {
-            let converted_decimal_type = coerce_numeric_type_to_decimal(rhs_type);
-            match converted_decimal_type {
-                None => None,
-                Some(right_decimal_type) => coercion_decimal_mathematics_type(
-                    mathematics_op,
-                    lhs_type,
-                    &right_decimal_type,
-                ),
-            }
-        }
-        (_, Decimal128(_, _)) => {
-            let converted_decimal_type = coerce_numeric_type_to_decimal(lhs_type);
-            match converted_decimal_type {
-                None => None,
-                Some(left_decimal_type) => coercion_decimal_mathematics_type(
-                    mathematics_op,
-                    &left_decimal_type,
-                    rhs_type,
-                ),
-            }
+            mathematics_numerical_coercion(lhs_type, value_type)
         }
         (Float64, _) | (_, Float64) => Some(Float64),
         (_, Float32) | (Float32, _) => Some(Float32),
@@ -515,7 +554,38 @@ fn create_decimal_type(precision: u8, scale: i8) -> DataType {
     )
 }
 
-fn coercion_decimal_mathematics_type(
+/// Returns the coerced type of applying mathematics operations on decimal types.
+/// Two sides of the mathematics operation will be coerced to the same type. Note
+/// that we don't coerce the decimal operands in analysis phase, but do it in the
+/// execution phase because this is not idempotent.
+pub fn coercion_decimal_mathematics_type(
+    mathematics_op: &Operator,
+    left_decimal_type: &DataType,
+    right_decimal_type: &DataType,
+) -> Option<DataType> {
+    use arrow::datatypes::DataType::*;
+    match (left_decimal_type, right_decimal_type) {
+        // The promotion rule from spark
+        // https://github.com/apache/spark/blob/c20af535803a7250fef047c2bf0fe30be242369d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala#L35
+        (Decimal128(_, _), Decimal128(_, _)) => match mathematics_op {
+            Operator::Plus | Operator::Minus => decimal_op_mathematics_type(
+                mathematics_op,
+                left_decimal_type,
+                right_decimal_type,
+            ),
+            Operator::Multiply | Operator::Divide | Operator::Modulo => {
+                get_wider_decimal_type(left_decimal_type, right_decimal_type)
+            }
+            _ => None,
+        },
+        _ => None,
+    }
+}
+
+/// Returns the output type of applying mathematics operations on decimal types.
+/// The rule is from spark. Note that this is different to the coerced type applied
+/// to two sides of the arithmetic operation.
+pub fn decimal_op_mathematics_type(
     mathematics_op: &Operator,
     left_decimal_type: &DataType,
     right_decimal_type: &DataType,
@@ -559,6 +629,27 @@ fn coercion_decimal_mathematics_type(
                 _ => None,
             }
         }
+        (Dictionary(_, lhs_value_type), Dictionary(_, rhs_value_type)) => {
+            decimal_op_mathematics_type(
+                mathematics_op,
+                lhs_value_type.as_ref(),
+                rhs_value_type.as_ref(),
+            )
+        }
+        (Dictionary(key_type, value_type), _) => {
+            let value_type = decimal_op_mathematics_type(
+                mathematics_op,
+                value_type.as_ref(),
+                right_decimal_type,
+            );
+            value_type
+                .map(|value_type| Dictionary(key_type.clone(), Box::new(value_type)))
+        }
+        (_, Dictionary(_, value_type)) => decimal_op_mathematics_type(
+            mathematics_op,
+            left_decimal_type,
+            value_type.as_ref(),
+        ),
         _ => None,
     }
 }
@@ -582,6 +673,35 @@ fn both_numeric_or_null_and_numeric(lhs_type: &DataType, rhs_type: &DataType) ->
     }
 }
 
+/// Determine if at least one of lhs and rhs is decimal, and the other must be NULL or decimal
+fn both_decimal(lhs_type: &DataType, rhs_type: &DataType) -> bool {
+    match (lhs_type, rhs_type) {
+        (_, DataType::Null) => is_decimal(lhs_type),
+        (DataType::Null, _) => is_decimal(rhs_type),
+        (DataType::Decimal128(_, _), DataType::Decimal128(_, _)) => true,
+        (DataType::Dictionary(_, value_type), _) => {
+            is_decimal(value_type) && is_decimal(rhs_type)
+        }
+        (_, DataType::Dictionary(_, value_type)) => {
+            is_decimal(lhs_type) && is_decimal(value_type)
+        }
+        _ => false,
+    }
+}
+
+/// Determine if at least one of lhs and rhs is decimal
+pub fn any_decimal(lhs_type: &DataType, rhs_type: &DataType) -> bool {
+    match (lhs_type, rhs_type) {
+        (DataType::Dictionary(_, value_type), _) => {
+            is_decimal(value_type) || is_decimal(rhs_type)
+        }
+        (_, DataType::Dictionary(_, value_type)) => {
+            is_decimal(lhs_type) || is_decimal(value_type)
+        }
+        (_, _) => is_decimal(lhs_type) || is_decimal(rhs_type),
+    }
+}
+
 /// Coercion rules for Dictionaries: the type that both lhs and rhs
 /// can be casted to for the purpose of a computation.
 ///
@@ -885,6 +1005,9 @@ mod tests {
             &left_decimal_type,
             &right_decimal_type,
         );
+        assert_eq!(DataType::Decimal128(20, 4), result.unwrap());
+        let result =
+            decimal_op_mathematics_type(&op, &left_decimal_type, &right_decimal_type);
         assert_eq!(DataType::Decimal128(31, 7), result.unwrap());
         let op = Operator::Divide;
         let result = coercion_decimal_mathematics_type(
@@ -892,6 +1015,9 @@ mod tests {
             &left_decimal_type,
             &right_decimal_type,
         );
+        assert_eq!(DataType::Decimal128(20, 4), result.unwrap());
+        let result =
+            decimal_op_mathematics_type(&op, &left_decimal_type, &right_decimal_type);
         assert_eq!(DataType::Decimal128(35, 24), result.unwrap());
         let op = Operator::Modulo;
         let result = coercion_decimal_mathematics_type(
@@ -899,6 +1025,9 @@ mod tests {
             &left_decimal_type,
             &right_decimal_type,
         );
+        assert_eq!(DataType::Decimal128(20, 4), result.unwrap());
+        let result =
+            decimal_op_mathematics_type(&op, &left_decimal_type, &right_decimal_type);
         assert_eq!(DataType::Decimal128(11, 4), result.unwrap());
     }
 
@@ -1154,45 +1283,101 @@ mod tests {
             Operator::Multiply,
             DataType::Float64
         );
-        // decimal
-        // bug: https://github.com/apache/arrow-datafusion/issues/3387 will be fixed in the next pr
-        // test_coercion_binary_rule!(
-        //     DataType::Decimal128(10, 2),
-        //     DataType::Decimal128(10, 2),
-        //     Operator::Plus,
-        //     DataType::Decimal128(11, 2)
-        // );
-        test_coercion_binary_rule!(
+        // TODO add other data type
+        Ok(())
+    }
+
+    fn test_math_decimal_coercion_rule(
+        lhs_type: DataType,
+        rhs_type: DataType,
+        mathematics_op: Operator,
+        expected_lhs_type: Option<DataType>,
+        expected_rhs_type: Option<DataType>,
+        expected_coerced_type: DataType,
+        expected_output_type: DataType,
+    ) {
+        // The coerced types for lhs and rhs, if any of them is not decimal
+        let (l, r) = math_decimal_coercion(&lhs_type, &rhs_type);
+        assert_eq!(l, expected_lhs_type);
+        assert_eq!(r, expected_rhs_type);
+
+        let lhs_type = l.unwrap_or(lhs_type);
+        let rhs_type = r.unwrap_or(rhs_type);
+
+        // The coerced type of decimal math expression, applied during expression evaluation
+        let coerced_type =
+            coercion_decimal_mathematics_type(&mathematics_op, &lhs_type, &rhs_type)
+                .unwrap();
+        assert_eq!(coerced_type, expected_coerced_type);
+
+        // The output type of decimal math expression
+        let output_type =
+            decimal_op_mathematics_type(&mathematics_op, &lhs_type, &rhs_type).unwrap();
+        assert_eq!(output_type, expected_output_type);
+    }
+
+    #[test]
+    fn test_coercion_arithmetic_decimal() -> Result<()> {
+        test_math_decimal_coercion_rule(
+            DataType::Decimal128(10, 2),
+            DataType::Decimal128(10, 2),
+            Operator::Plus,
+            None,
+            None,
+            DataType::Decimal128(11, 2),
+            DataType::Decimal128(11, 2),
+        );
+
+        test_math_decimal_coercion_rule(
             DataType::Int32,
             DataType::Decimal128(10, 2),
             Operator::Plus,
-            DataType::Decimal128(13, 2)
+            Some(DataType::Decimal128(10, 0)),
+            None,
+            DataType::Decimal128(13, 2),
+            DataType::Decimal128(13, 2),
         );
-        test_coercion_binary_rule!(
+
+        test_math_decimal_coercion_rule(
             DataType::Int32,
             DataType::Decimal128(10, 2),
             Operator::Minus,
-            DataType::Decimal128(13, 2)
+            Some(DataType::Decimal128(10, 0)),
+            None,
+            DataType::Decimal128(13, 2),
+            DataType::Decimal128(13, 2),
         );
-        test_coercion_binary_rule!(
+
+        test_math_decimal_coercion_rule(
             DataType::Int32,
             DataType::Decimal128(10, 2),
             Operator::Multiply,
-            DataType::Decimal128(21, 2)
+            Some(DataType::Decimal128(10, 0)),
+            None,
+            DataType::Decimal128(12, 2),
+            DataType::Decimal128(21, 2),
         );
-        test_coercion_binary_rule!(
+
+        test_math_decimal_coercion_rule(
             DataType::Int32,
             DataType::Decimal128(10, 2),
             Operator::Divide,
-            DataType::Decimal128(23, 11)
+            Some(DataType::Decimal128(10, 0)),
+            None,
+            DataType::Decimal128(12, 2),
+            DataType::Decimal128(23, 11),
         );
-        test_coercion_binary_rule!(
+
+        test_math_decimal_coercion_rule(
             DataType::Int32,
             DataType::Decimal128(10, 2),
             Operator::Modulo,
-            DataType::Decimal128(10, 2)
+            Some(DataType::Decimal128(10, 0)),
+            None,
+            DataType::Decimal128(12, 2),
+            DataType::Decimal128(10, 2),
         );
-        // TODO add other data type
+
         Ok(())
     }
 
diff --git a/datafusion/expr/src/type_coercion/mod.rs b/datafusion/expr/src/type_coercion/mod.rs
index 73b4a1f93b..4a1d695dea 100644
--- a/datafusion/expr/src/type_coercion/mod.rs
+++ b/datafusion/expr/src/type_coercion/mod.rs
@@ -85,3 +85,8 @@ pub fn is_date(dt: &DataType) -> bool {
 pub fn is_utf8_or_large_utf8(dt: &DataType) -> bool {
     matches!(dt, DataType::Utf8 | DataType::LargeUtf8)
 }
+
+/// Determine whether the given data type `dt` is a `Decimal`.
+pub fn is_decimal(dt: &DataType) -> bool {
+    matches!(dt, DataType::Decimal128(_, _))
+}
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 9fb27036aa..7a346b5b9e 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -28,7 +28,7 @@ use datafusion_expr::expr::{self, Between, BinaryExpr, Case, Like, WindowFunctio
 use datafusion_expr::expr_schema::cast_subquery;
 use datafusion_expr::logical_plan::Subquery;
 use datafusion_expr::type_coercion::binary::{
-    coerce_types, comparison_coercion, like_coercion,
+    any_decimal, coerce_types, comparison_coercion, like_coercion, math_decimal_coercion,
 };
 use datafusion_expr::type_coercion::functions::data_types;
 use datafusion_expr::type_coercion::other::{
@@ -266,6 +266,33 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
                             )))
                         }
                     }
+                    // For numerical operations between decimals, we don't coerce the types.
+                    // But if only one of the operands is decimal, we cast the other operand to decimal
+                    // if the other operand is integer. If the other operand is float, we cast the
+                    // decimal operand to float.
+                    (lhs_type, rhs_type)
+                        if op.is_numerical_operators()
+                            && any_decimal(lhs_type, rhs_type) =>
+                    {
+                        let (coerced_lhs_type, coerced_rhs_type) =
+                            math_decimal_coercion(lhs_type, rhs_type);
+                        let new_left = if let Some(lhs_type) = coerced_lhs_type {
+                            left.clone().cast_to(&lhs_type, &self.schema)?
+                        } else {
+                            left.as_ref().clone()
+                        };
+                        let new_right = if let Some(rhs_type) = coerced_rhs_type {
+                            right.clone().cast_to(&rhs_type, &self.schema)?
+                        } else {
+                            right.as_ref().clone()
+                        };
+                        let expr = Expr::BinaryExpr(BinaryExpr::new(
+                            Box::new(new_left),
+                            op,
+                            Box::new(new_right),
+                        ));
+                        Ok(expr)
+                    }
                     _ => {
                         let coerced_type = coerce_types(&left_type, &op, &right_type)?;
                         let expr = Expr::BinaryExpr(BinaryExpr::new(
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 5ef4342435..b98a2dc420 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -50,7 +50,7 @@ use arrow::compute::kernels::comparison::{
     eq_dyn_utf8_scalar, gt_dyn_utf8_scalar, gt_eq_dyn_utf8_scalar, lt_dyn_utf8_scalar,
     lt_eq_dyn_utf8_scalar, neq_dyn_utf8_scalar,
 };
-use arrow::compute::{try_unary, unary};
+use arrow::compute::{try_unary, unary, CastOptions};
 use arrow::datatypes::*;
 
 use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn};
@@ -61,7 +61,7 @@ use datafusion_common::scalar::{
     op_ym_dt, op_ym_mdn, parse_timezones, seconds_add, seconds_sub, MILLISECOND_MODE,
     NANOSECOND_MODE,
 };
-use datafusion_expr::type_coercion::{is_timestamp, is_utf8_or_large_utf8};
+use datafusion_expr::type_coercion::{is_decimal, is_timestamp, is_utf8_or_large_utf8};
 use kernels::{
     bitwise_and, bitwise_and_scalar, bitwise_or, bitwise_or_scalar, bitwise_shift_left,
     bitwise_shift_left_scalar, bitwise_shift_right, bitwise_shift_right_scalar,
@@ -82,6 +82,7 @@ use arrow::datatypes::{DataType, Schema, TimeUnit};
 use arrow::record_batch::RecordBatch;
 
 use super::column::Column;
+use crate::expressions::cast_column;
 use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
 use crate::intervals::{apply_operator, Interval};
 use crate::physical_expr::down_cast_any_ref;
@@ -95,7 +96,9 @@ use datafusion_common::cast::{
 use datafusion_common::scalar::*;
 use datafusion_common::ScalarValue;
 use datafusion_common::{DataFusionError, Result};
-use datafusion_expr::type_coercion::binary::binary_operator_data_type;
+use datafusion_expr::type_coercion::binary::{
+    binary_operator_data_type, coercion_decimal_mathematics_type,
+};
 use datafusion_expr::{ColumnarValue, Operator};
 
 /// Binary expression
@@ -383,12 +386,14 @@ macro_rules! compute_primitive_op_dyn_scalar {
 /// LEFT is Decimal or Dictionary array of decimal values, RIGHT is scalar value
 /// OP_TYPE is the return type of scalar function
 macro_rules! compute_primitive_decimal_op_dyn_scalar {
-    ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{
+    ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr, $RET_TYPE:expr) => {{
         // generate the scalar function name, such as add_decimal_dyn_scalar,
         // from the $OP parameter (which could have a value of add) and the
         // suffix _decimal_dyn_scalar
         if let Some(value) = $RIGHT {
-            Ok(paste::expr! {[<$OP _decimal_dyn_scalar>]}($LEFT, value)?)
+            Ok(paste::expr! {[<$OP _decimal_dyn_scalar>]}(
+                $LEFT, value, $RET_TYPE,
+            )?)
         } else {
             // when the $RIGHT is a NULL, generate a NULL array of $OP_TYPE
             Ok(Arc::new(new_null_array($OP_TYPE, $LEFT.len())))
@@ -437,15 +442,15 @@ macro_rules! binary_string_array_op {
 /// The binary_primitive_array_op macro only evaluates for primitive types
 /// like integers and floats.
 macro_rules! binary_primitive_array_op_dyn {
-    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
+    ($LEFT:expr, $RIGHT:expr, $OP:ident, $RET_TYPE:expr) => {{
         match $LEFT.data_type() {
             DataType::Decimal128(_, _) => {
-                Ok(paste::expr! {[<$OP _decimal>]}(&$LEFT, &$RIGHT)?)
+                Ok(paste::expr! {[<$OP _decimal>]}(&$LEFT, &$RIGHT, $RET_TYPE)?)
             }
             DataType::Dictionary(_, value_type)
                 if matches!(value_type.as_ref(), &DataType::Decimal128(_, _)) =>
             {
-                Ok(paste::expr! {[<$OP _decimal>]}(&$LEFT, &$RIGHT)?)
+                Ok(paste::expr! {[<$OP _decimal>]}(&$LEFT, &$RIGHT, $RET_TYPE)?)
             }
             _ => Ok(Arc::new(
                 $OP(&$LEFT, &$RIGHT).map_err(|err| DataFusionError::ArrowError(err))?,
@@ -458,13 +463,13 @@ macro_rules! binary_primitive_array_op_dyn {
 /// The binary_primitive_array_op_dyn_scalar macro only evaluates for primitive
 /// types like integers and floats.
 macro_rules! binary_primitive_array_op_dyn_scalar {
-    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
+    ($LEFT:expr, $RIGHT:expr, $OP:ident, $RET_TYPE:expr) => {{
         // unwrap underlying (non dictionary) value
         let right = unwrap_dict_value($RIGHT);
         let op_type = $LEFT.data_type();
 
         let result: Result<Arc<dyn Array>> = match right {
-            ScalarValue::Decimal128(v, _, _) => compute_primitive_decimal_op_dyn_scalar!($LEFT, v, $OP, op_type),
+            ScalarValue::Decimal128(v, _, _) => compute_primitive_decimal_op_dyn_scalar!($LEFT, v, $OP, op_type, $RET_TYPE),
             ScalarValue::Int8(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, Int8Type),
             ScalarValue::Int16(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, Int16Type),
             ScalarValue::Int32(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, Int32Type),
@@ -662,9 +667,13 @@ impl PhysicalExpr for BinaryExpr {
         let left_data_type = left_value.data_type();
         let right_data_type = right_value.data_type();
 
+        let schema = batch.schema();
+        let input_schema = schema.as_ref();
+
         match (&left_value, &left_data_type, &right_value, &right_data_type) {
             // Types are equal => valid
-            (_, l, _, r) if l == r => {}
+            // Decimal types can be different but still valid as we coerce them to the same type later
+            (_, l, _, r) if l == r || (is_decimal(l) && is_decimal(r)) => {}
             // Allow comparing a dictionary value with its corresponding scalar value
             (
                 ColumnarValue::Array(_),
@@ -677,7 +686,8 @@ impl PhysicalExpr for BinaryExpr {
                 scalar_t,
                 ColumnarValue::Array(_),
                 DataType::Dictionary(_, dict_t),
-            ) if dict_t.as_ref() == scalar_t => {}
+            ) if dict_t.as_ref() == scalar_t
+                || (is_decimal(dict_t.as_ref()) && is_decimal(scalar_t)) => {}
             _ => {
                 return Err(DataFusionError::Internal(format!(
                     "Cannot evaluate binary expression {:?} with types {:?} and {:?}",
@@ -686,11 +696,29 @@ impl PhysicalExpr for BinaryExpr {
             }
         }
 
+        // Coerce decimal types to the same scale and precision
+        let coerced_type = coercion_decimal_mathematics_type(
+            &self.op,
+            &left_data_type,
+            &right_data_type,
+        );
+        let (left_value, right_value) = if let Some(coerced_type) = coerced_type {
+            let options = CastOptions { safe: true };
+            let left_value = cast_column(&left_value, &coerced_type, &options)?;
+            let right_value = cast_column(&right_value, &coerced_type, &options)?;
+            (left_value, right_value)
+        } else {
+            // No need to coerce if it is not decimal or not math operation
+            (left_value, right_value)
+        };
+
+        let result_type = self.data_type(input_schema)?;
+
         // Attempt to use special kernels if one input is scalar and the other is an array
         let scalar_result = match (&left_value, &right_value) {
             (ColumnarValue::Array(array), ColumnarValue::Scalar(scalar)) => {
                 // if left is array and right is literal - use scalar operations
-                self.evaluate_array_scalar(array, scalar.clone())?
+                self.evaluate_array_scalar(array, scalar.clone(), &result_type)?
             }
             (ColumnarValue::Scalar(scalar), ColumnarValue::Array(array)) => {
                 // if left is literal and right is array - reverse operator and parameters
@@ -708,8 +736,14 @@ impl PhysicalExpr for BinaryExpr {
             left_value.into_array(batch.num_rows()),
             right_value.into_array(batch.num_rows()),
         );
-        self.evaluate_with_resolved_args(left, &left_data_type, right, &right_data_type)
-            .map(|a| ColumnarValue::Array(a))
+        self.evaluate_with_resolved_args(
+            left,
+            &left_data_type,
+            right,
+            &right_data_type,
+            &result_type,
+        )
+        .map(|a| ColumnarValue::Array(a))
     }
 
     fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
@@ -1025,6 +1059,7 @@ impl BinaryExpr {
         &self,
         array: &dyn Array,
         scalar: ScalarValue,
+        result_type: &DataType,
     ) -> Result<Option<Result<ArrayRef>>> {
         let bool_type = &DataType::Boolean;
         let scalar_result = match &self.op {
@@ -1047,19 +1082,29 @@ impl BinaryExpr {
                 binary_array_op_dyn_scalar!(array, scalar, neq, bool_type)
             }
             Operator::Plus => {
-                binary_primitive_array_op_dyn_scalar!(array, scalar, add)
+                binary_primitive_array_op_dyn_scalar!(array, scalar, add, result_type)
             }
             Operator::Minus => {
-                binary_primitive_array_op_dyn_scalar!(array, scalar, subtract)
+                binary_primitive_array_op_dyn_scalar!(
+                    array,
+                    scalar,
+                    subtract,
+                    result_type
+                )
             }
             Operator::Multiply => {
-                binary_primitive_array_op_dyn_scalar!(array, scalar, multiply)
+                binary_primitive_array_op_dyn_scalar!(
+                    array,
+                    scalar,
+                    multiply,
+                    result_type
+                )
             }
             Operator::Divide => {
-                binary_primitive_array_op_dyn_scalar!(array, scalar, divide)
+                binary_primitive_array_op_dyn_scalar!(array, scalar, divide, result_type)
             }
             Operator::Modulo => {
-                binary_primitive_array_op_dyn_scalar!(array, scalar, modulus)
+                binary_primitive_array_op_dyn_scalar!(array, scalar, modulus, result_type)
             }
             Operator::RegexMatch => binary_string_array_flag_op_scalar!(
                 array,
@@ -1140,6 +1185,7 @@ impl BinaryExpr {
         left_data_type: &DataType,
         right: Arc<dyn Array>,
         right_data_type: &DataType,
+        result_type: &DataType,
     ) -> Result<ArrayRef> {
         match &self.op {
             Operator::Lt => lt_dyn(&left, &right),
@@ -1161,16 +1207,20 @@ impl BinaryExpr {
             Operator::IsNotDistinctFrom => {
                 binary_array_op!(left, right, is_not_distinct_from)
             }
-            Operator::Plus => binary_primitive_array_op_dyn!(left, right, add_dyn),
-            Operator::Minus => binary_primitive_array_op_dyn!(left, right, subtract_dyn),
+            Operator::Plus => {
+                binary_primitive_array_op_dyn!(left, right, add_dyn, result_type)
+            }
+            Operator::Minus => {
+                binary_primitive_array_op_dyn!(left, right, subtract_dyn, result_type)
+            }
             Operator::Multiply => {
-                binary_primitive_array_op_dyn!(left, right, multiply_dyn)
+                binary_primitive_array_op_dyn!(left, right, multiply_dyn, result_type)
             }
             Operator::Divide => {
-                binary_primitive_array_op_dyn!(left, right, divide_dyn_opt)
+                binary_primitive_array_op_dyn!(left, right, divide_dyn_opt, result_type)
             }
             Operator::Modulo => {
-                binary_primitive_array_op_dyn!(left, right, modulus_dyn)
+                binary_primitive_array_op_dyn!(left, right, modulus_dyn, result_type)
             }
             Operator::And => {
                 if left_data_type == &DataType::Boolean {
@@ -1236,7 +1286,7 @@ pub fn binary(
             "The type of {lhs_type} {op:?} {rhs_type} of binary physical should be same"
         )));
     }
-    if !lhs_type.eq(rhs_type) {
+    if !lhs_type.eq(rhs_type) && (!is_decimal(lhs_type) && !is_decimal(rhs_type)) {
         return Err(DataFusionError::Internal(format!(
             "The type of {lhs_type} {op:?} {rhs_type} of binary physical should be same"
         )));
@@ -2050,7 +2100,7 @@ mod tests {
         ArrowNumericType, Decimal128Type, Field, Int32Type, SchemaRef,
     };
     use datafusion_common::{ColumnStatistics, Result, Statistics};
-    use datafusion_expr::type_coercion::binary::coerce_types;
+    use datafusion_expr::type_coercion::binary::{coerce_types, math_decimal_coercion};
 
     // Create a binary expression without coercion. Used here when we do not want to coerce the expressions
     // to valid types. Usage can result in an execution (after plan) error.
@@ -2606,7 +2656,7 @@ mod tests {
             Arc::new(schema),
             vec![Arc::new(a), Arc::new(b)],
             Operator::Plus,
-            create_decimal_array(&[Some(247), None, None, Some(247), Some(246)], 10, 0),
+            create_decimal_array(&[Some(247), None, None, Some(247), Some(246)], 11, 0),
         )?;
 
         Ok(())
@@ -2691,7 +2741,7 @@ mod tests {
         let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
         let decimal_array = create_decimal_array(
             &[Some(value + 1), None, Some(value), Some(value + 2)],
-            10,
+            11,
             0,
         );
         let expected = DictionaryArray::try_new(&keys, &decimal_array)?;
@@ -2825,7 +2875,7 @@ mod tests {
             Arc::new(schema),
             vec![Arc::new(a), Arc::new(b)],
             Operator::Minus,
-            create_decimal_array(&[Some(-1), None, None, Some(1), Some(0)], 10, 0),
+            create_decimal_array(&[Some(-1), None, None, Some(1), Some(0)], 11, 0),
         )?;
 
         Ok(())
@@ -2910,7 +2960,7 @@ mod tests {
         let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
         let decimal_array = create_decimal_array(
             &[Some(value - 1), None, Some(value - 2), Some(value)],
-            10,
+            11,
             0,
         );
         let expected = DictionaryArray::try_new(&keys, &decimal_array)?;
@@ -3038,7 +3088,7 @@ mod tests {
             Operator::Multiply,
             create_decimal_array(
                 &[Some(15252), None, None, Some(15252), Some(15129)],
-                10,
+                21,
                 0,
             ),
         )?;
@@ -3124,7 +3174,7 @@ mod tests {
 
         let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
         let decimal_array =
-            create_decimal_array(&[Some(246), None, Some(244), Some(248)], 10, 0);
+            create_decimal_array(&[Some(246), None, Some(244), Some(248)], 21, 0);
         let expected = DictionaryArray::try_new(&keys, &decimal_array)?;
 
         apply_arithmetic_scalar(
@@ -3254,7 +3304,17 @@ mod tests {
             Arc::new(schema),
             vec![Arc::new(a), Arc::new(b)],
             Operator::Divide,
-            create_decimal_array(&[Some(0), None, None, Some(1), Some(1)], 10, 0),
+            create_decimal_array(
+                &[
+                    Some(99193548387), // 0.99193548387
+                    None,
+                    None,
+                    Some(100813008130), // 1.0081300813
+                    Some(100000000000), // 1.0
+                ],
+                21,
+                11,
+            ),
         )?;
 
         Ok(())
@@ -3337,8 +3397,16 @@ mod tests {
         let a = DictionaryArray::try_new(&keys, &decimal_array)?;
 
         let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
-        let decimal_array =
-            create_decimal_array(&[Some(61), None, Some(61), Some(62)], 10, 0);
+        let decimal_array = create_decimal_array(
+            &[
+                Some(6150000000000),
+                None,
+                Some(6100000000000),
+                Some(6200000000000),
+            ],
+            21,
+            11,
+        );
         let expected = DictionaryArray::try_new(&keys, &decimal_array)?;
 
         apply_arithmetic_scalar(
@@ -4715,38 +4783,47 @@ mod tests {
         Ok(())
     }
 
-    fn apply_arithmetic_op(
+    fn apply_decimal_arithmetic_op(
         schema: &SchemaRef,
         left: &ArrayRef,
         right: &ArrayRef,
         op: Operator,
         expected: ArrayRef,
     ) -> Result<()> {
-        let op_type = coerce_types(left.data_type(), &op, right.data_type())?;
-        let left_expr = if left.data_type().eq(&op_type) {
-            col("a", schema)?
+        let (lhs_op_type, rhs_op_type) =
+            math_decimal_coercion(left.data_type(), right.data_type());
+
+        let (left_expr, lhs_type) = if let Some(lhs_op_type) = lhs_op_type {
+            (
+                try_cast(col("a", schema)?, schema, lhs_op_type.clone())?,
+                lhs_op_type,
+            )
         } else {
-            try_cast(col("a", schema)?, schema, op_type.clone())?
+            (col("a", schema)?, left.data_type().clone())
         };
 
-        let right_expr = if right.data_type().eq(&op_type) {
-            col("b", schema)?
+        let (right_expr, rhs_type) = if let Some(rhs_op_type) = rhs_op_type {
+            (
+                try_cast(col("b", schema)?, schema, rhs_op_type.clone())?,
+                rhs_op_type,
+            )
         } else {
-            try_cast(col("b", schema)?, schema, op_type.clone())?
+            (col("b", schema)?, right.data_type().clone())
         };
 
         let coerced_schema = Schema::new(vec![
             Field::new(
                 schema.field(0).name(),
-                op_type.clone(),
+                lhs_type,
                 schema.field(0).is_nullable(),
             ),
             Field::new(
                 schema.field(1).name(),
-                op_type,
+                rhs_type,
                 schema.field(1).is_nullable(),
             ),
         ]);
+
         let arithmetic_op = binary_simple(left_expr, op, right_expr, &coerced_schema);
         let data: Vec<ArrayRef> = vec![left.clone(), right.clone()];
         let batch = RecordBatch::try_new(schema.clone(), data)?;
@@ -4786,7 +4863,7 @@ mod tests {
             13,
             2,
         )) as ArrayRef;
-        apply_arithmetic_op(
+        apply_decimal_arithmetic_op(
             &schema,
             &int32_array,
             &decimal_array,
@@ -4797,18 +4874,18 @@ mod tests {
 
         // subtract: decimal array subtract int32 array
         let schema = Arc::new(Schema::new(vec![
-            Field::new("b", DataType::Int32, true),
             Field::new("a", DataType::Decimal128(10, 2), true),
+            Field::new("b", DataType::Int32, true),
         ]));
         let expect = Arc::new(create_decimal_array(
             &[Some(-12177), None, Some(-12178), Some(-12276)],
             13,
             2,
         )) as ArrayRef;
-        apply_arithmetic_op(
+        apply_decimal_arithmetic_op(
             &schema,
-            &int32_array,
             &decimal_array,
+            &int32_array,
             Operator::Minus,
             expect,
         )
@@ -4820,10 +4897,10 @@ mod tests {
             21,
             2,
         )) as ArrayRef;
-        apply_arithmetic_op(
+        apply_decimal_arithmetic_op(
             &schema,
-            &int32_array,
             &decimal_array,
+            &int32_array,
             Operator::Multiply,
             expect,
         )
@@ -4844,7 +4921,7 @@ mod tests {
             23,
             11,
         )) as ArrayRef;
-        apply_arithmetic_op(
+        apply_decimal_arithmetic_op(
             &schema,
             &int32_array,
             &decimal_array,
@@ -4863,7 +4940,7 @@ mod tests {
             10,
             2,
         )) as ArrayRef;
-        apply_arithmetic_op(
+        apply_decimal_arithmetic_op(
             &schema,
             &int32_array,
             &decimal_array,
@@ -4906,7 +4983,7 @@ mod tests {
             Some(124.22),
             Some(125.24),
         ])) as ArrayRef;
-        apply_arithmetic_op(
+        apply_decimal_arithmetic_op(
             &schema,
             &float64_array,
             &decimal_array,
@@ -4926,7 +5003,7 @@ mod tests {
             Some(121.78),
             Some(122.76),
         ])) as ArrayRef;
-        apply_arithmetic_op(
+        apply_decimal_arithmetic_op(
             &schema,
             &float64_array,
             &decimal_array,
@@ -4942,7 +5019,7 @@ mod tests {
             Some(150.06),
             Some(153.76),
         ])) as ArrayRef;
-        apply_arithmetic_op(
+        apply_decimal_arithmetic_op(
             &schema,
             &float64_array,
             &decimal_array,
@@ -4962,7 +5039,7 @@ mod tests {
             Some(100.81967213114754),
             Some(100.0),
         ])) as ArrayRef;
-        apply_arithmetic_op(
+        apply_decimal_arithmetic_op(
             &schema,
             &float64_array,
             &decimal_array,
@@ -4982,7 +5059,7 @@ mod tests {
             Some(1.0000000000000027),
             Some(8.881784197001252e-16),
         ])) as ArrayRef;
-        apply_arithmetic_op(
+        apply_decimal_arithmetic_op(
             &schema,
             &float64_array,
             &decimal_array,
@@ -5025,7 +5102,11 @@ mod tests {
             schema,
             vec![left_decimal_array, right_decimal_array],
             Operator::Divide,
-            create_decimal_array(&[Some(123456700), None], 25, 3),
+            create_decimal_array(
+                &[Some(12345670000000000000000000000000000), None],
+                38,
+                29,
+            ),
         )?;
 
         Ok(())
diff --git a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
index 836ea93450..8998738ba5 100644
--- a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
+++ b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
@@ -28,6 +28,8 @@ use arrow::{array::*, datatypes::ArrowNumericType, downcast_dictionary_array};
 use arrow_schema::DataType;
 use datafusion_common::cast::as_decimal128_array;
 use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::type_coercion::binary::decimal_op_mathematics_type;
+use datafusion_expr::Operator;
 use std::sync::Arc;
 
 // Simple (low performance) kernels until optimized kernels are added to arrow
@@ -257,14 +259,22 @@ pub(crate) fn is_not_distinct_from_decimal(
         .collect())
 }
 
-pub(crate) fn add_dyn_decimal(left: &dyn Array, right: &dyn Array) -> Result<ArrayRef> {
-    let (precision, scale) = get_precision_scale(left)?;
+pub(crate) fn add_dyn_decimal(
+    left: &dyn Array,
+    right: &dyn Array,
+    result_type: &DataType,
+) -> Result<ArrayRef> {
+    let (precision, scale) = get_precision_scale(result_type)?;
     let array = add_dyn(left, right)?;
     decimal_array_with_precision_scale(array, precision, scale)
 }
 
-pub(crate) fn add_decimal_dyn_scalar(left: &dyn Array, right: i128) -> Result<ArrayRef> {
-    let (precision, scale) = get_precision_scale(left)?;
+pub(crate) fn add_decimal_dyn_scalar(
+    left: &dyn Array,
+    right: i128,
+    result_type: &DataType,
+) -> Result<ArrayRef> {
+    let (precision, scale) = get_precision_scale(result_type)?;
 
     let array = add_scalar_dyn::<Decimal128Type>(left, right)?;
     decimal_array_with_precision_scale(array, precision, scale)
@@ -273,15 +283,16 @@ pub(crate) fn add_decimal_dyn_scalar(left: &dyn Array, right: i128) -> Result<Ar
 pub(crate) fn subtract_decimal_dyn_scalar(
     left: &dyn Array,
     right: i128,
+    result_type: &DataType,
 ) -> Result<ArrayRef> {
-    let (precision, scale) = get_precision_scale(left)?;
+    let (precision, scale) = get_precision_scale(result_type)?;
 
     let array = subtract_scalar_dyn::<Decimal128Type>(left, right)?;
     decimal_array_with_precision_scale(array, precision, scale)
 }
 
-fn get_precision_scale(left: &dyn Array) -> Result<(u8, i8)> {
-    match left.data_type() {
+fn get_precision_scale(data_type: &DataType) -> Result<(u8, i8)> {
+    match data_type {
         DataType::Decimal128(precision, scale) => Ok((*precision, *scale)),
         DataType::Dictionary(_, value_type) => match value_type.as_ref() {
             DataType::Decimal128(precision, scale) => Ok((*precision, *scale)),
@@ -333,22 +344,35 @@ fn decimal_array_with_precision_scale(
 pub(crate) fn multiply_decimal_dyn_scalar(
     left: &dyn Array,
     right: i128,
+    result_type: &DataType,
 ) -> Result<ArrayRef> {
-    let (precision, scale) = get_precision_scale(left)?;
+    let (precision, scale) = get_precision_scale(result_type)?;
 
-    let array = multiply_scalar_dyn::<Decimal128Type>(left, right)?;
+    let op_type = decimal_op_mathematics_type(
+        &Operator::Multiply,
+        left.data_type(),
+        left.data_type(),
+    )
+    .unwrap();
+    let (_, op_scale) = get_precision_scale(&op_type)?;
 
-    let divide = 10_i128.pow(scale as u32);
-    let array = divide_scalar_dyn::<Decimal128Type>(&array, divide)?;
+    let array = multiply_scalar_dyn::<Decimal128Type>(left, right)?;
 
-    decimal_array_with_precision_scale(array, precision, scale)
+    if op_scale > scale {
+        let div = 10_i128.pow((op_scale - scale) as u32);
+        let array = divide_scalar_dyn::<Decimal128Type>(&array, div)?;
+        decimal_array_with_precision_scale(array, precision, scale)
+    } else {
+        decimal_array_with_precision_scale(array, precision, scale)
+    }
 }
 
 pub(crate) fn divide_decimal_dyn_scalar(
     left: &dyn Array,
     right: i128,
+    result_type: &DataType,
 ) -> Result<ArrayRef> {
-    let (precision, scale) = get_precision_scale(left)?;
+    let (precision, scale) = get_precision_scale(result_type)?;
 
     let mul = 10_i128.pow(scale as u32);
     let array = multiply_scalar_dyn::<Decimal128Type>(left, mul)?;
@@ -360,8 +384,9 @@ pub(crate) fn divide_decimal_dyn_scalar(
 pub(crate) fn subtract_dyn_decimal(
     left: &dyn Array,
     right: &dyn Array,
+    result_type: &DataType,
 ) -> Result<ArrayRef> {
-    let (precision, scale) = get_precision_scale(left)?;
+    let (precision, scale) = get_precision_scale(result_type)?;
     let array = subtract_dyn(left, right)?;
     decimal_array_with_precision_scale(array, precision, scale)
 }
@@ -369,24 +394,41 @@ pub(crate) fn subtract_dyn_decimal(
 pub(crate) fn multiply_dyn_decimal(
     left: &dyn Array,
     right: &dyn Array,
+    result_type: &DataType,
 ) -> Result<ArrayRef> {
-    let (precision, scale) = get_precision_scale(left)?;
+    let (precision, scale) = get_precision_scale(result_type)?;
+
+    let op_type = decimal_op_mathematics_type(
+        &Operator::Multiply,
+        left.data_type(),
+        left.data_type(),
+    )
+    .unwrap();
+    let (_, op_scale) = get_precision_scale(&op_type)?;
 
-    let divide = 10_i128.pow(scale as u32);
     let array = multiply_dyn(left, right)?;
-    let array = divide_scalar_dyn::<Decimal128Type>(&array, divide)?;
-    decimal_array_with_precision_scale(array, precision, scale)
+    if op_scale > scale {
+        let div = 10_i128.pow((op_scale - scale) as u32);
+        let array = divide_scalar_dyn::<Decimal128Type>(&array, div)?;
+        decimal_array_with_precision_scale(array, precision, scale)
+    } else {
+        decimal_array_with_precision_scale(array, precision, scale)
+    }
 }
 
 pub(crate) fn divide_dyn_opt_decimal(
     left: &dyn Array,
     right: &dyn Array,
+    result_type: &DataType,
 ) -> Result<ArrayRef> {
-    let (precision, scale) = get_precision_scale(left)?;
+    let (precision, scale) = get_precision_scale(result_type)?;
 
     let mul = 10_i128.pow(scale as u32);
     let array = multiply_scalar_dyn::<Decimal128Type>(left, mul)?;
-    let array = decimal_array_with_precision_scale(array, precision, scale)?;
+
+    // Relabel the scaled left array with `right`'s precision and scale (metadata only)
+    let (org_precision, org_scale) = get_precision_scale(right.data_type())?;
+    let array = decimal_array_with_precision_scale(array, org_precision, org_scale)?;
     let array = divide_dyn_opt(&array, right)?;
     decimal_array_with_precision_scale(array, precision, scale)
 }
@@ -394,8 +436,9 @@ pub(crate) fn divide_dyn_opt_decimal(
 pub(crate) fn modulus_dyn_decimal(
     left: &dyn Array,
     right: &dyn Array,
+    result_type: &DataType,
 ) -> Result<ArrayRef> {
-    let (precision, scale) = get_precision_scale(left)?;
+    let (precision, scale) = get_precision_scale(result_type)?;
     let array = modulus_dyn(left, right)?;
     decimal_array_with_precision_scale(array, precision, scale)
 }
@@ -403,8 +446,9 @@ pub(crate) fn modulus_dyn_decimal(
 pub(crate) fn modulus_decimal_dyn_scalar(
     left: &dyn Array,
     right: i128,
+    result_type: &DataType,
 ) -> Result<ArrayRef> {
-    let (precision, scale) = get_precision_scale(left)?;
+    let (precision, scale) = get_precision_scale(result_type)?;
 
     let array = modulus_scalar_dyn::<Decimal128Type>(left, right)?;
     decimal_array_with_precision_scale(array, precision, scale)
@@ -503,36 +547,71 @@ mod tests {
             3,
         );
         // add
-        let result = add_dyn_decimal(&left_decimal_array, &right_decimal_array)?;
+        let result_type = decimal_op_mathematics_type(
+            &Operator::Plus,
+            left_decimal_array.data_type(),
+            right_decimal_array.data_type(),
+        )
+        .unwrap();
+        let result =
+            add_dyn_decimal(&left_decimal_array, &right_decimal_array, &result_type)?;
         let result = as_decimal128_array(&result)?;
         let expect =
-            create_decimal_array(&[Some(246), None, Some(245), Some(247)], 25, 3);
+            create_decimal_array(&[Some(246), None, Some(245), Some(247)], 26, 3);
         assert_eq!(&expect, result);
-        let result = add_decimal_dyn_scalar(&left_decimal_array, 10)?;
+        let result = add_decimal_dyn_scalar(&left_decimal_array, 10, &result_type)?;
         let result = as_decimal128_array(&result)?;
         let expect =
-            create_decimal_array(&[Some(133), None, Some(132), Some(134)], 25, 3);
+            create_decimal_array(&[Some(133), None, Some(132), Some(134)], 26, 3);
         assert_eq!(&expect, result);
         // subtract
-        let result = subtract_dyn_decimal(&left_decimal_array, &right_decimal_array)?;
+        let result_type = decimal_op_mathematics_type(
+            &Operator::Minus,
+            left_decimal_array.data_type(),
+            right_decimal_array.data_type(),
+        )
+        .unwrap();
+        let result = subtract_dyn_decimal(
+            &left_decimal_array,
+            &right_decimal_array,
+            &result_type,
+        )?;
         let result = as_decimal128_array(&result)?;
-        let expect = create_decimal_array(&[Some(0), None, Some(-1), Some(1)], 25, 3);
+        let expect = create_decimal_array(&[Some(0), None, Some(-1), Some(1)], 26, 3);
         assert_eq!(&expect, result);
-        let result = subtract_decimal_dyn_scalar(&left_decimal_array, 10)?;
+        let result = subtract_decimal_dyn_scalar(&left_decimal_array, 10, &result_type)?;
         let result = as_decimal128_array(&result)?;
         let expect =
-            create_decimal_array(&[Some(113), None, Some(112), Some(114)], 25, 3);
+            create_decimal_array(&[Some(113), None, Some(112), Some(114)], 26, 3);
         assert_eq!(&expect, result);
         // multiply
-        let result = multiply_dyn_decimal(&left_decimal_array, &right_decimal_array)?;
+        let result_type = decimal_op_mathematics_type(
+            &Operator::Multiply,
+            left_decimal_array.data_type(),
+            right_decimal_array.data_type(),
+        )
+        .unwrap();
+        let result = multiply_dyn_decimal(
+            &left_decimal_array,
+            &right_decimal_array,
+            &result_type,
+        )?;
         let result = as_decimal128_array(&result)?;
-        let expect = create_decimal_array(&[Some(15), None, Some(15), Some(15)], 25, 3);
+        let expect =
+            create_decimal_array(&[Some(15129), None, Some(15006), Some(15252)], 38, 6);
         assert_eq!(&expect, result);
-        let result = multiply_decimal_dyn_scalar(&left_decimal_array, 10)?;
+        let result = multiply_decimal_dyn_scalar(&left_decimal_array, 10, &result_type)?;
         let result = as_decimal128_array(&result)?;
-        let expect = create_decimal_array(&[Some(1), None, Some(1), Some(1)], 25, 3);
+        let expect =
+            create_decimal_array(&[Some(1230), None, Some(1220), Some(1240)], 38, 6);
         assert_eq!(&expect, result);
         // divide
+        let result_type = decimal_op_mathematics_type(
+            &Operator::Divide,
+            left_decimal_array.data_type(),
+            right_decimal_array.data_type(),
+        )
+        .unwrap();
         let left_decimal_array = create_decimal_array(
             &[
                 Some(1234567),
@@ -549,34 +628,52 @@ mod tests {
             25,
             3,
         );
-        let result = divide_dyn_opt_decimal(&left_decimal_array, &right_decimal_array)?;
+        let result = divide_dyn_opt_decimal(
+            &left_decimal_array,
+            &right_decimal_array,
+            &result_type,
+        )?;
         let result = as_decimal128_array(&result)?;
         let expect = create_decimal_array(
-            &[Some(123456700), None, Some(22446672), Some(-10037130), None],
-            25,
-            3,
+            &[
+                Some(12345670000000000000000000000000000),
+                None,
+                Some(2244667272727272727272727272727272),
+                Some(-1003713008130081300813008130081300),
+                None,
+            ],
+            38,
+            29,
         );
         assert_eq!(&expect, result);
-        let result = divide_decimal_dyn_scalar(&left_decimal_array, 10)?;
+        let result = divide_decimal_dyn_scalar(&left_decimal_array, 10, &result_type)?;
         let result = as_decimal128_array(&result)?;
         let expect = create_decimal_array(
             &[
-                Some(123456700),
+                Some(12345670000000000000000000000000000),
                 None,
-                Some(123456700),
-                Some(123456700),
-                Some(123456700),
+                Some(12345670000000000000000000000000000),
+                Some(12345670000000000000000000000000000),
+                Some(12345670000000000000000000000000000),
             ],
-            25,
-            3,
+            38,
+            29,
         );
         assert_eq!(&expect, result);
-        let result = modulus_dyn_decimal(&left_decimal_array, &right_decimal_array)?;
+        // modulus
+        let result_type = decimal_op_mathematics_type(
+            &Operator::Modulo,
+            left_decimal_array.data_type(),
+            right_decimal_array.data_type(),
+        )
+        .unwrap();
+        let result =
+            modulus_dyn_decimal(&left_decimal_array, &right_decimal_array, &result_type)?;
         let result = as_decimal128_array(&result)?;
         let expect =
             create_decimal_array(&[Some(7), None, Some(37), Some(16), None], 25, 3);
         assert_eq!(&expect, result);
-        let result = modulus_decimal_dyn_scalar(&left_decimal_array, 10)?;
+        let result = modulus_decimal_dyn_scalar(&left_decimal_array, 10, &result_type)?;
         let result = as_decimal128_array(&result)?;
         let expect =
             create_decimal_array(&[Some(7), None, Some(7), Some(7), Some(7)], 25, 3);
@@ -590,12 +687,27 @@ mod tests {
         let left_decimal_array = create_decimal_array(&[Some(101)], 10, 1);
         let right_decimal_array = create_decimal_array(&[Some(0)], 1, 1);
 
-        let err = divide_decimal_dyn_scalar(&left_decimal_array, 0).unwrap_err();
+        let result_type = decimal_op_mathematics_type(
+            &Operator::Divide,
+            left_decimal_array.data_type(),
+            right_decimal_array.data_type(),
+        )
+        .unwrap();
+        let err =
+            divide_decimal_dyn_scalar(&left_decimal_array, 0, &result_type).unwrap_err();
         assert_eq!("Arrow error: Divide by zero error", err.to_string());
+        let result_type = decimal_op_mathematics_type(
+            &Operator::Modulo,
+            left_decimal_array.data_type(),
+            right_decimal_array.data_type(),
+        )
+        .unwrap();
         let err =
-            modulus_dyn_decimal(&left_decimal_array, &right_decimal_array).unwrap_err();
+            modulus_dyn_decimal(&left_decimal_array, &right_decimal_array, &result_type)
+                .unwrap_err();
         assert_eq!("Arrow error: Divide by zero error", err.to_string());
-        let err = modulus_decimal_dyn_scalar(&left_decimal_array, 0).unwrap_err();
+        let err =
+            modulus_decimal_dyn_scalar(&left_decimal_array, 0, &result_type).unwrap_err();
         assert_eq!("Arrow error: Divide by zero error", err.to_string());
     }