Posted to github@arrow.apache.org by "mustafasrepo (via GitHub)" <gi...@apache.org> on 2023/10/31 12:33:09 UTC

[PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

mustafasrepo opened a new pull request, #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006

   ## Which issue does this PR close?
   
   
   Closes #.
   
   ## Rationale for this change
   In the existing code base, we have two different methods to keep track of equivalences in an `Arc<dyn ExecutionPlan>`:
   `equivalence_properties` and `ordering_equivalence_properties`.
   
   As background, `EquivalenceProperties` keeps track of equivalent columns, such as `a=b=c` and `e=f`.
   `OrderingEquivalenceProperties` keeps track of alternative orderings that the table satisfies, such as `[a ASC, b ASC]` and `[c ASC, d ASC]`.
   `OrderingEquivalenceProperties` also keeps track of constant expressions (i.e. expressions that are known to have a constant value; these can arise after a filter, join, etc.).
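The sketch below makes the `a=b=c`, `e=f` example concrete. It shows one way equivalence classes can be built from pairwise equalities, such as equi-join keys or `col = col` filters. It is an illustration only: the `ColumnEquivalences` type and its methods are hypothetical, not DataFusion's API, and they use plain column-name strings instead of `Arc<dyn PhysicalExpr>`.

```rust
/// Hypothetical sketch of equivalence classes over column names; real
/// DataFusion classes hold `Arc<dyn PhysicalExpr>` values instead.
#[derive(Debug, Default)]
struct ColumnEquivalences {
    /// Disjoint classes; the first entry of each class is its representative.
    classes: Vec<Vec<&'static str>>,
}

impl ColumnEquivalences {
    /// Returns the index of the class containing `col`, if any.
    fn find(&self, col: &str) -> Option<usize> {
        self.classes
            .iter()
            .position(|class| class.iter().any(|c| *c == col))
    }

    /// Records `lhs = rhs` (e.g. from an equi-join key or a filter such as
    /// `a = b`), merging the classes of both sides when necessary.
    fn add_equal(&mut self, lhs: &'static str, rhs: &'static str) {
        match (self.find(lhs), self.find(rhs)) {
            // Already known to be equal: nothing to do.
            (Some(i), Some(j)) if i == j => {}
            // Two different classes become one.
            (Some(i), Some(j)) => {
                let (keep, other) = (i.min(j), i.max(j));
                // `other > keep`, so swap-removing `other` does not move `keep`.
                let merged = self.classes.swap_remove(other);
                self.classes[keep].extend(merged);
            }
            // Only one side is known: extend its class.
            (Some(i), None) => self.classes[i].push(rhs),
            (None, Some(j)) => self.classes[j].push(lhs),
            // Neither side is known: start a new class.
            (None, None) => self.classes.push(vec![lhs, rhs]),
        }
    }

    /// Maps a column to its class representative (itself if unconstrained).
    fn representative(&self, col: &'static str) -> &'static str {
        self.find(col).map_or(col, |i| self.classes[i][0])
    }
}
```

For example, calling `add_equal("a", "b")`, `add_equal("e", "f")` and `add_equal("b", "c")` produces the two classes `a=b=c` and `e=f`. A later `add_equal("c", "e")` merges them into one class whose representative is `a`.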
   
   Inherently, this information is coupled. As an example, assume that:
   - the existing table satisfies the orderings `[a ASC, b ASC]` and `[c ASC, d ASC]`,
   - the table has equivalent columns `a=e`, and
   - `f` is known to be constant.
   
   Suppose an operator requires the ordering `[e ASC, f ASC, b ASC]` at its input. To analyze whether the existing orderings satisfy this requirement, we need to consider all orderings, equivalences, and constants at the same time.
   
   The following naive algorithm can be used for this analysis (please note that the algorithm in this PR is more elaborate):
   - Remove constant expressions from the requirement (this converts the requirement `[e ASC, f ASC, b ASC]` to `[e ASC, b ASC]`).
   - Rewrite the requirement so that it uses only the representative expression of each distinct equivalence group (this converts the requirement `[e ASC, b ASC]` to `[a ASC, b ASC]`).
   - Apply the same procedures to each of the orderings in `OrderingEquivalenceProperties` (here, both `[a ASC, b ASC]` and `[c ASC, d ASC]` stay unchanged).
   - Check whether the normalized requirement `[a ASC, b ASC]` is satisfied by any of the normalized orderings `[a ASC, b ASC]` and `[c ASC, d ASC]` (here, the first one satisfies it).
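The naive four-step check above can be sketched in Rust as follows. This is a minimal sketch under stated assumptions:

- `NaiveProperties`, `normalize` and `ordering_satisfy` are hypothetical names, not the API introduced by this PR.
- Sort keys are simplified to a column name plus an ascending flag, so NULLS FIRST/LAST is ignored.
- "Satisfied" is taken to mean that the normalized requirement is a prefix of some normalized ordering.

```rust
use std::collections::{HashMap, HashSet};

/// A simplified sort key: (column name, ascending?).
type SortKey = (&'static str, bool);

/// Hypothetical stand-in for the unified state; not DataFusion's actual type.
struct NaiveProperties {
    /// Maps a column to the representative of its equivalence class
    /// (columns absent from the map are their own representative).
    representative: HashMap<&'static str, &'static str>,
    /// Columns known to be constant.
    constants: HashSet<&'static str>,
    /// Alternative orderings the table satisfies.
    orderings: Vec<Vec<SortKey>>,
}

impl NaiveProperties {
    /// Steps 1 and 2: drop constant columns, then rewrite every remaining
    /// column to its class representative.
    fn normalize(&self, keys: &[SortKey]) -> Vec<SortKey> {
        keys.iter()
            .filter(|(col, _)| !self.constants.contains(col))
            .map(|&(col, asc)| (*self.representative.get(col).unwrap_or(&col), asc))
            .collect()
    }

    /// Steps 3 and 4: normalize every known ordering and check whether the
    /// normalized requirement is a prefix of any normalized ordering.
    fn ordering_satisfy(&self, requirement: &[SortKey]) -> bool {
        let req = self.normalize(requirement);
        // A requirement that normalizes to nothing (e.g. only constants)
        // is trivially satisfied.
        if req.is_empty() {
            return true;
        }
        self.orderings.iter().any(|ordering| {
            let ord = self.normalize(ordering);
            ord.len() >= req.len() && ord[..req.len()] == req[..]
        })
    }
}
```

Take the example from the list above: `representative` maps `e` to `a`, `constants` holds `f`, and `orderings` holds `[a ASC, b ASC]` and `[c ASC, d ASC]`. The requirement `[e ASC, f ASC, b ASC]` then normalizes to `[a ASC, b ASC]`, and `ordering_satisfy` returns `true`. Keeping all three pieces of state in one struct means each check reads them together, with no separate arguments to pass.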
   
   As the example above shows, keeping track of this information separately is cumbersome.
   
   It is also hard for users to implement new functionality, and some existing APIs, such as `ordering_satisfy` and `requirements_compatible`, are rather involved.
   
   I think it is better to unify this information in a single `struct` so that we can:
   - expose better, friendlier `API`s from the struct,
   - move utility functions to method calls,
   - keep the invariants in the state (instead of relying on callers to pass correct arguments), and
   - keep all of the implementations and algorithms in a single place, rather than scattering the logic across different files.
   
   
   
   ## What changes are included in this PR?
   
   This PR unifies `EquivalenceProperties` and `OrderingEquivalenceProperties` into a single struct called `EquivalenceProperties` (equivalence now covers exact equivalence, ordering equivalence, and constants).
   All implementations that depend on this information are moved to method calls (such as `projection`, `ordering_satisfy`, etc.).
   
   - Additional tests show that the new design can produce better plans. For example, `ordering_satisfy` no longer depends on just a single ordering (the output ordering), but considers all valid orderings of the table. This enables additional optimizations; see the new tests in `window.slt` for examples.
   
   
   ## Are these changes tested?
   Yes, new tests are added.
   
   
   ## Are there any user-facing changes?
   `api change`: `ordering_equivalence_properties` is removed from `ExecutionPlan`, and `equivalence_properties` now contains the additional information.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1379670001


##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -226,17 +228,78 @@ pub fn physical_exprs_contains(
         .any(|physical_expr| physical_expr.eq(expr))
 }
 
+/// Checks whether the given slices have any common entries.
+pub fn have_common_entries(
+    lhs: &[Arc<dyn PhysicalExpr>],
+    rhs: &[Arc<dyn PhysicalExpr>],
+) -> bool {
+    lhs.iter().any(|expr| physical_exprs_contains(rhs, expr))
+}
+
+/// Checks whether the given physical expression slices are equal.
+pub fn physical_exprs_equal(
+    lhs: &[Arc<dyn PhysicalExpr>],
+    rhs: &[Arc<dyn PhysicalExpr>],
+) -> bool {
+    lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs))
+}
+
+/// Checks whether the given physical expression slices are equal in the sense
+/// of bags (multi-sets), disregarding their orderings.
+pub fn physical_exprs_bag_equal(
+    lhs: &[Arc<dyn PhysicalExpr>],
+    rhs: &[Arc<dyn PhysicalExpr>],
+) -> bool {
+    // TODO: Once we can use `HashMap`s with `Arc<dyn PhysicalExpr>`, this
+    //       function should use a `HashMap` to reduce computational complexity.
+    if lhs.len() == rhs.len() {
+        let mut rhs_vec = rhs.to_vec();
+        for expr in lhs {
+            if let Some(idx) = rhs_vec.iter().position(|e| expr.eq(e)) {
+                rhs_vec.swap_remove(idx);
+            } else {
+                return false;
+            }
+        }
+        true
+    } else {
+        false
+    }
+}
+
+/// This utility function removes duplicates from the given `exprs` vector.
+/// Note that this function does not necessarily preserve its input ordering.
+pub fn deduplicate_physical_exprs(exprs: &mut Vec<Arc<dyn PhysicalExpr>>) {
+    // TODO: Once we can use `HashSet`s with `Arc<dyn PhysicalExpr>`, this

Review Comment:
   I have opened the issue [#8027](https://github.com/apache/arrow-datafusion/issues/8027) to track this support
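The `TODO`s in the diff above say that `Arc<dyn PhysicalExpr>` cannot yet be used as a `HashMap`/`HashSet` key, and #8027 now tracks that. The sketch below hints at what hashing would enable. It gives generic versions for any `T: Hash + Eq`: bag equality in expected linear time, and deduplication. `bag_equal` and `deduplicate` are illustrative names, not DataFusion functions.

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

/// Hypothetical: multiset equality in expected O(n) time by counting
/// occurrences in `lhs` and consuming them with `rhs`.
fn bag_equal<T: Hash + Eq>(lhs: &[T], rhs: &[T]) -> bool {
    if lhs.len() != rhs.len() {
        return false;
    }
    let mut counts: HashMap<&T, usize> = HashMap::new();
    for item in lhs {
        *counts.entry(item).or_insert(0) += 1;
    }
    for item in rhs {
        match counts.get_mut(item) {
            Some(count) if *count > 0 => *count -= 1,
            // `item` is missing from `lhs` or occurs more often in `rhs`.
            _ => return false,
        }
    }
    // Equal lengths and every `rhs` item matched, so all counts are zero.
    true
}

/// Hypothetical: removes duplicates, keeping the first occurrence of each
/// element (this particular version happens to preserve input order).
fn deduplicate<T: Hash + Eq + Clone>(items: &mut Vec<T>) {
    let mut seen = HashSet::new();
    // `insert` returns `false` for values already seen, so `retain` drops them.
    items.retain(|item| seen.insert(item.clone()));
}
```

For `Arc` values, the `clone` in `deduplicate` only bumps a reference count. Hashing would therefore mainly remove the quadratic `position`-based scan.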





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#issuecomment-1792707729

   Let's merge this to keep the code flowing




Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1377573175


##########
datafusion/sqllogictest/test_files/window.slt:
##########
@@ -2338,10 +2335,11 @@ Limit: skip=0, fetch=5
 ----------TableScan: aggregate_test_100 projection=[c9]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
---ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
-----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-------SortExec: expr=[c9@0 DESC]
---------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
+--SortExec: TopK(fetch=5), expr=[rn1@1 ASC NULLS LAST,c9@0 ASC NULLS LAST]

Review Comment:
   This sort is theoretically unnecessary because `rn1` is unique. However, our previous code did not remove it for this reason -- we actually removed it due to a bug and simply lucked out. 
   
   In a follow-on PR, we plan to track uniqueness properties in this struct too so we will remove it again -- this time for the right reasons.
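The sketch below shows one piece of the "theoretically unnecessary" reasoning. If a sort key is unique, no two rows tie on it, so any keys after it in the sort expression can never change the order and can be dropped. It assumes a hypothetical list of columns known to be unique. Uniqueness tracking is planned for a follow-on PR and is not part of this one, and `truncate_after_unique` is an illustrative name.

```rust
/// Hypothetical: truncates a lexicographic ordering right after the first
/// key known to be unique, since later keys can never break a tie.
fn truncate_after_unique<'a>(ordering: &[&'a str], unique: &[&str]) -> Vec<&'a str> {
    let mut result = Vec::new();
    for &key in ordering {
        result.push(key);
        // Compare string contents; the two slices may have different lifetimes.
        if unique.iter().any(|u| *u == key) {
            break;
        }
    }
    result
}
```

With `rn1` marked unique, the `TopK` expression `[rn1, c9]` reduces to `[rn1]`. An ordering on `rn1` alone is one the window output can already provide, because `ROW_NUMBER()` increases in the order the window emits rows.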





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1380494221


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -15,62 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::equivalence::{EquivalenceProperties, OrderingEquivalenceProperties};
-use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
-use crate::sort_properties::{ExprOrdering, SortProperties};
-use crate::update_ordering;
-use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement};
+use std::borrow::Borrow;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use crate::expressions::{BinaryExpr, Column};
+use crate::{PhysicalExpr, PhysicalSortExpr};
 
 use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
 use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
 use arrow::datatypes::SchemaRef;
-use arrow_schema::SortOptions;
 use datafusion_common::tree_node::{
     Transformed, TreeNode, TreeNodeRewriter, VisitRecursion,
 };
-use datafusion_common::utils::longest_consecutive_prefix;
 use datafusion_common::Result;
 use datafusion_expr::Operator;
 
 use itertools::Itertools;
 use petgraph::graph::NodeIndex;
 use petgraph::stable_graph::StableGraph;
-use std::borrow::Borrow;
-use std::collections::HashMap;
-use std::collections::HashSet;
-use std::sync::Arc;
-
-/// Compare the two expr lists are equal no matter the order.
-/// For example two InListExpr can be considered to be equals no matter the order:
-///
-/// In('a','b','c') == In('c','b','a')
-pub fn expr_list_eq_any_order(

Review Comment:
   Agreed. We sent most of the drive-by cleanups/refactors we did as separate small PRs, but this one must have slipped through the cracks.





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1378897737


##########
datafusion/core/src/physical_optimizer/sort_pushdown.rs:
##########
@@ -298,34 +302,34 @@ fn pushdown_requirement_to_children(
 /// If the the parent requirements are more specific, push down the parent requirements
 /// If they are not compatible, need to add Sort.
 fn determine_children_requirement(
-    parent_required: Option<&[PhysicalSortRequirement]>,
-    request_child: Option<&[PhysicalSortRequirement]>,
+    parent_required: LexRequirementRef,

Review Comment:
   I almost wonder if `LexRequirement` / `LexRequirementRef` can eventually get the same treatment (encapsulate in a struct) -- definitely not for this PR, though ;)
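A rough illustration of the newtype idea follows. Everything in it is hypothetical: `SortRequirement` is a simplified stand-in for `PhysicalSortRequirement`, and `LexRequirementSketch` and `is_more_specific_than` are invented for this sketch. The method mirrors the "parent requirements are more specific" check that `determine_children_requirement` describes.

```rust
/// Simplified stand-in for `PhysicalSortRequirement`: a column plus an
/// optional direction (`None` means any direction is acceptable).
#[derive(Clone, Debug, PartialEq)]
struct SortRequirement {
    column: String,
    descending: Option<bool>,
}

/// Hypothetical newtype encapsulating a lexicographic requirement.
#[derive(Clone, Debug, Default, PartialEq)]
struct LexRequirementSketch {
    inner: Vec<SortRequirement>,
}

impl LexRequirementSketch {
    fn new(inner: Vec<SortRequirement>) -> Self {
        Self { inner }
    }

    /// An empty requirement imposes no ordering at all.
    fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// `self` is at least as specific as `other` if it is at least as long
    /// and each entry of `other` is met by the matching entry of `self`.
    fn is_more_specific_than(&self, other: &Self) -> bool {
        other.inner.len() <= self.inner.len()
            && self.inner.iter().zip(&other.inner).all(|(mine, theirs)| {
                mine.column == theirs.column
                    && (theirs.descending.is_none() || mine.descending == theirs.descending)
            })
    }
}
```

A struct like this could carry invariants and helper methods in the same way the unified `EquivalenceProperties` does, instead of every caller working on a bare slice.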



##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -226,17 +228,78 @@ pub fn physical_exprs_contains(
         .any(|physical_expr| physical_expr.eq(expr))
 }
 
+/// Checks whether the given slices have any common entries.
+pub fn have_common_entries(
+    lhs: &[Arc<dyn PhysicalExpr>],
+    rhs: &[Arc<dyn PhysicalExpr>],
+) -> bool {
+    lhs.iter().any(|expr| physical_exprs_contains(rhs, expr))
+}
+
+/// Checks whether the given physical expression slices are equal.
+pub fn physical_exprs_equal(
+    lhs: &[Arc<dyn PhysicalExpr>],
+    rhs: &[Arc<dyn PhysicalExpr>],
+) -> bool {
+    lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs))
+}
+
+/// Checks whether the given physical expression slices are equal in the sense
+/// of bags (multi-sets), disregarding their orderings.
+pub fn physical_exprs_bag_equal(
+    lhs: &[Arc<dyn PhysicalExpr>],
+    rhs: &[Arc<dyn PhysicalExpr>],
+) -> bool {
+    // TODO: Once we can use `HashMap`s with `Arc<dyn PhysicalExpr>`, this
+    //       function should use a `HashMap` to reduce computational complexity.
+    if lhs.len() == rhs.len() {
+        let mut rhs_vec = rhs.to_vec();
+        for expr in lhs {
+            if let Some(idx) = rhs_vec.iter().position(|e| expr.eq(e)) {
+                rhs_vec.swap_remove(idx);
+            } else {
+                return false;
+            }
+        }
+        true
+    } else {
+        false
+    }
+}
+
+/// This utility function removes duplicates from the given `exprs` vector.
+/// Note that this function does not necessarily preserve its input ordering.
+pub fn deduplicate_physical_exprs(exprs: &mut Vec<Arc<dyn PhysicalExpr>>) {
+    // TODO: Once we can use `HashSet`s with `Arc<dyn PhysicalExpr>`, this

Review Comment:
   Maybe it is worth filing a ticket about this and adding a link to that ticket in the comments



##########
datafusion/core/src/physical_optimizer/sort_pushdown.rs:
##########
@@ -127,29 +124,27 @@ pub(crate) fn pushdown_sorts(
     requirements: SortPushDown,
 ) -> Result<Transformed<SortPushDown>> {
     let plan = &requirements.plan;
-    let parent_required = requirements.required_ordering.as_deref();
-    const ERR_MSG: &str = "Expects parent requirement to contain something";
-    let err = || plan_datafusion_err!("{}", ERR_MSG);
+    let parent_required = requirements.required_ordering.as_deref().unwrap_or(&[]);
     if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
-        let mut new_plan = plan.clone();
-        if !ordering_satisfy_requirement(
-            plan.output_ordering(),
-            parent_required,
-            || plan.equivalence_properties(),
-            || plan.ordering_equivalence_properties(),
-        ) {
+        let new_plan = if !plan

Review Comment:
   This is a really good example of how much nicer the new API is. 👨‍🍳 👌  -- really nice



##########
datafusion/physical-expr/src/sort_expr.rs:
##########
@@ -77,18 +77,26 @@ impl PhysicalSortExpr {
         })
     }
 
-    /// Check whether sort expression satisfies [`PhysicalSortRequirement`].
-    ///
-    /// If sort options is Some in `PhysicalSortRequirement`, `expr`
-    /// and `options` field are compared for equality.
-    ///
-    /// If sort options is None in `PhysicalSortRequirement`, only
-    /// `expr` is compared for equality.
-    pub fn satisfy(&self, requirement: &PhysicalSortRequirement) -> bool {
+    /// Checks whether this sort expression satisfies the given `requirement`.
+    /// If sort options are unspecified in `requirement`, only expressions are
+    /// compared for inequality.
+    pub fn satisfy(
+        &self,
+        requirement: &PhysicalSortRequirement,
+        schema: &Schema,
+    ) -> bool {
+        // If the column is not nullable, NULLS FIRST/LAST is not important.

Review Comment:
   👍 
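The new `satisfy` signature takes a `Schema` so that it can apply the rule in the comment above: without NULLs, NULLS FIRST/LAST makes no difference. Below is a hedged sketch of that rule. `SortOpts` mirrors the two fields of arrow's `SortOptions` (`descending`, `nulls_first`). `options_satisfy` is a hypothetical helper, not the code in this PR.

```rust
/// Simplified mirror of arrow's `SortOptions`.
#[derive(Clone, Copy, Debug, PartialEq)]
struct SortOpts {
    descending: bool,
    nulls_first: bool,
}

/// Hypothetical: does an input sorted with `provided` options meet a
/// requirement with `required` options (`None` = any options are fine)?
fn options_satisfy(provided: SortOpts, required: Option<SortOpts>, nullable: bool) -> bool {
    match required {
        None => true,
        // Without NULLs, NULLS FIRST and NULLS LAST yield the same row order,
        // so only the direction has to match.
        Some(req) if !nullable => provided.descending == req.descending,
        // With NULLs, both direction and NULL placement must match.
        Some(req) => provided == req,
    }
}
```

This is why a non-nullable column sorted `ASC NULLS FIRST` can satisfy an `ASC NULLS LAST` requirement, while a nullable one cannot.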





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1380559285


##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -60,15 +56,12 @@ pub struct ProjectionExec {
     input: Arc<dyn ExecutionPlan>,
     /// The output ordering
     output_ordering: Option<Vec<PhysicalSortExpr>>,
-    /// The columns map used to normalize out expressions like Partitioning and PhysicalSortExpr
-    /// The key is the column from the input schema and the values are the columns from the output schema
-    columns_map: HashMap<Column, Vec<Column>>,
+    /// The mapping used to normalize expressions like Partitioning and
+    /// PhysicalSortExpr. The key is the expression from the input schema
+    /// and the value is the expression from the output schema.
+    projection_mapping: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>,

Review Comment:
   BTW I made a PR to do this here -- https://github.com/apache/arrow-datafusion/pull/8033 and it worked out well. Once this PR is merged I'll rebase it and get it ready for review
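To show what a `projection_mapping` of (input expression, output expression) pairs is used for, here is a minimal sketch of projecting an input ordering through a projection. Expressions are simplified to strings, and `project_ordering` is a hypothetical name. The real mapping can send one input expression to several outputs, whereas this sketch takes only the first match.

```rust
/// Hypothetical: rewrites an input ordering in terms of the projection's
/// output expressions, using (input expr, output expr) pairs.
fn project_ordering(ordering: &[(&str, bool)], mapping: &[(&str, &str)]) -> Vec<(String, bool)> {
    let mut projected = Vec::new();
    for &(expr, asc) in ordering {
        match mapping.iter().find(|(source, _)| *source == expr) {
            Some(&(_, target)) => projected.push((target.to_string(), asc)),
            // Once a key is projected away, later keys are no longer
            // guaranteed to be ordered, so only the prefix survives.
            None => break,
        }
    }
    projected
}
```

For example, with a projection like `SELECT b AS x, a AS y` (mapping `[("a", "y"), ("b", "x")]`), the input ordering `[a ASC, b ASC, c ASC]` becomes `[y ASC, x ASC]`. The ordering `[c ASC, a ASC]` projects to nothing, because `c` is not in the output.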





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1380626690


##########
datafusion/physical-plan/src/common.rs:
##########
@@ -373,6 +375,38 @@ pub fn batch_byte_size(batch: &RecordBatch) -> usize {
     batch.get_array_memory_size()
 }
 
+/// Constructs the mapping between a projection's input and output
+pub fn calculate_projection_mapping(

Review Comment:
   FWIW I hacked up a version here and it worked great: https://github.com/apache/arrow-datafusion/pull/8033





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1378552538


##########
datafusion/sqllogictest/test_files/tpch/q17.slt.part:
##########
@@ -58,21 +58,19 @@ ProjectionExec: expr=[CAST(SUM(lineitem.l_extendedprice)@0 AS Float64) / 7 as av
 --------ProjectionExec: expr=[l_extendedprice@1 as l_extendedprice]
 ----------CoalesceBatchesExec: target_batch_size=8192
 ------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@2, l_partkey@1)], filter=CAST(l_quantity@0 AS Decimal128(30, 15)) < Float64(0.2) * AVG(lineitem.l_quantity)@1
---------------CoalesceBatchesExec: target_batch_size=8192
-----------------RepartitionExec: partitioning=Hash([p_partkey@2], 4), input_partitions=4

Review Comment:
   👍 seems like a minor improvement to the plan





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#issuecomment-1791024449

   One of our internal test failures looks like the following.
   
   The input looks like:
   ```
   2023-11-02T15:58:06.601675Z TRACE log: Optimized physical plan by CombinePartialFinalAggregate:
   OutputRequirementExec
     SortExec: expr=[time@1 ASC NULLS LAST]
       CoalescePartitionsExec
         ProjectionExec: expr=[cpu as iox::measurement, time@0 as time, (selector_last(sum_idle,time)@1).[value] as last, (selector_last(sum_system,time)@2).[value] as last_1]
           AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)], ordering_mode=Sorted
             SortPreservingRepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16, sort_exprs=time@0 ASC NULLS LAST
               AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)]
                 RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                   SortExec: expr=[time@0 ASC NULLS LAST]
                     CoalescePartitionsExec
                       ProjectionExec: expr=[time@0 as time, SUM(cpu.usage_idle)@1 as sum_idle, SUM(cpu.usage_system)@2 as sum_system]
                         AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)]
                           RepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16
                             AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)]
                               RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                                 ProjectionExec: expr=[time@1 as time, usage_idle@2 as usage_idle, usage_system@3 as usage_system]
                                   FilterExec: date_bin(10000000000, time@1, 0) <= 1698940686290451000 AND time@1 <= 1698940686290451000 AND cpu@0 = cpu-total
                                     ParquetExec: file_groups={1 group: [[2/8/0649f0e8b1abed092a356ec6181369fcf585431d1cc0694a0cc4ab45cf78b49d/0c5ac9b2-f6d4-4004-9036-15412da47647.parquet]]}, projection=[cpu, time, usage_idle, usage_system], predicate=date_bin(10000000000, time@2, 0) <= 1698940686290451000 AND time@2 <= 1698940686290451000 AND cpu@0 = cpu-total, pruning_predicate=time_min@0 <= 1698940686290451000 AND cpu_min@1 <= cpu-total AND cpu-total <= cpu_max@2
   ```
   
   But after EnforceSorting, the `SortPreservingRepartitionExec` no longer seems to have sort exprs:
   ```
   2023-11-02T15:58:06.605925Z TRACE log: Optimized physical plan by EnforceSorting:
   OutputRequirementExec
     SortPreservingMergeExec: [time@1 ASC NULLS LAST] 
       SortExec: expr=[time@1 ASC NULLS LAST]
         ProjectionExec: expr=[cpu as iox::measurement, time@0 as time, (selector_last(sum_idle,time)@1).[value] as last, (selector_last(sum_system,time)@2).[value] as last_1]
           AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)]
       ----> SortPreservingRepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16 
               AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)]
                 RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=16
                   ProjectionExec: expr=[time@0 as time, SUM(cpu.usage_idle)@1 as sum_idle, SUM(cpu.usage_system)@2 as sum_system]
                     AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)]
                       RepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16
                         AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)]
                           RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                             ProjectionExec: expr=[time@1 as time, usage_idle@2 as usage_idle, usage_system@3 as usage_system]
                               FilterExec: date_bin(10000000000, time@1, 0) <= 1698940686290451000 AND time@1 <= 1698940686290451000 AND cpu@0 = cpu-total
                                 ParquetExec: file_groups={1 group: [[2/8/0649f0e8b1abed092a356ec6181369fcf585431d1cc0694a0cc4ab45cf78b49d/0c5ac9b2-f6d4-4004-9036-15412da47647.parquet]]}, projection=[cpu, time, usage_idle, usage_system], predicate=date_bin(10000000000, time@2, 0) <= 1698940686290451000 AND time@2 <= 1698940686290451000 AND cpu@0 = cpu-total, pruning_predicate=time_min@0 <= 1698940686290451000 AND cpu_min@1 <= cpu-total AND cpu-total <= cpu_max@2
   ```




Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1380619472


##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -60,15 +56,12 @@ pub struct ProjectionExec {
     input: Arc<dyn ExecutionPlan>,
     /// The output ordering
     output_ordering: Option<Vec<PhysicalSortExpr>>,
-    /// The columns map used to normalize out expressions like Partitioning and PhysicalSortExpr
-    /// The key is the column from the input schema and the values are the columns from the output schema
-    columns_map: HashMap<Column, Vec<Column>>,
+    /// The mapping used to normalize expressions like Partitioning and
+    /// PhysicalSortExpr. The key is the expression from the input schema
+    /// and the value is the expression from the output schema.
+    projection_mapping: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>,

Review Comment:
   Here is a similar one for EquivalenceClass: https://github.com/apache/arrow-datafusion/pull/8034. It might be more of a conflict magnet, so I am leaving it for a while.





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#issuecomment-1794688101

   Partially resolves issue [#8064](https://github.com/apache/arrow-datafusion/issues/8064)




Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1379381529


##########
datafusion/core/src/physical_optimizer/sort_pushdown.rs:
##########
@@ -298,34 +302,34 @@ fn pushdown_requirement_to_children(
 /// If the the parent requirements are more specific, push down the parent requirements
 /// If they are not compatible, need to add Sort.
 fn determine_children_requirement(
-    parent_required: Option<&[PhysicalSortRequirement]>,
-    request_child: Option<&[PhysicalSortRequirement]>,
+    parent_required: LexRequirementRef,

Review Comment:
   I think this is worthwhile to explore -- let's open an issue so we don't forget
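The doc comment on `determine_children_requirement` states the rule: push down the parent requirement when it is more specific, and add a Sort when the two are incompatible. Below is a minimal sketch of that rule. It assumes requirements are compared by plain prefix matching on a hypothetical `SortReq` type. The real code may also take equivalences into account, which this sketch does not. `ChildRequirement` and `determine_children_requirement_sketch` are illustrative names.

```rust
/// Hypothetical stand-in for one `PhysicalSortRequirement`:
/// a column name plus a sort direction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SortReq {
    pub column: String,
    pub descending: bool,
}

/// Possible outcomes for the child's requirement (names are illustrative).
#[derive(Debug, PartialEq, Eq)]
pub enum ChildRequirement {
    /// The child's own requirement is at least as specific; nothing to push down.
    Satisfied,
    /// The parent's requirement is more specific; push it down to the child.
    PushDown(Vec<SortReq>),
    /// Neither requirement is a prefix of the other; a Sort must be added.
    NeedsSort,
}

/// True if `general` is a prefix of `specific`, i.e. any input that
/// satisfies `specific` also satisfies `general`.
fn is_prefix_of(general: &[SortReq], specific: &[SortReq]) -> bool {
    general.len() <= specific.len() && general.iter().zip(specific).all(|(g, s)| g == s)
}

pub fn determine_children_requirement_sketch(
    parent_required: &[SortReq],
    request_child: &[SortReq],
) -> ChildRequirement {
    if is_prefix_of(parent_required, request_child) {
        // The child already requests something at least as strict.
        ChildRequirement::Satisfied
    } else if is_prefix_of(request_child, parent_required) {
        // The parent is stricter: its requirement replaces the child's.
        ChildRequirement::PushDown(parent_required.to_vec())
    } else {
        ChildRequirement::NeedsSort
    }
}
```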





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1379670001


##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -226,17 +228,78 @@ pub fn physical_exprs_contains(
         .any(|physical_expr| physical_expr.eq(expr))
 }
 
+/// Checks whether the given slices have any common entries.
+pub fn have_common_entries(
+    lhs: &[Arc<dyn PhysicalExpr>],
+    rhs: &[Arc<dyn PhysicalExpr>],
+) -> bool {
+    lhs.iter().any(|expr| physical_exprs_contains(rhs, expr))
+}
+
+/// Checks whether the given physical expression slices are equal.
+pub fn physical_exprs_equal(
+    lhs: &[Arc<dyn PhysicalExpr>],
+    rhs: &[Arc<dyn PhysicalExpr>],
+) -> bool {
+    lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs))
+}
+
+/// Checks whether the given physical expression slices are equal in the sense
+/// of bags (multi-sets), disregarding their orderings.
+pub fn physical_exprs_bag_equal(
+    lhs: &[Arc<dyn PhysicalExpr>],
+    rhs: &[Arc<dyn PhysicalExpr>],
+) -> bool {
+    // TODO: Once we can use `HashMap`s with `Arc<dyn PhysicalExpr>`, this
+    //       function should use a `HashMap` to reduce computational complexity.
+    if lhs.len() == rhs.len() {
+        let mut rhs_vec = rhs.to_vec();
+        for expr in lhs {
+            if let Some(idx) = rhs_vec.iter().position(|e| expr.eq(e)) {
+                rhs_vec.swap_remove(idx);
+            } else {
+                return false;
+            }
+        }
+        true
+    } else {
+        false
+    }
+}
+
+/// This utility function removes duplicates from the given `exprs` vector.
+/// Note that this function does not necessarily preserve its input ordering.
+pub fn deduplicate_physical_exprs(exprs: &mut Vec<Arc<dyn PhysicalExpr>>) {
+    // TODO: Once we can use `HashSet`s with `Arc<dyn PhysicalExpr>`, this

Review Comment:
   I have opened issue [#8027](https://github.com/apache/arrow-datafusion/issues/8027) to track this support. I also updated the comment so that it now includes the issue link.
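The TODO in `physical_exprs_bag_equal` describes replacing the quadratic `position`/`swap_remove` loop with a `HashMap`. Here is a sketch of that idea. It assumes the element type implements `Hash + Eq`, which `Arc<dyn PhysicalExpr>` did not at the time (hence the TODO). The function counts occurrences on one side and decrements those counts on the other. `bag_equal` is a hypothetical name.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Multiset (bag) equality in O(n) expected time, assuming `T: Hash + Eq`.
pub fn bag_equal<T: Hash + Eq>(lhs: &[T], rhs: &[T]) -> bool {
    if lhs.len() != rhs.len() {
        return false;
    }
    // Count how many times each item occurs on the left-hand side.
    let mut counts: HashMap<&T, usize> = HashMap::new();
    for item in lhs {
        *counts.entry(item).or_insert(0) += 1;
    }
    // Consume one occurrence per right-hand item; fail on any surplus.
    for item in rhs {
        match counts.get_mut(item) {
            Some(count) if *count > 0 => *count -= 1,
            _ => return false,
        }
    }
    // Equal lengths plus no failed decrement means every count reached zero.
    true
}
```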





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1381203743


##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -191,18 +190,6 @@ impl ExecutionPlan for NestedLoopJoinExec {
         distribution_from_join_type(&self.join_type)
     }
 
-    fn equivalence_properties(&self) -> EquivalenceProperties {

Review Comment:
   > Shouldn't this still have `join_equivalent_properties` rather than being removed?
   
   Thanks @alamb for pointing this out. I have implemented `join_equivalent_properties` for `nested_loop_join`.
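As a rough illustration of what join equivalence propagation involves, here is a sketch for an inner join. It assumes columns are identified by output index and equivalence classes are plain `Vec<usize>` groups. `EqClasses` and `inner_join_equivalences` are hypothetical names. Right-side indices are shifted past the left columns, and each equi-join key pair becomes a new equality. A nested loop join has no equi-join keys, so there `on` would be empty and only the children's classes carry over. Outer joins need extra care that this sketch does not attempt.

```rust
/// Hypothetical, simplified equivalence classes over output column indices.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct EqClasses {
    pub classes: Vec<Vec<usize>>,
}

impl EqClasses {
    /// Records that columns `a` and `b` are equal, merging classes as needed.
    pub fn add_equal(&mut self, a: usize, b: usize) {
        if a == b {
            return;
        }
        let ia = self.classes.iter().position(|c| c.contains(&a));
        let ib = self.classes.iter().position(|c| c.contains(&b));
        match (ia, ib) {
            (Some(i), Some(j)) if i == j => {}
            (Some(i), Some(j)) => {
                // Remove the class at the larger index so `keep` stays valid.
                let (keep, gone) = if i < j { (i, j) } else { (j, i) };
                let moved = self.classes.remove(gone);
                self.classes[keep].extend(moved);
            }
            (Some(i), None) => self.classes[i].push(b),
            (None, Some(j)) => self.classes[j].push(a),
            (None, None) => self.classes.push(vec![a, b]),
        }
    }
}

/// Combines both children's classes for an inner join. Right-side indices are
/// shifted by `left_len`, and each key pair `(l, r)` adds `l == left_len + r`.
pub fn inner_join_equivalences(
    left: &EqClasses,
    right: &EqClasses,
    left_len: usize,
    on: &[(usize, usize)],
) -> EqClasses {
    let mut out = left.clone();
    for class in &right.classes {
        out.classes.push(class.iter().map(|c| c + left_len).collect());
    }
    for &(l, r) in on {
        out.add_equal(l, left_len + r);
    }
    out
}
```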





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#issuecomment-1791156127

   > One of the test failures internally looks like the following
   > 
   > ...
   
   We will investigate tomorrow and include the fix in this PR if it is a quick fix. Thanks for battle-testing :)




Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1379382576


##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -226,17 +228,78 @@ pub fn physical_exprs_contains(
         .any(|physical_expr| physical_expr.eq(expr))
 }
 
+/// Checks whether the given slices have any common entries.
+pub fn have_common_entries(
+    lhs: &[Arc<dyn PhysicalExpr>],
+    rhs: &[Arc<dyn PhysicalExpr>],
+) -> bool {
+    lhs.iter().any(|expr| physical_exprs_contains(rhs, expr))
+}
+
+/// Checks whether the given physical expression slices are equal.
+pub fn physical_exprs_equal(
+    lhs: &[Arc<dyn PhysicalExpr>],
+    rhs: &[Arc<dyn PhysicalExpr>],
+) -> bool {
+    lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs))
+}
+
+/// Checks whether the given physical expression slices are equal in the sense
+/// of bags (multi-sets), disregarding their orderings.
+pub fn physical_exprs_bag_equal(
+    lhs: &[Arc<dyn PhysicalExpr>],
+    rhs: &[Arc<dyn PhysicalExpr>],
+) -> bool {
+    // TODO: Once we can use `HashMap`s with `Arc<dyn PhysicalExpr>`, this
+    //       function should use a `HashMap` to reduce computational complexity.
+    if lhs.len() == rhs.len() {
+        let mut rhs_vec = rhs.to_vec();
+        for expr in lhs {
+            if let Some(idx) = rhs_vec.iter().position(|e| expr.eq(e)) {
+                rhs_vec.swap_remove(idx);
+            } else {
+                return false;
+            }
+        }
+        true
+    } else {
+        false
+    }
+}
+
+/// This utility function removes duplicates from the given `exprs` vector.
+/// Note that this function does not necessarily preserve its input ordering.
+pub fn deduplicate_physical_exprs(exprs: &mut Vec<Arc<dyn PhysicalExpr>>) {
+    // TODO: Once we can use `HashSet`s with `Arc<dyn PhysicalExpr>`, this

Review Comment:
   Agreed. @mustafasrepo, let's open a ticket to track this. I think there are a few sister TODOs with basically the same comment; we can track them all via that ticket.
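For the sister TODO in `deduplicate_physical_exprs`, a `HashSet`-based version could look like the sketch below. It again assumes `T: Hash + Eq`, plus `Clone` so the set can own its keys. `deduplicate` is a hypothetical name. The documented behavior of `deduplicate_physical_exprs` does not necessarily preserve ordering. By contrast, `retain` keeps the first occurrence of each item and preserves input order.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Removes duplicates in O(n) expected time, keeping the first occurrence
/// of each item and preserving the original order.
pub fn deduplicate<T: Hash + Eq + Clone>(items: &mut Vec<T>) {
    let mut seen = HashSet::new();
    // `insert` returns false for items already seen, so `retain` drops them.
    items.retain(|item| seen.insert(item.clone()));
}
```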





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#issuecomment-1787402507

   Thank you @mustafasrepo and @ozankabak -- I will hopefully review this sometime today and test it downstream with IOx.




Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#issuecomment-1792625383

   FWIW I reran our internal tests against the code on this branch and they now pass. Thank you @mustafasrepo 




Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#issuecomment-1792680614

   BTW, I still think that given the size of this PR we should merge it now and then fix the issue next week. I will file a follow-on ticket to serve as a placeholder.




Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006




Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1381205423


##########
datafusion/physical-plan/src/coalesce_partitions.rs:
##########
@@ -100,10 +100,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
         None
     }
 
-    fn equivalence_properties(&self) -> EquivalenceProperties {

Review Comment:
   Thanks @alamb for catching it. I implemented `equivalence_properties` for `CoalescePartitionsExec`, and it improved a test 😄





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#issuecomment-1788965935

   Starting to review this one




Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#issuecomment-1790589800

   Hi @alamb. Did you run into any problems testing downstream with IOx? We have some follow-ons to this in the pipeline once this merges.




Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1380501861


##########
datafusion/physical-plan/src/common.rs:
##########
@@ -373,6 +375,38 @@ pub fn batch_byte_size(batch: &RecordBatch) -> usize {
     batch.get_array_memory_size()
 }
 
+/// Constructs the mapping between a projection's input and output
+pub fn calculate_projection_mapping(

Review Comment:
   For now `ProjectionMapping` is just a type alias, but we plan to promote it to a full-fledged type in a follow-on PR -- I agree that it makes sense to convert this into a method then.





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1378621424


##########
datafusion/sqllogictest/test_files/tpch/q17.slt.part:
##########
@@ -58,21 +58,19 @@ ProjectionExec: expr=[CAST(SUM(lineitem.l_extendedprice)@0 AS Float64) / 7 as av
 --------ProjectionExec: expr=[l_extendedprice@1 as l_extendedprice]
 ----------CoalesceBatchesExec: target_batch_size=8192
 ------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@2, l_partkey@1)], filter=CAST(l_quantity@0 AS Decimal128(30, 15)) < Float64(0.2) * AVG(lineitem.l_quantity)@1
---------------CoalesceBatchesExec: target_batch_size=8192
-----------------RepartitionExec: partitioning=Hash([p_partkey@2], 4), input_partitions=4

Review Comment:
   Yes, we optimize away one hash repartition because we know that the existing hash partitioning already satisfies the requirement (the existing partitioning and the requirement have equal expressions).
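The check behind removing the repartition can be sketched as follows: an existing `Hash(exprs)` partitioning satisfies a required hash distribution when the two expression lists are equal position by position. The sketch uses strings for expressions and hypothetical names throughout. Before comparing, it also normalizes each expression to the first member of its equivalence class. That is the kind of extension merged equivalence information makes possible; whether DataFusion normalizes in exactly this way is an assumption.

```rust
/// Maps `expr` to the first member of its equivalence class, or returns it
/// unchanged if it belongs to no class. Classes are assumed non-empty.
fn normalize<'a>(expr: &'a str, classes: &'a [Vec<String>]) -> &'a str {
    classes
        .iter()
        .find(|class| class.iter().any(|e| e == expr))
        .map(|class| class[0].as_str())
        .unwrap_or(expr)
}

/// Returns true when an existing hash partitioning on `existing` already
/// satisfies a required hash distribution on `required` (order-sensitive).
pub fn hash_partitioning_satisfies(
    existing: &[&str],
    required: &[&str],
    classes: &[Vec<String>],
) -> bool {
    existing.len() == required.len()
        && existing
            .iter()
            .zip(required)
            .all(|(e, r)| normalize(e, classes) == normalize(r, classes))
}
```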





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#issuecomment-1790826703

   >  Hi @alamb. Did you get any problems testing downstream with IOx? We have some follow-ons to this in the pipeline once this merges
   
   I have two test failures downstream that I am looking into. I don't know if they are problems with this PR or something pre-existing that it exposed.
   
   Getting this PR reviewed / tested is my highest priority today. I will have an update in a few hours.




Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1380508127


##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -191,18 +190,6 @@ impl ExecutionPlan for NestedLoopJoinExec {
         distribution_from_join_type(&self.join_type)
     }
 
-    fn equivalence_properties(&self) -> EquivalenceProperties {

Review Comment:
   Hmm, seems like it. We will discuss tomorrow and update if necessary.





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1380428109


##########
datafusion/physical-plan/src/aggregates/order/mod.rs:
##########
@@ -18,13 +18,12 @@
 use arrow_array::ArrayRef;
 use arrow_schema::Schema;
 use datafusion_common::Result;
-use datafusion_physical_expr::EmitTo;
-
-use super::{AggregationOrdering, GroupByOrderMode};
+use datafusion_physical_expr::{EmitTo, PhysicalSortExpr};
 
 mod full;
 mod partial;
 
+use crate::windows::PartitionSearchMode;

Review Comment:
   This change effectively removes the duplication between `GroupByOrderMode` and `PartitionSearchMode`, which represent the same thing, right?
   
   I find `PartitionSearchMode` a confusing name, as the term "partition" is already pretty overloaded (e.g., each ExecutionPlan has input/output partitions, and WindowExec deals with partitions).
   
   Also, the fact that it is in the `windows` module seems like a mismatch, given that it is now used in the aggregation logic.
   
   Maybe it could go into its own module 🤔 and be called `SortOrderMode` or something.
   
   Also, can we clarify what `PartitionSearchMode::PartiallySorted` means? Specifically, does it represent a prefix of the columns that is sorted?
   
   This could all be done as follow-on PRs.
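On the `PartiallySorted` question, here is a sketch of how such a mode could be derived. It assumes the mode records the indices of the group-by expressions that appear, in some order, as a prefix of the input ordering. `SortOrderMode` is the name floated above. The variant shapes are assumed to mirror `PartitionSearchMode` (`Linear`, `PartiallySorted(Vec<usize>)`, `Sorted`), and the derivation is illustrative only.

```rust
/// Hypothetical enum; variant shapes assumed to mirror `PartitionSearchMode`.
#[derive(Debug, PartialEq, Eq)]
pub enum SortOrderMode {
    /// No group-by expression leads the input ordering.
    Linear,
    /// Indices (into the group-by list) of the expressions that form a
    /// sorted prefix of the input ordering, listed in ordering order.
    PartiallySorted(Vec<usize>),
    /// Every group-by expression is covered by a prefix of the ordering.
    Sorted,
}

/// Walks the input ordering from the front and stops at the first ordering
/// column that is not one of the group-by expressions.
pub fn sort_order_mode(group_by: &[&str], input_ordering: &[&str]) -> SortOrderMode {
    let mut indices = Vec::new();
    for col in input_ordering {
        match group_by.iter().position(|g| g == col) {
            // A repeated ordering column adds no new information; skip it.
            Some(idx) if indices.contains(&idx) => continue,
            Some(idx) => indices.push(idx),
            None => break,
        }
    }
    if indices.is_empty() {
        SortOrderMode::Linear
    } else if indices.len() == group_by.len() {
        SortOrderMode::Sorted
    } else {
        SortOrderMode::PartiallySorted(indices)
    }
}
```

Under this reading, `PartiallySorted` describes a prefix of the ordering made of group-by columns, not necessarily a prefix of the group-by list itself.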



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -331,16 +333,14 @@ impl GroupedHashAggregateStream {
         let reservation = MemoryConsumer::new(name)
             .with_can_spill(true)
             .register(context.memory_pool());
-
-        let group_ordering = agg
-            .aggregation_ordering
-            .as_ref()
-            .map(|aggregation_ordering| {
-                GroupOrdering::try_new(&group_schema, aggregation_ordering)
-            })
-            // return error if any
-            .transpose()?
-            .unwrap_or(GroupOrdering::None);
+        let (ordering, _) = agg

Review Comment:
   👌  very nice



##########
datafusion/physical-plan/src/coalesce_partitions.rs:
##########
@@ -100,10 +100,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
         None
     }
 
-    fn equivalence_properties(&self) -> EquivalenceProperties {

Review Comment:
   It seems like the equivalence properties would not be changed by `CoalescePartitionsExec` (the sort order would be, of course).
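The point can be sketched with a simplified, hypothetical `EqProps` state. Merging several partitions interleaves their rows, so any ordering is lost. Row-level facts such as `a = b`, however, still hold for every row. This is a sketch under those assumptions, not DataFusion's implementation; `coalesce_partitions_eq_props` is an illustrative name.

```rust
/// Hypothetical, simplified equivalence state for a plan's output.
#[derive(Debug, Clone, PartialEq)]
pub struct EqProps {
    /// Groups of columns known to hold equal values in every row (e.g. `a = b`).
    pub classes: Vec<Vec<String>>,
    /// Alternative orderings the output satisfies (e.g. `[a ASC, b ASC]`).
    pub orderings: Vec<Vec<String>>,
}

/// Equalities are per-row facts, so they survive merging partitions;
/// orderings do not once rows from several partitions are interleaved.
pub fn coalesce_partitions_eq_props(input: &EqProps, input_partitions: usize) -> EqProps {
    let mut out = input.clone();
    if input_partitions > 1 {
        out.orderings.clear();
    }
    out
}
```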



##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -100,63 +93,20 @@ impl ProjectionExec {
             input_schema.metadata().clone(),
         ));
 
-        // construct a map from the input columns to the output columns of the Projection
-        let mut columns_map: HashMap<Column, Vec<Column>> = HashMap::new();
-        for (expr_idx, (expression, name)) in expr.iter().enumerate() {
-            if let Some(column) = expression.as_any().downcast_ref::<Column>() {
-                // For some executors, logical and physical plan schema fields
-                // are not the same. The information in a `Column` comes from
-                // the logical plan schema. Therefore, to produce correct results
-                // we use the field in the input schema with the same index. This
-                // corresponds to the physical plan `Column`.
-                let idx = column.index();
-                let matching_input_field = input_schema.field(idx);
-                let matching_input_column = Column::new(matching_input_field.name(), idx);
-                let entry = columns_map.entry(matching_input_column).or_default();
-                entry.push(Column::new(name, expr_idx));
-            };
-        }
-
-        // Output Ordering need to respect the alias
-        let child_output_ordering = input.output_ordering();
-        let output_ordering = match child_output_ordering {
-            Some(sort_exprs) => {
-                let normalized_exprs = sort_exprs
-                    .iter()
-                    .map(|sort_expr| {
-                        let expr = normalize_out_expr_with_columns_map(
-                            sort_expr.expr.clone(),
-                            &columns_map,
-                        );
-                        PhysicalSortExpr {
-                            expr,
-                            options: sort_expr.options,
-                        }
-                    })
-                    .collect::<Vec<_>>();
-                Some(normalized_exprs)
-            }
-            None => None,
-        };
-
-        let orderings = find_orderings_of_exprs(
-            &expr,
-            input.output_ordering(),
-            input.equivalence_properties(),
-            input.ordering_equivalence_properties(),
-        )?;
+        // construct a map from the input expressions to the output expression of the Projection
+        let projection_mapping = calculate_projection_mapping(&expr, &input_schema)?;

Review Comment:
   😍 



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -15,62 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::equivalence::{EquivalenceProperties, OrderingEquivalenceProperties};
-use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
-use crate::sort_properties::{ExprOrdering, SortProperties};
-use crate::update_ordering;
-use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement};
+use std::borrow::Borrow;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use crate::expressions::{BinaryExpr, Column};
+use crate::{PhysicalExpr, PhysicalSortExpr};
 
 use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
 use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
 use arrow::datatypes::SchemaRef;
-use arrow_schema::SortOptions;
 use datafusion_common::tree_node::{
     Transformed, TreeNode, TreeNodeRewriter, VisitRecursion,
 };
-use datafusion_common::utils::longest_consecutive_prefix;
 use datafusion_common::Result;
 use datafusion_expr::Operator;
 
 use itertools::Itertools;
 use petgraph::graph::NodeIndex;
 use petgraph::stable_graph::StableGraph;
-use std::borrow::Borrow;
-use std::collections::HashMap;
-use std::collections::HashSet;
-use std::sync::Arc;
-
-/// Compare the two expr lists are equal no matter the order.
-/// For example two InListExpr can be considered to be equals no matter the order:
-///
-/// In('a','b','c') == In('c','b','a')
-pub fn expr_list_eq_any_order(

Review Comment:
   FWIW, this kind of change (moving functions to other modules and renaming them) is something we could probably do in individual PRs that would be quick to review, as they would be mostly mechanical.
   
   That would make it easier to find the parts of a PR such as this one that need more careful review.



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -191,18 +190,6 @@ impl ExecutionPlan for NestedLoopJoinExec {
         distribution_from_join_type(&self.join_type)
     }
 
-    fn equivalence_properties(&self) -> EquivalenceProperties {

Review Comment:
   Shouldn't this still have `join_equivalent_properties` rather than being removed?



##########
datafusion/physical-plan/src/common.rs:
##########
@@ -373,6 +375,38 @@ pub fn batch_byte_size(batch: &RecordBatch) -> usize {
     batch.get_array_memory_size()
 }
 
+/// Constructs the mapping between a projection's input and output
+pub fn calculate_projection_mapping(

Review Comment:
   To make this easier to discover, perhaps it could be a method in `ProjectionMapping` such as `ProjectionMapping::try_new`
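Here is a sketch of the suggested constructor. It assumes a simplified, hypothetical `Expr` enum in place of `Arc<dyn PhysicalExpr>` and a list of field names in place of a schema. Following the removed `columns_map` code, a projected column is resolved against the input field at the same index. Every projected expression maps to an output column named by its alias at its output position. `target_of` is an illustrative helper, not an existing API.

```rust
/// Hypothetical, simplified physical expression.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Expr {
    Column { name: String, index: usize },
    /// Any non-column expression, kept opaque here (e.g. "a + 1").
    Other(String),
}

/// Hypothetical struct form of the input-to-output projection mapping.
#[derive(Debug, PartialEq, Eq)]
pub struct ProjectionMapping {
    pub map: Vec<(Expr, Expr)>,
}

impl ProjectionMapping {
    /// `exprs` pairs each projected expression with its output name;
    /// `input_fields` lists the input schema's field names.
    pub fn try_new(exprs: &[(Expr, String)], input_fields: &[String]) -> Result<Self, String> {
        let mut map = Vec::with_capacity(exprs.len());
        for (out_idx, (expr, out_name)) in exprs.iter().enumerate() {
            // Resolve a column against the input field at the same index.
            let source = match expr {
                Expr::Column { index, .. } => {
                    let name = input_fields
                        .get(*index)
                        .ok_or_else(|| format!("column index {index} out of bounds"))?;
                    Expr::Column { name: name.clone(), index: *index }
                }
                other => other.clone(),
            };
            let target = Expr::Column { name: out_name.clone(), index: out_idx };
            map.push((source, target));
        }
        Ok(Self { map })
    }

    /// Returns the output form of an input expression, if it is projected.
    pub fn target_of(&self, expr: &Expr) -> Option<&Expr> {
        self.map.iter().find(|(src, _)| src == expr).map(|(_, dst)| dst)
    }
}
```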



##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -60,15 +56,12 @@ pub struct ProjectionExec {
     input: Arc<dyn ExecutionPlan>,
     /// The output ordering
     output_ordering: Option<Vec<PhysicalSortExpr>>,
-    /// The columns map used to normalize out expressions like Partitioning and PhysicalSortExpr
-    /// The key is the column from the input schema and the values are the columns from the output schema
-    columns_map: HashMap<Column, Vec<Column>>,
+    /// The mapping used to normalize expressions like Partitioning and
+    /// PhysicalSortExpr. The key is the expression from the input schema
+    /// and the value is the expression from the output schema.
+    projection_mapping: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>,

Review Comment:
   This is another pattern that has appeared more than once (also in GroupBy) -- maybe it could be `ProjectionMapping` (which could also be made into a struct with a constructor).





Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1381205423


##########
datafusion/physical-plan/src/coalesce_partitions.rs:
##########
@@ -100,10 +100,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
         None
     }
 
-    fn equivalence_properties(&self) -> EquivalenceProperties {

Review Comment:
   Thanks @alamb for catching it. I implemented `equivalence_properties` for `CoalescePartitionsExec`, and it improved a test (😄 )




Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#issuecomment-1792694086

   > BTW I still think, given the size of this PR, we should merge it now and then fix the issue next week. Follow-on ticket to serve as a placeholder:
   > 
   > #8043
   
   That would be great. I now understand the cause of the bug and am working on a fix. However, I also think it is better to merge this PR without waiting.



Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1379662628


##########
datafusion/core/src/physical_optimizer/sort_pushdown.rs:
##########
@@ -298,34 +302,34 @@ fn pushdown_requirement_to_children(
 /// If the parent requirements are more specific, push down the parent requirements
 /// If they are not compatible, need to add Sort.
 fn determine_children_requirement(
-    parent_required: Option<&[PhysicalSortRequirement]>,
-    request_child: Option<&[PhysicalSortRequirement]>,
+    parent_required: LexRequirementRef,

Review Comment:
   I opened issue [#8026](https://github.com/apache/arrow-datafusion/issues/8026) to track this feature.
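The rule in the doc comment of `determine_children_requirement` can be sketched with plain prefix comparison. If the child's requirement already implies the parent's, nothing changes. If the parent's requirement is more specific, it is pushed down. Otherwise a sort is needed. This is a hypothetical simplification: `compare_requirements` and `Compatibility` are illustrative names. Sort keys are plain strings like `"a ASC"`, and equivalences and constants are ignored, unlike the analysis this PR performs. An empty slice stands for "no requirement", which is presumably what the switch from `Option<&[PhysicalSortRequirement]>` to `LexRequirementRef` relies on.

```rust
/// Result of comparing a parent's ordering requirement with a child's
/// (hypothetical enum, not DataFusion's).
#[derive(Debug, PartialEq)]
enum Compatibility {
    /// The child's own requirement already satisfies the parent's.
    Satisfy,
    /// The parent's requirement is more specific: push it down instead.
    PushParent(Vec<String>),
    /// Neither requirement extends the other: a sort must be added.
    NonCompatible,
}

/// Compares two lexicographic requirements written as sort keys like "a ASC".
fn compare_requirements(parent: &[&str], child: &[&str]) -> Compatibility {
    if child.starts_with(parent) {
        // Data sorted by `child` is also sorted by its prefix `parent`.
        Compatibility::Satisfy
    } else if parent.starts_with(child) {
        // `parent` extends `child`, so satisfying `parent` satisfies both.
        Compatibility::PushParent(parent.iter().map(|s| s.to_string()).collect())
    } else {
        Compatibility::NonCompatible
    }
}
```

For example, a parent requirement `[a ASC, b ASC]` over a child requiring `[a ASC]` yields `PushParent`, while `[b ASC]` over `[a ASC]` yields `NonCompatible`.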




Re: [PR] Combine Equivalence and Ordering equivalence to simplify state [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#issuecomment-1805645447

   While working on the upgrade in IOx, I also filed https://github.com/apache/arrow-datafusion/issues/8120, which may help others avoid issues when upgrading.

