Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/31 14:16:17 UTC

[GitHub] [arrow-datafusion] mingmwang opened a new pull request, #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

mingmwang opened a new pull request, #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043

   # Which issue does this PR close?
   
   
   Partially Closes #3854.
   Closes #3653
   Closes #3400
   Closes #189
   
   # Rationale for this change
   
   
   # What changes are included in this PR?
   
   
   1. Add method `required_input_ordering()` to the `ExecutionPlan` trait to specify the ordering requirements on inputs
   2. Fix `output_partitioning()`, `output_ordering()`, and `required_input_distribution()` in a couple of trait implementations
   3. Add method `equivalence_properties()` to the `ExecutionPlan` trait to discover the equivalence properties in the physical plan tree
   4. Support partition-aware `UnionExec`
   
   # Are there any user-facing changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012379795


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -255,25 +276,58 @@ impl ExecutionPlan for AggregateExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+        match &self.mode {
+            AggregateMode::Partial => {
+                // Partial Aggregation will not change the output partitioning but need to respect the Alias
+                let input_partition = self.input.output_partitioning();
+                match input_partition {
+                    Partitioning::Hash(exprs, part) => {
+                        let normalized_exprs = exprs
+                            .into_iter()
+                            .map(|expr| {
+                                normalize_out_expr_with_alias_schema(
+                                    expr,
+                                    &self.alias_map,
+                                    &self.schema,
+                                )
+                            })
+                            .collect::<Vec<_>>();
+                        Partitioning::Hash(normalized_exprs, part)
+                    }
+                    _ => input_partition,
+                }
+            }
+            // Final Aggregation's output partitioning is the same as its real input
+            _ => self.input.output_partitioning(),
+        }
     }
 
+    // TODO check the output ordering of AggregateExec

Review Comment:
   Yes, I will remove it. The AggregateExec cannot keep the ordering.
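
   To illustrate the `Partial` branch in the hunk above, here is a minimal, std-only sketch of rewriting hash partitioning keys through an alias map. The `Partitioning` enum, the string column names, and `normalize_partitioning` are simplified assumptions for illustration; they are not DataFusion's actual types, nor the PR's `normalize_out_expr_with_alias_schema`.

```rust
use std::collections::HashMap;

/// Simplified stand-in for DataFusion's `Partitioning` (hypothetical, illustration only).
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    /// Hash partitioning on the named columns into the given number of partitions.
    Hash(Vec<String>, usize),
    /// Any other scheme; only the partition count is known.
    Unknown(usize),
}

/// Rewrite hash keys to the names they carry in the operator's output.
/// `alias_map` maps an input column name to its output aliases; the first alias wins.
fn normalize_partitioning(
    input: Partitioning,
    alias_map: &HashMap<String, Vec<String>>,
) -> Partitioning {
    match input {
        Partitioning::Hash(exprs, n) => {
            let normalized = exprs
                .into_iter()
                .map(|col| {
                    alias_map
                        .get(&col)
                        .and_then(|aliases| aliases.first().cloned())
                        // No alias registered: keep the original column name.
                        .unwrap_or(col)
                })
                .collect();
            Partitioning::Hash(normalized, n)
        }
        // Non-hash partitioning carries no expressions to rewrite.
        other => other,
    }
}
```

   In this sketch the partition count is never touched; only the key names change, which matches the "will not change the output partitioning but need to respect the Alias" comment in the diff.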





[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1013569410


##########
datafusion/core/src/physical_plan/mod.rs:
##########
@@ -472,7 +508,10 @@ pub enum Distribution {
     HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),

Review Comment:
   Maybe we need to add the partition number and schema to `HashPartitioned` in the future.





[GitHub] [arrow-datafusion] alamb commented on pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#issuecomment-1297601777

   Thanks @mingmwang  -- I will review this carefully tomorrow




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1014228851


##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec {
         true
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        true
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        let sort_keys = self.sort_keys.as_deref();
+        vec![sort_keys]
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        if self
-            .window_expr()
-            .iter()
-            .all(|expr| expr.partition_by().is_empty())
-        {
-            Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.partition_keys.is_empty() {
+            warn!("No partition defined for WindowAggExec!!!");

Review Comment:
   I would recommend removing the warning: it isn't clear to me what a user or administrator of the system would do in this case, so I think the warning will end up as spam in the logs.
   
   Perhaps we can just change it to `debug!`.





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1014237468


##########
datafusion/core/src/physical_plan/union.rs:
##########
@@ -194,6 +258,73 @@ impl ExecutionPlan for UnionExec {
     }
 }
 
+/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one

Review Comment:
   I feel there was already a piece of code that does this -- maybe @tustvold  can remind me 🤔 



##########
datafusion/core/src/dataframe.rs:
##########
@@ -1605,4 +1607,74 @@ mod tests {
             Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count * 2));
         Ok(())
     }
+
+    #[tokio::test]
+    async fn verify_join_output_partitioning() -> Result<()> {
+        let left = test_table().await?.select_columns(&["c1", "c2"])?;
+        let right = test_table_with_name("c2")
+            .await?
+            .select_columns(&["c1", "c2"])?
+            .with_column_renamed("c2.c1", "c2_c1")?
+            .with_column_renamed("c2.c2", "c2_c2")?;
+
+        let all_join_types = vec![
+            JoinType::Inner,
+            JoinType::Left,
+            JoinType::Right,
+            JoinType::Full,
+            JoinType::LeftSemi,
+            JoinType::RightSemi,
+            JoinType::LeftAnti,
+            JoinType::RightAnti,
+        ];
+
+        let default_partition_count =
+            SessionContext::new().copied_config().target_partitions;
+
+        for join_type in all_join_types {
+            let join = left.join(
+                right.clone(),
+                join_type,
+                &["c1", "c2"],
+                &["c2_c1", "c2_c2"],
+                None,
+            )?;
+            let physical_plan = join.create_physical_plan().await?;
+            let out_partitioning = physical_plan.output_partitioning();
+            let join_schema = physical_plan.schema();
+
+            match join_type {
+                JoinType::Inner
+                | JoinType::Left
+                | JoinType::LeftSemi
+                | JoinType::LeftAnti => {
+                    let left_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
+                        Arc::new(Column::new_with_schema("c1", &join_schema).unwrap()),
+                        Arc::new(Column::new_with_schema("c2", &join_schema).unwrap()),
+                    ];
+                    assert_eq!(

Review Comment:
   👍 





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012588817


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }
+    }
+
+    if idx1 != -1 && idx2 != -1 && idx1 != idx2 {
+        // need to merge the two existing properties
+        let second_properties = eq_properties.get(idx2 as usize).unwrap().clone();
+        let first_properties = eq_properties.get_mut(idx1 as usize).unwrap();
+        for prop in second_properties.iter() {
+            if !first_properties.contains(prop) {
+                first_properties.insert(prop.clone());
+            }
+        }
+        eq_properties.remove(idx2 as usize);
+    } else if idx1 == -1 && idx2 == -1 {
+        // adding new pairs
+        eq_properties.push(EquivalenceProperties::new(
+            new_condition.0.clone(),
+            vec![new_condition.1.clone()],
+        ))
+    }
+}
+
+pub fn remove_equivalence_properties(

Review Comment:
   I will remove the related logic.





[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1013554279


##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expressions::Column;
+
+use arrow::datatypes::SchemaRef;
+
+use std::collections::HashMap;
+use std::collections::HashSet;
+
+/// Equivalence Properties is a vec of EquivalentClass.
+#[derive(Debug, Default, Clone)]
+pub struct EquivalenceProperties {
+    classes: Vec<EquivalentClass>,
+}
+
+impl EquivalenceProperties {
+    pub fn new() -> Self {
+        EquivalenceProperties { classes: vec![] }
+    }
+
+    pub fn classes(&self) -> &[EquivalentClass] {
+        &self.classes
+    }
+
+    pub fn extend<I: IntoIterator<Item = EquivalentClass>>(&mut self, iter: I) {
+        self.classes.extend(iter)
+    }
+
+    /// Add new equal conditions into the EquivalenceProperties, the new equal conditions are usually comming from the
+    /// equality predicates in Join or Filter
+    pub fn add_equal_conditions(&mut self, new_conditions: (&Column, &Column)) {
+        let mut idx1: Option<usize> = None;
+        let mut idx2: Option<usize> = None;

Review Comment:
   Using an `Option` here is much more correct now.
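
   A minimal sketch of the `Option<usize>` approach, under simplifying assumptions: classes are plain `HashSet<String>` values rather than `EquivalentClass`, and `EquivalenceClasses` / `add_equal_condition` are hypothetical names, not the PR's actual implementation.

```rust
use std::collections::HashSet;

/// Simplified equivalence classes over column names (hypothetical stand-in
/// for `EquivalenceProperties`; the real type stores `Column` values).
#[derive(Debug, Default)]
struct EquivalenceClasses {
    classes: Vec<HashSet<String>>,
}

impl EquivalenceClasses {
    /// Record that `a` and `b` are equal, merging classes where needed.
    fn add_equal_condition(&mut self, a: &str, b: &str) {
        // `None` means "not found", replacing a `-1i32` sentinel.
        let idx1: Option<usize> = self.classes.iter().position(|c| c.contains(a));
        let idx2: Option<usize> = self.classes.iter().position(|c| c.contains(b));

        match (idx1, idx2) {
            // Both columns already live in the same class: nothing to do.
            (Some(i), Some(j)) if i == j => {}
            // Columns live in different classes: merge the second into the first.
            (Some(i), Some(j)) => {
                let moved = std::mem::take(&mut self.classes[j]);
                self.classes[i].extend(moved);
                self.classes.remove(j);
            }
            // Only one side is known: add the other column to its class.
            (Some(i), None) => {
                self.classes[i].insert(b.to_string());
            }
            (None, Some(j)) => {
                self.classes[j].insert(a.to_string());
            }
            // Neither side is known: start a new class.
            (None, None) => {
                self.classes
                    .push(HashSet::from([a.to_string(), b.to_string()]));
            }
        }
    }
}
```

   Matching on the `(Option, Option)` pair makes every case explicit, so there are no `as i32` / `as usize` casts and no way to index with a "not found" marker.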





[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012521536


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }
+    }
+
+    if idx1 != -1 && idx2 != -1 && idx1 != idx2 {
+        // need to merge the two existing properties
+        let second_properties = eq_properties.get(idx2 as usize).unwrap().clone();
+        let first_properties = eq_properties.get_mut(idx1 as usize).unwrap();
+        for prop in second_properties.iter() {
+            if !first_properties.contains(prop) {
+                first_properties.insert(prop.clone());
+            }
+        }
+        eq_properties.remove(idx2 as usize);
+    } else if idx1 == -1 && idx2 == -1 {
+        // adding new pairs
+        eq_properties.push(EquivalenceProperties::new(
+            new_condition.0.clone(),
+            vec![new_condition.1.clone()],
+        ))
+    }
+}
+
+pub fn remove_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    remove_condition: (&Column, &Column),
+) {
+    let mut match_idx = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(remove_condition.0);
+        let contains_second = prop.contains(remove_condition.1);
+        if contains_first && contains_second {
+            match_idx = idx as i32;
+            break;
+        }
+    }
+    if match_idx >= 0 {
+        let matches = eq_properties.get_mut(match_idx as usize).unwrap();
+        matches.remove(remove_condition.0);
+        matches.remove(remove_condition.1);
+        if matches.len() <= 1 {
+            eq_properties.remove(match_idx as usize);
+        }
+    }
+}
+
+pub fn merge_equivalence_properties_with_alias(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+) {
+    for (column, columns) in alias_map {
+        let mut find_match = false;
+        for (_idx, prop) in eq_properties.iter_mut().enumerate() {
+            if prop.contains(column) {
+                for col in columns {
+                    prop.insert(col.clone());

Review Comment:
   Although it can be corrected by `truncate_equivalence_properties_not_in_schema`, I still think it's better to construct a new one directly rather than merging into the input `EquivalenceProperties`.
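
   One way the "construct a new one directly" idea could look, as a rough std-only sketch. It assumes that `alias_map` lists every output name for each input column that survives the projection, and that classes are plain `HashSet<String>` values; `project_equivalence_classes` is a hypothetical name, not a function in the PR.

```rust
use std::collections::{HashMap, HashSet};

/// Build the output equivalence classes from scratch.
/// `input_classes`: classes valid on the input schema.
/// `alias_map`: input column name -> names it is exposed under in the output
/// (assumed complete; columns missing from the map are treated as dropped).
fn project_equivalence_classes(
    input_classes: &[HashSet<String>],
    alias_map: &HashMap<String, Vec<String>>,
) -> Vec<HashSet<String>> {
    let mut output = Vec::new();
    let mut covered: HashSet<&str> = HashSet::new();

    // Translate every member of an input class to its output names.
    for class in input_classes {
        let mut projected = HashSet::new();
        for col in class {
            if let Some(aliases) = alias_map.get(col) {
                projected.extend(aliases.iter().cloned());
                covered.insert(col.as_str());
            }
        }
        // A class with fewer than two members carries no information.
        if projected.len() > 1 {
            output.push(projected);
        }
    }

    // A column exposed under several names forms a class on its own.
    for (col, aliases) in alias_map {
        if !covered.contains(col.as_str()) && aliases.len() > 1 {
            output.push(aliases.iter().cloned().collect());
        }
    }
    output
}
```

   Because the result only ever contains output names, no truncation pass against the schema is needed afterwards in this sketch.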





[GitHub] [arrow-datafusion] mingmwang commented on pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#issuecomment-1301863207

   @alamb @yahoNanJing 
   Please help take another look.




[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012476479


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }
+    }
+
+    if idx1 != -1 && idx2 != -1 && idx1 != idx2 {
+        // need to merge the two existing properties
+        let second_properties = eq_properties.get(idx2 as usize).unwrap().clone();
+        let first_properties = eq_properties.get_mut(idx1 as usize).unwrap();
+        for prop in second_properties.iter() {
+            if !first_properties.contains(prop) {
+                first_properties.insert(prop.clone());
+            }
+        }
+        eq_properties.remove(idx2 as usize);
+    } else if idx1 == -1 && idx2 == -1 {
+        // adding new pairs
+        eq_properties.push(EquivalenceProperties::new(
+            new_condition.0.clone(),
+            vec![new_condition.1.clone()],
+        ))
+    }
+}
+
+pub fn remove_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    remove_condition: (&Column, &Column),
+) {
+    let mut match_idx = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(remove_condition.0);
+        let contains_second = prop.contains(remove_condition.1);
+        if contains_first && contains_second {
+            match_idx = idx as i32;
+            break;
+        }
+    }
+    if match_idx >= 0 {
+        let matches = eq_properties.get_mut(match_idx as usize).unwrap();
+        matches.remove(remove_condition.0);
+        matches.remove(remove_condition.1);
+        if matches.len() <= 1 {

Review Comment:
   This logic may not be correct. For example, start with two equivalence properties: left side (l1,l2) and right side (r1,r2). After `combine_equivalence_properties`, they become a single equivalence property, (l1,l2,r1,r2). Then we come to `remove_equivalence_properties` with the remove condition (l1,r1).
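
   The scenario can be traced with a small std-only sketch that mirrors the removal logic on plain column names. `remove_equivalence` and the `HashSet<String>` representation are simplifications assumed for illustration, not the PR's code.

```rust
use std::collections::HashSet;

/// Mirrors the removal logic under review on plain column names
/// (hypothetical simplification: a class is a `HashSet<String>`).
fn remove_equivalence(classes: &mut Vec<HashSet<String>>, a: &str, b: &str) {
    // Find the first class that contains both columns of the condition.
    if let Some(idx) = classes
        .iter()
        .position(|c| c.contains(a) && c.contains(b))
    {
        classes[idx].remove(a);
        classes[idx].remove(b);
        // Drop the class once it no longer relates at least two columns.
        if classes[idx].len() <= 1 {
            classes.remove(idx);
        }
    }
}
```

   Starting from the single class (l1,l2,r1,r2) and removing (l1,r1), this sketch leaves (l2,r2). If the simplification is faithful, that result still claims l2 = r2, which was only implied through l1 = r1, and it no longer records l1 = l2 or r1 = r2.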
   





[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012473618


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(

Review Comment:
   Good interface design. It can be leveraged by both Join and Filter.
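
   As a rough illustration of how a Filter or Join might feed such an interface, the sketch below splits a conjunction and picks out column-to-column equalities. The `Expr` enum, `split_conjunction`, and `column_pair` are hypothetical stand-ins for `Arc<dyn PhysicalExpr>`, `split_predicate`, and the downcasting a real caller would need.

```rust
/// Tiny expression tree (hypothetical stand-in for `Arc<dyn PhysicalExpr>`).
#[derive(Debug, PartialEq)]
enum Expr {
    Column(String),
    Eq(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// Split a conjunction into its conjuncts, left to right.
fn split_conjunction(predicate: &Expr) -> Vec<&Expr> {
    match predicate {
        Expr::And(left, right) => {
            let mut parts = split_conjunction(left);
            parts.extend(split_conjunction(right));
            parts
        }
        // Any non-AND node is a single conjunct. Note that `split_predicate`
        // in the hunk above returns an empty Vec for non-binary predicates.
        other => vec![other],
    }
}

/// Extract the column pair from an equality between two columns, if any.
fn column_pair(expr: &Expr) -> Option<(&str, &str)> {
    match expr {
        Expr::Eq(l, r) => match (l.as_ref(), r.as_ref()) {
            (Expr::Column(a), Expr::Column(b)) => Some((a.as_str(), b.as_str())),
            _ => None,
        },
        _ => None,
    }
}
```

   A caller would then pass each `Some((a, b))` pair to the combine step as a new equal condition and ignore the conjuncts that yield `None`.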





[GitHub] [arrow-datafusion] alamb commented on pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#issuecomment-1304780786

   Sorry for the delay




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012168379


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }
+    }
+
+    if idx1 != -1 && idx2 != -1 && idx1 != idx2 {
+        // need to merge the two existing properties
+        let second_properties = eq_properties.get(idx2 as usize).unwrap().clone();
+        let first_properties = eq_properties.get_mut(idx1 as usize).unwrap();
+        for prop in second_properties.iter() {
+            if !first_properties.contains(prop) {
+                first_properties.insert(prop.clone());
+            }
+        }
+        eq_properties.remove(idx2 as usize);
+    } else if idx1 == -1 && idx2 == -1 {
+        // adding new pairs
+        eq_properties.push(EquivalenceProperties::new(
+            new_condition.0.clone(),
+            vec![new_condition.1.clone()],
+        ))
+    }
+}
+
+pub fn remove_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    remove_condition: (&Column, &Column),
+) {
+    let mut match_idx = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(remove_condition.0);
+        let contains_second = prop.contains(remove_condition.1);
+        if contains_first && contains_second {
+            match_idx = idx as i32;
+            break;
+        }
+    }
+    if match_idx >= 0 {
+        let matches = eq_properties.get_mut(match_idx as usize).unwrap();
+        matches.remove(remove_condition.0);
+        matches.remove(remove_condition.1);
+        if matches.len() <= 1 {
+            eq_properties.remove(match_idx as usize);
+        }
+    }
+}
+
+pub fn merge_equivalence_properties_with_alias(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+) {
+    for (column, columns) in alias_map {
+        let mut find_match = false;
+        for (_idx, prop) in eq_properties.iter_mut().enumerate() {
+            if prop.contains(column) {
+                for col in columns {
+                    prop.insert(col.clone());
+                }
+                find_match = true;
+                break;
+            }
+        }
+        if !find_match {
+            eq_properties
+                .push(EquivalenceProperties::new(column.clone(), columns.clone()));
+        }
+    }
+}
+
+pub fn truncate_equivalence_properties_not_in_schema(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    schema: &SchemaRef,
+) {
+    for props in eq_properties.iter_mut() {
+        let mut columns_to_remove = vec![];
+        for column in props.iter() {
+            if let Ok(idx) = schema.index_of(column.name()) {
+                if idx != column.index() {
+                    columns_to_remove.push(column.clone());
+                }
+            } else {
+                columns_to_remove.push(column.clone());
+            }
+        }
+        for column in columns_to_remove {
+            props.remove(&column);
+        }
+    }
+    eq_properties.retain(|props| props.len() > 1);
+}
+
+/// Normalize the output expressions based on Alias Map and SchemaRef.
+///
+/// 1) If there is mapping in Alias Map, replace the Column in the output expressions with the 1st Column in Alias Map
+/// 2) If the Column is invalid for the current Schema, replace the Column with a place holder UnKnownColumn
+///
+pub fn normalize_out_expr_with_alias_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+    schema: &SchemaRef,
+) -> Arc<dyn PhysicalExpr> {
+    let expr_clone = expr.clone();
+    expr_clone
+        .transform(&|expr| {
+            let normalized_form: Option<Arc<dyn PhysicalExpr>> =
+                match expr.as_any().downcast_ref::<Column>() {
+                    Some(column) => {
+                        let out = alias_map
+                            .get(column)
+                            .map(|c| {
+                                let out_col: Arc<dyn PhysicalExpr> =
+                                    Arc::new(c[0].clone());
+                                out_col
+                            })
+                            .or_else(|| match schema.index_of(column.name()) {
+                                // Exactly matching, return None, no need to do the transform
+                                Ok(idx) if column.index() == idx => None,
+                                _ => {
+                                    let out_col: Arc<dyn PhysicalExpr> =
+                                        Arc::new(UnKnownColumn::new(column.name()));
+                                    Some(out_col)
+                                }
+                            });
+                        out
+                    }
+                    None => None,
+                };
+            normalized_form
+        })
+        .unwrap_or(expr)
+}
+
+pub fn normalize_expr_with_equivalence_properties(
+    expr: Arc<dyn PhysicalExpr>,
+    eq_properties: &[EquivalenceProperties],
+) -> Arc<dyn PhysicalExpr> {
+    let mut normalized = expr.clone();
+    if let Some(column) = expr.as_any().downcast_ref::<Column>() {

Review Comment:
   Does this need to recursively rewrite exprs?
   
   Like what if `expr` was `A + B` and you had an equivalence class with `B = C`
   
   Wouldn't you have to rewrite `A + B` into `A + C`? But I don't see this code recursing.
   
   This kind of rewrite could be tested as well I think
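
The recursion asked about above can be sketched as follows. This is only an illustration on a toy expression tree: `Expr`, `EqClass`, `normalize` and `col` are hypothetical names standing in for `Arc<dyn PhysicalExpr>` and the PR's equivalence types, not the actual DataFusion API.

```rust
use std::collections::HashSet;

/// Toy expression tree (hypothetical stand-in for `Arc<dyn PhysicalExpr>`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Add(Box<Expr>, Box<Expr>),
}

/// One equivalence class: a representative head plus the other members.
struct EqClass {
    head: String,
    others: HashSet<String>,
}

/// Rewrite every column to the head of its equivalence class,
/// descending into children instead of inspecting only the root node.
fn normalize(expr: &Expr, classes: &[EqClass]) -> Expr {
    match expr {
        Expr::Column(name) => {
            for class in classes {
                if class.others.contains(name) {
                    return Expr::Column(class.head.clone());
                }
            }
            expr.clone()
        }
        Expr::Add(left, right) => Expr::Add(
            Box::new(normalize(left, classes)),
            Box::new(normalize(right, classes)),
        ),
    }
}

/// Small helper to build a column expression.
fn col(name: &str) -> Expr {
    Expr::Column(name.to_string())
}

fn main() {
    // Equivalence class {C, B} with C as the head, i.e. B = C.
    let classes = vec![EqClass {
        head: "C".to_string(),
        others: HashSet::from(["B".to_string()]),
    }];
    // A + B should be rewritten into A + C.
    let expr = Expr::Add(Box::new(col("A")), Box::new(col("B")));
    println!("{:?}", normalize(&expr, &classes));
}
```

A unit test for the real function could follow the same shape: build `A + B`, register `B = C`, and assert that the normalized expression is `A + C`.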



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#issuecomment-1299146934

   @liukun4515 and @Ted-Jiang, perhaps you have some time to help review this as well.




[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012520355


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }
+    }
+
+    if idx1 != -1 && idx2 != -1 && idx1 != idx2 {
+        // need to merge the two existing properties
+        let second_properties = eq_properties.get(idx2 as usize).unwrap().clone();
+        let first_properties = eq_properties.get_mut(idx1 as usize).unwrap();
+        for prop in second_properties.iter() {
+            if !first_properties.contains(prop) {
+                first_properties.insert(prop.clone());
+            }
+        }
+        eq_properties.remove(idx2 as usize);
+    } else if idx1 == -1 && idx2 == -1 {
+        // adding new pairs
+        eq_properties.push(EquivalenceProperties::new(
+            new_condition.0.clone(),
+            vec![new_condition.1.clone()],
+        ))
+    }
+}
+
+pub fn remove_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    remove_condition: (&Column, &Column),
+) {
+    let mut match_idx = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(remove_condition.0);
+        let contains_second = prop.contains(remove_condition.1);
+        if contains_first && contains_second {
+            match_idx = idx as i32;
+            break;
+        }
+    }
+    if match_idx >= 0 {
+        let matches = eq_properties.get_mut(match_idx as usize).unwrap();
+        matches.remove(remove_condition.0);
+        matches.remove(remove_condition.1);
+        if matches.len() <= 1 {
+            eq_properties.remove(match_idx as usize);
+        }
+    }
+}
+
+pub fn merge_equivalence_properties_with_alias(
+    eq_properties: &mut Vec<EquivalenceProperties>,

Review Comment:
   `eq_properties` holds the `EquivalenceProperties` of one of the current operator's inputs.
   
   The goal of this function is to construct a new `EquivalenceProperties` for the current operator.
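
To make the alias merge concrete, here is a minimal sketch using plain strings for columns. `Class` and `merge_with_alias` are hypothetical names; the logic mirrors `merge_equivalence_properties_with_alias` in the diff above only in outline.

```rust
use std::collections::{BTreeSet, HashMap};

/// A set of column names known to be equal (hypothetical simplification).
type Class = BTreeSet<String>;

/// For each aliased column: extend the class that already contains it,
/// or start a new class made of the column and its aliases.
fn merge_with_alias(classes: &mut Vec<Class>, alias_map: &HashMap<String, Vec<String>>) {
    for (column, aliases) in alias_map {
        let pos = classes.iter().position(|c| c.contains(column));
        match pos {
            Some(i) => classes[i].extend(aliases.iter().cloned()),
            None => {
                let mut class = Class::new();
                class.insert(column.clone());
                class.extend(aliases.iter().cloned());
                classes.push(class);
            }
        }
    }
}

fn main() {
    // The input knows a = b; the operator emits `a AS x` and `c AS y`.
    let mut classes = vec![Class::from(["a".to_string(), "b".to_string()])];
    let alias_map = HashMap::from([
        ("a".to_string(), vec!["x".to_string()]),
        ("c".to_string(), vec!["y".to_string()]),
    ]);
    merge_with_alias(&mut classes, &alias_map);
    println!("{:?}", classes);
}
```

In this sketch the input's classes are mutated in place; the function in the PR likewise takes `&mut Vec<EquivalenceProperties>`.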





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012581673


##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -270,6 +271,75 @@ impl ExecutionPlan for HashJoinExec {
         self.schema.clone()
     }
 
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        match self.mode {
+            PartitionMode::CollectLeft => vec![
+                Distribution::SinglePartition,
+                Distribution::UnspecifiedDistribution,
+            ],
+            PartitionMode::Partitioned => {
+                let (left_expr, right_expr) = self
+                    .on
+                    .iter()
+                    .map(|(l, r)| {
+                        (
+                            Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
+                            Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
+                        )
+                    })
+                    .unzip();
+                vec![

Review Comment:
   It is possible, but I will not implement it in the current phase.





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012388551


##########
datafusion/core/src/dataframe.rs:
##########
@@ -1515,4 +1515,84 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn partition_aware_union() -> Result<()> {
+        let left = test_table().await?.select_columns(&["c1", "c2"])?;
+        let right = test_table_with_name("c2")
+            .await?
+            .select_columns(&["c1", "c3"])?
+            .with_column_renamed("c2.c1", "c2_c1")?;
+
+        let left_rows = left.collect().await?;
+        let right_rows = right.collect().await?;
+        let join1 =

Review Comment:
   Sure. This UT verifies that the union is partition aware by checking that the output partition count equals the default partition count (not the combined count of the inputs) and that the result is correct. I will add more comments here, and I will also add code to verify that the output partitioning is the same as that of the union's inputs.
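
The property being tested can be modelled roughly as below. This is a toy model under stated assumptions, not the `UnionExec` implementation: the `Partitioning` enum and `union_output_partitioning` are hypothetical, and the sketch assumes a union is "partition aware" exactly when every input reports the same hash partitioning.

```rust
/// Toy partitioning description (hypothetical, not DataFusion's type).
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    /// Hash partitioned on the given keys into `n` partitions.
    Hash(Vec<String>, usize),
    /// `n` partitions with no known distribution.
    Unknown(usize),
}

impl Partitioning {
    fn partition_count(&self) -> usize {
        match self {
            Partitioning::Hash(_, n) | Partitioning::Unknown(n) => *n,
        }
    }
}

/// If all inputs share one hash partitioning, the union keeps it;
/// otherwise the partitions of the inputs are simply concatenated.
fn union_output_partitioning(inputs: &[Partitioning]) -> Partitioning {
    let total: usize = inputs.iter().map(Partitioning::partition_count).sum();
    match inputs.first() {
        Some(first)
            if matches!(first, Partitioning::Hash(..))
                && inputs.iter().all(|p| p == first) =>
        {
            first.clone()
        }
        _ => Partitioning::Unknown(total),
    }
}

fn main() {
    let hash = Partitioning::Hash(vec!["c1".to_string()], 4);
    // Aligned inputs: the partition count stays at 4 rather than growing to 8.
    println!("{:?}", union_output_partitioning(&[hash.clone(), hash]));
}
```

Under this model, the assertion described in the comment amounts to checking that the union's partition count equals that of a single input.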





[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012474997


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }
+    }
+
+    if idx1 != -1 && idx2 != -1 && idx1 != idx2 {
+        // need to merge the two existing properties
+        let second_properties = eq_properties.get(idx2 as usize).unwrap().clone();
+        let first_properties = eq_properties.get_mut(idx1 as usize).unwrap();
+        for prop in second_properties.iter() {
+            if !first_properties.contains(prop) {
+                first_properties.insert(prop.clone());
+            }
+        }
+        eq_properties.remove(idx2 as usize);
+    } else if idx1 == -1 && idx2 == -1 {
+        // adding new pairs
+        eq_properties.push(EquivalenceProperties::new(
+            new_condition.0.clone(),
+            vec![new_condition.1.clone()],
+        ))
+    }
+}
+
+pub fn remove_equivalence_properties(

Review Comment:
   Why does `eq_properties` contain non-equal columns?





[GitHub] [arrow-datafusion] mingmwang commented on pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#issuecomment-1297183481

   @alamb @andygrove @Dandandan @isidentical @yahoNanJing 
   Please help to take a look
   




[GitHub] [arrow-datafusion] alamb commented on pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#issuecomment-1299145927

   I am sorry -- I ran out of time today -- will try and find time tomorrow




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012578689


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }
+    }
+
+    if idx1 != -1 && idx2 != -1 && idx1 != idx2 {
+        // need to merge the two existing properties
+        let second_properties = eq_properties.get(idx2 as usize).unwrap().clone();
+        let first_properties = eq_properties.get_mut(idx1 as usize).unwrap();
+        for prop in second_properties.iter() {
+            if !first_properties.contains(prop) {
+                first_properties.insert(prop.clone());
+            }
+        }
+        eq_properties.remove(idx2 as usize);
+    } else if idx1 == -1 && idx2 == -1 {
+        // adding new pairs
+        eq_properties.push(EquivalenceProperties::new(
+            new_condition.0.clone(),
+            vec![new_condition.1.clone()],
+        ))
+    }
+}
+
+pub fn remove_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    remove_condition: (&Column, &Column),
+) {
+    let mut match_idx = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(remove_condition.0);
+        let contains_second = prop.contains(remove_condition.1);
+        if contains_first && contains_second {
+            match_idx = idx as i32;
+            break;
+        }
+    }
+    if match_idx >= 0 {
+        let matches = eq_properties.get_mut(match_idx as usize).unwrap();
+        matches.remove(remove_condition.0);
+        matches.remove(remove_condition.1);
+        if matches.len() <= 1 {

Review Comment:
   I agree it is confusing. I will remove the logic related to `remove_equivalence_properties`.





[GitHub] [arrow-datafusion] alamb commented on pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#issuecomment-1304780769

   > Hi @alamb, should we merge this PR first so that @mingmwang will be able to continue the part 3 of this unnecessary shuffling optimization?
   
   Yes, absolutely!




[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1013553521


##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expressions::Column;
+
+use arrow::datatypes::SchemaRef;
+
+use std::collections::HashMap;
+use std::collections::HashSet;
+
+/// Equivalence Properties is a vec of EquivalentClass.
+#[derive(Debug, Default, Clone)]
+pub struct EquivalenceProperties {
+    classes: Vec<EquivalentClass>,
+}
+
+impl EquivalenceProperties {
+    pub fn new() -> Self {
+        EquivalenceProperties { classes: vec![] }
+    }
+
+    pub fn classes(&self) -> &[EquivalentClass] {
+        &self.classes
+    }
+
+    pub fn extend<I: IntoIterator<Item = EquivalentClass>>(&mut self, iter: I) {
+        self.classes.extend(iter)
+    }
+
+    /// Add new equal conditions into the EquivalenceProperties. The new equal conditions usually come from the
+    /// equality predicates in a Join or Filter.
+    pub fn add_equal_conditions(&mut self, new_conditions: (&Column, &Column)) {
+        let mut idx1: Option<usize> = None;
+        let mut idx2: Option<usize> = None;
+        for (idx, class) in self.classes.iter_mut().enumerate() {
+            let contains_first = class.contains(new_conditions.0);
+            let contains_second = class.contains(new_conditions.1);
+            match (contains_first, contains_second) {
+                (true, false) => {
+                    class.insert(new_conditions.1.clone());
+                    idx1 = Some(idx);
+                }
+                (false, true) => {
+                    class.insert(new_conditions.0.clone());
+                    idx2 = Some(idx);
+                }
+                (true, true) => {
+                    idx1 = Some(idx);
+                    idx2 = Some(idx);
+                    break;
+                }
+                (false, false) => {}
+            }
+        }
+
+        match (idx1, idx2) {
+            (Some(idx_1), Some(idx_2)) if idx_1 != idx_2 => {
+                // need to merge the two existing EquivalentClasses
+                let second_eq_class = self.classes.get(idx_2).unwrap().clone();
+                let first_eq_class = self.classes.get_mut(idx_1).unwrap();
+                for prop in second_eq_class.iter() {
+                    if !first_eq_class.contains(prop) {
+                        first_eq_class.insert(prop.clone());
+                    }
+                }
+                self.classes.remove(idx_2);
+            }
+            (None, None) => {
+                // adding new pairs
+                self.classes.push(EquivalentClass::new(
+                    new_conditions.0.clone(),
+                    vec![new_conditions.1.clone()],
+                ));
+            }
+            _ => {}
+        }
+    }
+
+    pub fn merge_properties_with_alias(
+        &mut self,
+        alias_map: &HashMap<Column, Vec<Column>>,
+    ) {
+        for (column, columns) in alias_map {
+            let mut find_match = false;
+            for class in self.classes.iter_mut() {
+                if class.contains(column) {
+                    for col in columns {
+                        class.insert(col.clone());
+                    }
+                    find_match = true;
+                    break;
+                }
+            }
+            if !find_match {
+                self.classes
+                    .push(EquivalentClass::new(column.clone(), columns.clone()));
+            }
+        }
+    }
+
+    pub fn truncate_properties_not_in_schema(&mut self, schema: &SchemaRef) {
+        for class in self.classes.iter_mut() {
+            let mut columns_to_remove = vec![];
+            for column in class.iter() {
+                if let Ok(idx) = schema.index_of(column.name()) {
+                    if idx != column.index() {
+                        columns_to_remove.push(column.clone());
+                    }
+                } else {
+                    columns_to_remove.push(column.clone());
+                }
+            }
+            for column in columns_to_remove {
+                class.remove(&column);
+            }
+        }
+        self.classes.retain(|props| props.len() > 1);
+    }
+}
+
+/// Equivalent Class is a set of Columns that are known to have the same value in all tuples in a relation
+/// Equivalence Class is generated by equality predicates, typically equijoin conditions and equality conditions in filters.

Review Comment:
   I like this abstraction and the comments.
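
As a rough illustration of how equality predicates build up equivalence classes, here is a simplified sketch with string column names. `Class` and `add_equal_condition` are hypothetical; the control flow is a condensed variant of `add_equal_conditions` in the diff above, not a copy of it.

```rust
use std::collections::BTreeSet;

/// A set of column names known to hold equal values (hypothetical simplification).
type Class = BTreeSet<String>;

/// Record `left = right`: extend an existing class, merge two classes,
/// or start a new one when neither column has been seen before.
fn add_equal_condition(classes: &mut Vec<Class>, left: &str, right: &str) {
    let idx_l = classes.iter().position(|c| c.contains(left));
    let idx_r = classes.iter().position(|c| c.contains(right));
    match (idx_l, idx_r) {
        (Some(l), Some(r)) if l != r => {
            // Both columns already belong to different classes: merge them.
            let absorbed = classes.remove(r);
            // Removing index `r` shifts later elements left by one.
            let l = if r < l { l - 1 } else { l };
            classes[l].extend(absorbed);
        }
        (Some(l), None) => {
            classes[l].insert(right.to_string());
        }
        (None, Some(r)) => {
            classes[r].insert(left.to_string());
        }
        (None, None) => classes.push(Class::from([left.to_string(), right.to_string()])),
        // Both columns are already in the same class: nothing to do.
        _ => {}
    }
}

fn main() {
    let mut classes: Vec<Class> = Vec::new();
    add_equal_condition(&mut classes, "a", "b"); // {a, b}
    add_equal_condition(&mut classes, "c", "d"); // {a, b}, {c, d}
    add_equal_condition(&mut classes, "b", "c"); // merged into one class
    println!("{:?}", classes);
}
```

The merge case is the interesting one: a join on `b = c` links two previously unrelated classes, so all four columns become interchangeable.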





[GitHub] [arrow-datafusion] yahoNanJing commented on pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#issuecomment-1303090955

   > Should we make the Equivalence Properties schema aware ?
   
   It would be great to add this schema constraint. Then we can avoid the ambiguity in
   https://github.com/apache/arrow-datafusion/blob/e945c37d25cb173d03929084bcd8aac31f71580e/datafusion/core/src/physical_plan/projection.rs#L202-L209




[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012473317


##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct EquivalenceProperties {

Review Comment:
   Hi @alamb, is there a kind of linked hash map in Rust? It may be better to leverage a similar data structure here.
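
For context, the Rust standard library has no linked hash map; third-party crates such as `indexmap` provide one. The insertion-ordered behavior can also be approximated with standard collections, as in the sketch below. `OrderedSet` is a hypothetical type written for illustration and is not part of this PR.

```rust
use std::collections::HashSet;

/// Insertion-ordered set built from a Vec (order) and a HashSet (membership).
#[derive(Debug, Default)]
struct OrderedSet {
    order: Vec<String>,
    seen: HashSet<String>,
}

impl OrderedSet {
    /// Insert a value; returns false if it was already present.
    fn insert(&mut self, value: &str) -> bool {
        if self.seen.insert(value.to_string()) {
            self.order.push(value.to_string());
            true
        } else {
            false
        }
    }

    /// Constant-time membership check.
    fn contains(&self, value: &str) -> bool {
        self.seen.contains(value)
    }

    /// The first inserted value, usable as a stable representative.
    fn head(&self) -> Option<&str> {
        self.order.first().map(|s| s.as_str())
    }

    /// Iterate in insertion order.
    fn iter(&self) -> impl Iterator<Item = &str> {
        self.order.iter().map(|s| s.as_str())
    }
}

fn main() {
    let mut set = OrderedSet::default();
    set.insert("b");
    set.insert("a");
    set.insert("b"); // duplicate, ignored
    println!("head = {:?}, contains a = {}", set.head(), set.contains("a"));
    println!("{:?}", set.iter().collect::<Vec<_>>());
}
```

The trade-off in this sketch is that each value is stored twice and removal from `order` is linear, which may be acceptable for the small column sets involved here.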





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012377321


##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
     }
 }
 
+#[derive(Debug, Clone)]

Review Comment:
   Sure





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012600182


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }
+    }
+
+    if idx1 != -1 && idx2 != -1 && idx1 != idx2 {
+        // need to merge the two existing properties
+        let second_properties = eq_properties.get(idx2 as usize).unwrap().clone();
+        let first_properties = eq_properties.get_mut(idx1 as usize).unwrap();
+        for prop in second_properties.iter() {
+            if !first_properties.contains(prop) {
+                first_properties.insert(prop.clone());
+            }
+        }
+        eq_properties.remove(idx2 as usize);
+    } else if idx1 == -1 && idx2 == -1 {
+        // adding new pairs
+        eq_properties.push(EquivalenceProperties::new(
+            new_condition.0.clone(),
+            vec![new_condition.1.clone()],
+        ))
+    }
+}
+
+pub fn remove_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    remove_condition: (&Column, &Column),
+) {
+    let mut match_idx = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(remove_condition.0);
+        let contains_second = prop.contains(remove_condition.1);
+        if contains_first && contains_second {
+            match_idx = idx as i32;
+            break;
+        }
+    }
+    if match_idx >= 0 {
+        let matches = eq_properties.get_mut(match_idx as usize).unwrap();
+        matches.remove(remove_condition.0);
+        matches.remove(remove_condition.1);
+        if matches.len() <= 1 {
+            eq_properties.remove(match_idx as usize);
+        }
+    }
+}
+
+pub fn merge_equivalence_properties_with_alias(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+) {
+    for (column, columns) in alias_map {
+        let mut find_match = false;
+        for (_idx, prop) in eq_properties.iter_mut().enumerate() {
+            if prop.contains(column) {
+                for col in columns {
+                    prop.insert(col.clone());
+                }
+                find_match = true;
+                break;
+            }
+        }
+        if !find_match {
+            eq_properties
+                .push(EquivalenceProperties::new(column.clone(), columns.clone()));
+        }
+    }
+}
+
+pub fn truncate_equivalence_properties_not_in_schema(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    schema: &SchemaRef,
+) {
+    for props in eq_properties.iter_mut() {
+        let mut columns_to_remove = vec![];
+        for column in props.iter() {
+            if let Ok(idx) = schema.index_of(column.name()) {
+                if idx != column.index() {
+                    columns_to_remove.push(column.clone());
+                }
+            } else {
+                columns_to_remove.push(column.clone());
+            }
+        }
+        for column in columns_to_remove {
+            props.remove(&column);
+        }
+    }
+    eq_properties.retain(|props| props.len() > 1);
+}
+
+/// Normalize the output expressions based on Alias Map and SchemaRef.
+///
+/// 1) If there is mapping in Alias Map, replace the Column in the output expressions with the 1st Column in Alias Map
+/// 2) If the Column is invalid for the current Schema, replace the Column with a place holder UnKnownColumn
+///
+pub fn normalize_out_expr_with_alias_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+    schema: &SchemaRef,
+) -> Arc<dyn PhysicalExpr> {
+    let expr_clone = expr.clone();
+    expr_clone
+        .transform(&|expr| {
+            let normalized_form: Option<Arc<dyn PhysicalExpr>> =
+                match expr.as_any().downcast_ref::<Column>() {
+                    Some(column) => {
+                        let out = alias_map
+                            .get(column)
+                            .map(|c| {
+                                let out_col: Arc<dyn PhysicalExpr> =
+                                    Arc::new(c[0].clone());
+                                out_col
+                            })
+                            .or_else(|| match schema.index_of(column.name()) {
+                                // Exactly matching, return None, no need to do the transform
+                                Ok(idx) if column.index() == idx => None,
+                                _ => {
+                                    let out_col: Arc<dyn PhysicalExpr> =
+                                        Arc::new(UnKnownColumn::new(column.name()));
+                                    Some(out_col)
+                                }
+                            });
+                        out
+                    }
+                    None => None,
+                };
+            normalized_form
+        })
+        .unwrap_or(expr)
+}
+
+pub fn normalize_expr_with_equivalence_properties(
+    expr: Arc<dyn PhysicalExpr>,
+    eq_properties: &[EquivalenceProperties],
+) -> Arc<dyn PhysicalExpr> {
+    let mut normalized = expr.clone();
+    if let Some(column) = expr.as_any().downcast_ref::<Column>() {

Review Comment:
   Yes, rewriting recursively is safer. Currently the equi-join conditions are just Columns,
   and for AggregateExec the output_group_expr entries are also Columns. For WindowAggExec, does DataFusion support PARTITION BY on complex expressions?
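
A minimal sketch of the recursive rewrite discussed in this comment, using toy types rather than DataFusion's `PhysicalExpr` and `EquivalenceProperties`. The `Expr` enum, the `EquivalenceClass` alias, and the choice of the smallest column name as the class representative are all assumptions made for illustration; they are not taken from the PR.

```rust
use std::collections::BTreeSet;

/// Toy expression tree standing in for `Arc<dyn PhysicalExpr>` (hypothetical).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Binary(Box<Expr>, &'static str, Box<Expr>),
}

/// One equivalence class: a set of column names known to be equal.
/// The smallest name acts as the representative (an arbitrary choice here).
type EquivalenceClass = BTreeSet<String>;

/// Walk the whole expression tree and replace every column with the
/// representative of its equivalence class, if it belongs to one.
fn normalize(expr: &Expr, classes: &[EquivalenceClass]) -> Expr {
    match expr {
        Expr::Column(name) => {
            let representative = classes
                .iter()
                .find(|class| class.contains(name))
                .and_then(|class| class.iter().next())
                .cloned()
                .unwrap_or_else(|| name.clone());
            Expr::Column(representative)
        }
        Expr::Literal(value) => Expr::Literal(*value),
        // Recurse into both sides so nested columns are rewritten too.
        Expr::Binary(left, op, right) => Expr::Binary(
            Box::new(normalize(left, classes)),
            *op,
            Box::new(normalize(right, classes)),
        ),
    }
}
```

With a class `{a, b}`, the expression `b + 1` would be rewritten to `a + 1`, whereas a top-level-only check (as in the quoted diff) would leave it unchanged because the root is not a `Column`.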
     





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1013139898


##########
datafusion/core/src/physical_plan/mod.rs:
##########
@@ -122,10 +122,20 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// have any particular output order here
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
 
-    /// Specifies the data distribution requirements of all the
-    /// children for this operator
-    fn required_child_distribution(&self) -> Distribution {
-        Distribution::UnspecifiedDistribution
+    /// Specifies the data distribution requirements for all the
+    /// children for this operator, By default it's [[Distribution::UnspecifiedDistribution]] for each child,
+    fn required_input_distribution(&self) -> Vec<Distribution> {

Review Comment:
   @alamb 
   What do you think: for leaf nodes, should we return an empty `vec![]` here, or
   `vec![Distribution::UnspecifiedDistribution]`?





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012388922


##########
datafusion/core/src/physical_plan/joins/cross_join.rs:
##########
@@ -153,16 +156,27 @@ impl ExecutionPlan for CrossJoinExec {
         )?))
     }
 
+    // TODO optimize CrossJoin implementation to generate M * N partitions
     fn output_partitioning(&self) -> Partitioning {
-        self.right.output_partitioning()
+        let left_columns_len = self.left.schema().fields.len();
+        adjust_right_output_partitioning(
+            self.right.output_partitioning(),
+            left_columns_len,
+        )
     }
 
+    // TODO check the output ordering of CrossJoin

Review Comment:
   Yeah, I'm not sure whether our CrossJoin implementation preserves the ordering of the right side.





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012389265


##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct EquivalenceProperties {

Review Comment:
   Sure.





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012480320


##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct EquivalenceProperties {

Review Comment:
   The preferred data structure is LinkedHashSet.
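
Rust's standard library has no `LinkedHashSet`, so here is a rough sketch of an insertion-ordered set built from a `Vec` plus a `HashSet`. The `OrderedSet` type is hypothetical and not part of the PR. One reason insertion order can matter for an equivalence class is that the first inserted column stays a stable representative, though that motivation is an assumption here.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Hypothetical insertion-ordered set: `order` keeps first-insertion order,
/// `seen` gives O(1) average membership checks.
#[derive(Debug, Clone)]
struct OrderedSet<T: Hash + Eq + Clone> {
    order: Vec<T>,
    seen: HashSet<T>,
}

impl<T: Hash + Eq + Clone> OrderedSet<T> {
    fn new() -> Self {
        Self { order: Vec::new(), seen: HashSet::new() }
    }

    /// Returns true if the value was newly inserted, false if already present.
    fn insert(&mut self, value: T) -> bool {
        if self.seen.insert(value.clone()) {
            self.order.push(value);
            true
        } else {
            false
        }
    }

    fn contains(&self, value: &T) -> bool {
        self.seen.contains(value)
    }

    /// Iterates in first-insertion order.
    fn iter(&self) -> impl Iterator<Item = &T> {
        self.order.iter()
    }
}
```

The trade-off is that each element is stored twice; removal would also need a linear scan of `order`, which this sketch leaves out.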





[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012572815


##########
datafusion/core/src/physical_plan/mod.rs:
##########
@@ -122,10 +122,20 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// have any particular output order here
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
 
-    /// Specifies the data distribution requirements of all the
-    /// children for this operator
-    fn required_child_distribution(&self) -> Distribution {
-        Distribution::UnspecifiedDistribution
+    /// Specifies the data distribution requirements for all the
+    /// children for this operator, By default it's [[Distribution::UnspecifiedDistribution]] for each child,
+    fn required_input_distribution(&self) -> Vec<Distribution> {

Review Comment:
   Maybe we can just use
   `vec![Distribution::UnspecifiedDistribution; self.children().len()]`
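
To illustrate the suggestion, here is a small sketch with a toy `Plan` trait in place of `ExecutionPlan`. The trait and the `Scan`, `Union`, and `CoalesceLike` structs are hypothetical stand-ins. The point is that a default sized by `children().len()` yields an empty vector for leaf nodes and one entry per child otherwise, while individual operators can still override it.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
}

/// Toy stand-in for the `ExecutionPlan` trait (hypothetical).
trait Plan {
    fn children(&self) -> Vec<&dyn Plan>;

    /// Default: one unspecified requirement per child, so leaves get `vec![]`.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::UnspecifiedDistribution; self.children().len()]
    }
}

/// Leaf node with no children.
struct Scan;

/// Node with an arbitrary number of children.
struct Union {
    inputs: Vec<Box<dyn Plan>>,
}

/// Node that overrides the default to demand a single input partition.
struct CoalesceLike {
    input: Box<dyn Plan>,
}

impl Plan for Scan {
    fn children(&self) -> Vec<&dyn Plan> {
        vec![]
    }
}

impl Plan for Union {
    fn children(&self) -> Vec<&dyn Plan> {
        self.inputs.iter().map(|p| p.as_ref()).collect()
    }
}

impl Plan for CoalesceLike {
    fn children(&self) -> Vec<&dyn Plan> {
        vec![&*self.input]
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::SinglePartition]
    }
}
```

This keeps the invariant that the returned vector always has the same length as `children()`, which callers can rely on when zipping requirements with children.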





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012469233


##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -231,6 +246,38 @@ impl RecordBatchStream for FilterExecStream {
     }
 }
 
+/// Return the equals Column-Pairs and Non-equals Column-Pairs
+fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual {

Review Comment:
   Since this is only used by FilterExec, I would prefer to keep this as a private func in filter.rs





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012378962


##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec {
         true
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        true
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        let sort_keys = self.sort_keys.as_deref();
+        vec![sort_keys]
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        if self
-            .window_expr()
-            .iter()
-            .all(|expr| expr.partition_by().is_empty())
-        {
-            Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.partition_keys.is_empty() {
+            warn!("No partition defined for WindowAggExec!!!");

Review Comment:
   Yes, this is a valid case, but the SQL might run very slowly without any `Partition By` clause, because the input collapses to `Distribution::SinglePartition`. I can remove the warning if we think it is useless. There is one optimization we can do here in the future, after we add
   `Range Partitioning` (I can work on this, maybe next month). When there is no `Partition By` clause but only `Order By`, then depending on the window functions, in some cases we can make `required_input_distribution` return `SortDistribution`, so that the `WindowAggExec` can still run in parallel. 





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1014763562


##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec {
         true
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        true
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        let sort_keys = self.sort_keys.as_deref();
+        vec![sort_keys]
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        if self
-            .window_expr()
-            .iter()
-            .all(|expr| expr.partition_by().is_empty())
-        {
-            Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.partition_keys.is_empty() {
+            warn!("No partition defined for WindowAggExec!!!");

Review Comment:
   Sure





[GitHub] [arrow-datafusion] ursabot commented on pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#issuecomment-1304782555

   Benchmark runs are scheduled for baseline = 238e179224661f681b20b9ae32f59efd5a3b0713 and contender = b7a33317c2abf265f4ab6b3fe636f87c4d01334c. b7a33317c2abf265f4ab6b3fe636f87c4d01334c is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/74c063638fb14320947eb9ebc781dbf0...bdc9db9a6d1a460887f02d26e7c6f4d0/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] [test-mac-arm](https://conbench.ursa.dev/compare/runs/6d60ef8ea91e4d7eaaf940dfad75931d...e51f18d23c8c406c9873542dade20d6a/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/1500f1fdca064e84b49e8926debf610b...b09184714450467b97d93bde7b21c285/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/78c52e2c3322468490cc21e245460cb5...77460c7c6e244a19ac99d841ffe2a1a0/)
   Buildkite builds:
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   




[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012576631


##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -270,6 +271,75 @@ impl ExecutionPlan for HashJoinExec {
         self.schema.clone()
     }
 
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        match self.mode {
+            PartitionMode::CollectLeft => vec![
+                Distribution::SinglePartition,
+                Distribution::UnspecifiedDistribution,
+            ],
+            PartitionMode::Partitioned => {
+                let (left_expr, right_expr) = self
+                    .on
+                    .iter()
+                    .map(|(l, r)| {
+                        (
+                            Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
+                            Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
+                        )
+                    })
+                    .unzip();
+                vec![

Review Comment:
   Currently it only supports the exact-match case. Is it possible to support partial matching?
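
One possible reading of "partial matching", sketched with plain column names instead of `PhysicalExpr`s: if the input is already hash partitioned on a non-empty subset of the required keys, rows that agree on all required keys also agree on that subset and are therefore already co-located. Both function names below are hypothetical, and the sketch covers a single input only. A join would additionally need both sides partitioned on corresponding key positions with the same partition count.

```rust
/// Exact match: the existing hash keys are the required keys, in the same order.
fn exact_match(required: &[&str], actual: &[&str]) -> bool {
    required == actual
}

/// Partial match (an assumed interpretation): the existing hash keys are a
/// non-empty subset of the required keys, so equal required-key tuples
/// already land in the same partition.
fn partial_match(required: &[&str], actual: &[&str]) -> bool {
    !actual.is_empty() && actual.iter().all(|key| required.contains(key))
}
```

For example, data partitioned on `a` would satisfy a requirement on `(a, b)` under `partial_match` but not under `exact_match`, which avoids a repartition at the cost of possibly coarser parallelism.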





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012389014


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {

Review Comment:
   Sure.





[GitHub] [arrow-datafusion] mingmwang commented on pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#issuecomment-1302862514

   retest please




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1014764494


##########
datafusion/core/src/dataframe.rs:
##########
@@ -1605,4 +1607,74 @@ mod tests {
             Partitioning::UnknownPartitioning(partition_count) if partition_count == default_partition_count * 2));
         Ok(())
     }
+
+    #[tokio::test]
+    async fn verify_join_output_partitioning() -> Result<()> {
+        let left = test_table().await?.select_columns(&["c1", "c2"])?;
+        let right = test_table_with_name("c2")
+            .await?
+            .select_columns(&["c1", "c2"])?
+            .with_column_renamed("c2.c1", "c2_c1")?
+            .with_column_renamed("c2.c2", "c2_c2")?;
+
+        let all_join_types = vec![
+            JoinType::Inner,
+            JoinType::Left,
+            JoinType::Right,
+            JoinType::Full,
+            JoinType::LeftSemi,
+            JoinType::RightSemi,
+            JoinType::LeftAnti,
+            JoinType::RightAnti,
+        ];
+
+        let default_partition_count =
+            SessionContext::new().copied_config().target_partitions;
+
+        for join_type in all_join_types {
+            let join = left.join(
+                right.clone(),
+                join_type,
+                &["c1", "c2"],
+                &["c2_c1", "c2_c2"],
+                None,
+            )?;
+            let physical_plan = join.create_physical_plan().await?;
+            let out_partitioning = physical_plan.output_partitioning();
+            let join_schema = physical_plan.schema();
+
+            match join_type {
+                JoinType::Inner
+                | JoinType::Left
+                | JoinType::LeftSemi
+                | JoinType::LeftAnti => {
+                    let left_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
+                        Arc::new(Column::new_with_schema("c1", &join_schema).unwrap()),
+                        Arc::new(Column::new_with_schema("c2", &join_schema).unwrap()),
+                    ];
+                    assert_eq!(

Review Comment:
   https://github.com/apache/arrow-datafusion/issues/4116
   https://github.com/apache/arrow-datafusion/issues/4117
   https://github.com/apache/arrow-datafusion/issues/4118
   





[GitHub] [arrow-datafusion] alamb merged pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
alamb merged PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012384969


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -186,13 +194,26 @@ impl AggregateExec {
 
         let schema = Arc::new(schema);
 
+        let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();

Review Comment:
   This handles the case where the group expressions contain aliases. In that case we cannot derive the output partitioning directly from the input/child; we need to take the aliases into account. This is similar to `ProjectionExec`.
   
   For example, if the input has output partitioning 'a' and the `ProjectionExec` or `AggregateExec` has the alias 'a as a1', then the output partitioning of the `ProjectionExec` or `AggregateExec` should be 'a1'.
   `ProjectionExec` and `AggregateExec` never change the actual data distribution, but they need to respect the aliases.
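
The 'a as a1' example can be sketched as follows, with column names as plain strings and a toy `Partitioning` enum (both are simplifications, not the DataFusion types). As in `normalize_out_expr_with_alias_schema` from the quoted diff, a mapped column is replaced by its first alias. The schema check that produces `UnKnownColumn` is left out of this sketch.

```rust
use std::collections::HashMap;

/// Toy partitioning: hash keys plus partition count, or unknown (hypothetical).
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    Hash(Vec<String>, usize),
    Unknown(usize),
}

/// Rename the hash keys of the input partitioning through the alias map.
/// The data distribution itself is untouched; only the column names change.
fn output_partitioning(
    input: &Partitioning,
    alias_map: &HashMap<String, Vec<String>>,
) -> Partitioning {
    match input {
        Partitioning::Hash(columns, partition_count) => Partitioning::Hash(
            columns
                .iter()
                .map(|column| match alias_map.get(column) {
                    // Use the first alias when the column was renamed.
                    Some(aliases) if !aliases.is_empty() => aliases[0].clone(),
                    _ => column.clone(),
                })
                .collect(),
            *partition_count,
        ),
        other => other.clone(),
    }
}
```

With `alias_map = {a: [a1]}`, an input partitioned by `Hash([a], 8)` is reported as `Hash([a1], 8)` at the operator's output.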
   
   
   





[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012521536


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }
+    }
+
+    if idx1 != -1 && idx2 != -1 && idx1 != idx2 {
+        // need to merge the two existing properties
+        let second_properties = eq_properties.get(idx2 as usize).unwrap().clone();
+        let first_properties = eq_properties.get_mut(idx1 as usize).unwrap();
+        for prop in second_properties.iter() {
+            if !first_properties.contains(prop) {
+                first_properties.insert(prop.clone());
+            }
+        }
+        eq_properties.remove(idx2 as usize);
+    } else if idx1 == -1 && idx2 == -1 {
+        // adding new pairs
+        eq_properties.push(EquivalenceProperties::new(
+            new_condition.0.clone(),
+            vec![new_condition.1.clone()],
+        ))
+    }
+}
+
+pub fn remove_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    remove_condition: (&Column, &Column),
+) {
+    let mut match_idx = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(remove_condition.0);
+        let contains_second = prop.contains(remove_condition.1);
+        if contains_first && contains_second {
+            match_idx = idx as i32;
+            break;
+        }
+    }
+    if match_idx >= 0 {
+        let matches = eq_properties.get_mut(match_idx as usize).unwrap();
+        matches.remove(remove_condition.0);
+        matches.remove(remove_condition.1);
+        if matches.len() <= 1 {
+            eq_properties.remove(match_idx as usize);
+        }
+    }
+}
+
+pub fn merge_equivalence_properties_with_alias(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+) {
+    for (column, columns) in alias_map {
+        let mut find_match = false;
+        for (_idx, prop) in eq_properties.iter_mut().enumerate() {
+            if prop.contains(column) {
+                for col in columns {
+                    prop.insert(col.clone());

Review Comment:
   Although it can be corrected by `truncate_equivalence_properties_not_in_schema`, I still think it's better to construct a new one rather than merge into the input `EquivalenceProperties`.





[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012519437


##########
datafusion/core/src/physical_plan/projection.rs:
##########
@@ -51,6 +56,10 @@ pub struct ProjectionExec {
     schema: SchemaRef,
     /// The input plan
     input: Arc<dyn ExecutionPlan>,
+    /// The output ordering
+    output_ordering: Option<Vec<PhysicalSortExpr>>,
+    /// The alias map used to normalize out expressions like Partitioning and PhysicalSortExpr
+    alias_map: HashMap<Column, Vec<Column>>,

Review Comment:
   It would be better to add a comment indicating what the key and values stand for.
   
   As I understand it, the key is a column in the input schema of this Projection operator, while the values are the corresponding columns in its output schema.
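
The key/value reading above can be written down as a doc comment plus a small builder. This is only a sketch under that assumption: `Column` is a simplified stand-in for the physical `Column` type, `build_alias_map` is a hypothetical helper rather than code from the PR, and the rule for what counts as an alias (name or position changed) is assumed.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the physical `Column` (name + index in a schema).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Column {
    pub name: String,
    pub index: usize,
}

impl Column {
    pub fn new(name: &str, index: usize) -> Self {
        Self { name: name.to_string(), index }
    }
}

/// Hypothetical helper that builds the alias map of a projection.
///
/// * key: a column of the projection's *input* schema
/// * value: every column of the projection's *output* schema that refers to that input column
///
/// `projection` lists, per output position, the projected input column and its output name.
pub fn build_alias_map(projection: &[(Column, &str)]) -> HashMap<Column, Vec<Column>> {
    let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
    for (out_idx, (input_col, out_name)) in projection.iter().enumerate() {
        // Assumption: a column is an alias when its name or its position changed.
        if input_col.name != *out_name || input_col.index != out_idx {
            alias_map
                .entry(input_col.clone())
                .or_default()
                .push(Column::new(out_name, out_idx));
        }
    }
    alias_map
}
```

With this shape, a column that passes through unchanged never appears in the map, so a lookup miss means "no renaming needed".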





[GitHub] [arrow-datafusion] mingmwang commented on pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#issuecomment-1303081180

   Should we make the Equivalence Properties schema-aware?
   
   ````
   /// Equivalence Properties is a vec of EquivalentClass.
   #[derive(Debug, Default, Clone)]
   pub struct EquivalenceProperties {
       classes: Vec<EquivalentClass>,
       schema: SchemaRef,
   }
   ````




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012139283


##########
datafusion/core/src/dataframe.rs:
##########
@@ -1515,4 +1515,84 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn partition_aware_union() -> Result<()> {
+        let left = test_table().await?.select_columns(&["c1", "c2"])?;
+        let right = test_table_with_name("c2")
+            .await?
+            .select_columns(&["c1", "c3"])?
+            .with_column_renamed("c2.c1", "c2_c1")?;
+
+        let left_rows = left.collect().await?;
+        let right_rows = right.collect().await?;
+        let join1 =
+            left.join(right.clone(), JoinType::Inner, &["c1"], &["c2_c1"], None)?;
+        let join2 = left.join(right, JoinType::Inner, &["c1"], &["c2_c1"], None)?;
+
+        let union = join1.union(join2)?;
+
+        let union_rows = union.collect().await?;
+
+        assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+        assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+        assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+
+        let physical_plan = union.create_physical_plan().await?;
+        let default_partition_count =
+            SessionContext::new().copied_config().target_partitions;
+        assert_eq!(
+            physical_plan.output_partitioning().partition_count(),
+            default_partition_count
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn non_partition_aware_union() -> Result<()> {
+        let left = test_table().await?.select_columns(&["c1", "c2"])?;
+        let right = test_table_with_name("c2")
+            .await?
+            .select_columns(&["c1", "c2"])?
+            .with_column_renamed("c2.c1", "c2_c1")?
+            .with_column_renamed("c2.c2", "c2_c2")?;
+
+        let left_rows = left.collect().await?;
+        let right_rows = right.collect().await?;
+        let join1 = left.join(
+            right.clone(),
+            JoinType::Inner,
+            &["c1", "c2"],
+            &["c2_c1", "c2_c2"],
+            None,
+        )?;
+
+        // join key ordering is different

Review Comment:
   👍 



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;

Review Comment:
   I think typically in Rust such a sentinel is signaled using `Option`.
   
   So like
   
   ```rust
       let mut idx1: Option<usize> = None;
       let mut idx2: Option<usize> = None;
   ```
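
To show how the `Option<usize>` form removes both the `-1` sentinel and the `as` casts, here is a minimal sketch. It assumes each equivalence class is just a `HashSet<String>` of column names; `find_class` and `plan_update` are hypothetical names used for illustration only.

```rust
use std::collections::HashSet;

/// Returns the index of the first class containing `column`, or `None`.
/// `Iterator::position` already yields `Option<usize>`, so no sentinel is needed.
pub fn find_class(classes: &[HashSet<String>], column: &str) -> Option<usize> {
    classes.iter().position(|class| class.contains(column))
}

/// Decides what a new condition `left = right` requires.
/// Matching on the pair of options makes every case explicit.
pub fn plan_update(classes: &[HashSet<String>], left: &str, right: &str) -> &'static str {
    match (find_class(classes, left), find_class(classes, right)) {
        (Some(i), Some(j)) if i == j => "already equivalent",
        (Some(_), Some(_)) => "merge two classes",
        (Some(_), None) | (None, Some(_)) => "extend one class",
        (None, None) => "add a new class",
    }
}
```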



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -255,25 +276,58 @@ impl ExecutionPlan for AggregateExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+        match &self.mode {
+            AggregateMode::Partial => {
+                // Partial Aggregation will not change the output partitioning but need to respect the Alias
+                let input_partition = self.input.output_partitioning();
+                match input_partition {
+                    Partitioning::Hash(exprs, part) => {
+                        let normalized_exprs = exprs
+                            .into_iter()
+                            .map(|expr| {
+                                normalize_out_expr_with_alias_schema(
+                                    expr,
+                                    &self.alias_map,
+                                    &self.schema,
+                                )
+                            })
+                            .collect::<Vec<_>>();
+                        Partitioning::Hash(normalized_exprs, part)
+                    }
+                    _ => input_partition,
+                }
+            }
+            // Final Aggregation's output partitioning is the same as its real input
+            _ => self.input.output_partitioning(),
+        }
     }
 
+    // TODO check the output ordering of AggregateExec

Review Comment:
   Is this still a TODO?



##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct EquivalenceProperties {

Review Comment:
   Having `EquivalenceProperties` would also provide a single location to add docstrings explaining the structures, and their assumptions and what they are good for



##########
datafusion/core/src/dataframe.rs:
##########
@@ -1515,4 +1515,84 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn partition_aware_union() -> Result<()> {
+        let left = test_table().await?.select_columns(&["c1", "c2"])?;
+        let right = test_table_with_name("c2")
+            .await?
+            .select_columns(&["c1", "c3"])?
+            .with_column_renamed("c2.c1", "c2_c1")?;
+
+        let left_rows = left.collect().await?;
+        let right_rows = right.collect().await?;
+        let join1 =

Review Comment:
   Could you possibly add some comments here about what this test is verifying? It seems like it is verifying that when the joins are on the same key the partitioning is the same, and thus the union can be done without bringing everything into a single stream?



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }

Review Comment:
   You could also use a match statement here and let the compiler check that all the important cases are covered:
   
   ```suggestion
           match (contains_first, contains_second) {
               (true, false) => {
                   prop.insert(new_condition.1.clone());
                   idx1 = idx as i32;
               }
               (false, true) => {
                   prop.insert(new_condition.0.clone());
                   idx2 = idx as i32;
               }
               (true, true) => {
                   idx1 = idx as i32;
                   idx2 = idx as i32;
                   break;
               }
               (false, false) => {}
           }
   ```



##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct EquivalenceProperties {

Review Comment:
   I wonder if `EquivalenceClass` is a more specific name?
   
   Then you could make
   
   ```rust
   struct EquivalenceProperties {
     classes: Vec<EquivalentClass>
   }
   ```
   
   And move functions like `truncate_equivalence_properties_not_in_schema` on to 
   
   ```rust
   impl EquivalenceProperties {
     fn truncate_equivalence_properties_not_in_schema(&self, ..)
   }
   ```
   
   I don't think it is required, but it might keep the code easier to reason about / keep it behind an abstraction
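
A rough sketch of what that encapsulation might look like follows. Everything in it is an assumption made for illustration: `Column` is a simplified stand-in, the schema is reduced to a slice of field names, and the method names `add_equal_condition` and `truncate_not_in_schema` are hypothetical.

```rust
use std::collections::HashSet;

/// Simplified stand-in for the physical `Column`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Column {
    pub name: String,
    pub index: usize,
}

impl Column {
    pub fn new(name: &str, index: usize) -> Self {
        Self { name: name.to_string(), index }
    }
}

/// One set of columns known to hold equal values.
#[derive(Debug, Clone, Default)]
pub struct EquivalentClass {
    columns: HashSet<Column>,
}

impl EquivalentClass {
    pub fn len(&self) -> usize {
        self.columns.len()
    }
}

/// All equivalence classes that hold at one point of the plan.
#[derive(Debug, Clone, Default)]
pub struct EquivalenceProperties {
    classes: Vec<EquivalentClass>,
}

impl EquivalenceProperties {
    pub fn classes(&self) -> &[EquivalentClass] {
        &self.classes
    }

    /// Records `a = b`, extending or merging existing classes where needed.
    pub fn add_equal_condition(&mut self, a: Column, b: Column) {
        let idx_a = self.classes.iter().position(|c| c.columns.contains(&a));
        let idx_b = self.classes.iter().position(|c| c.columns.contains(&b));
        match (idx_a, idx_b) {
            // Both columns already share a class: nothing to do.
            (Some(i), Some(j)) if i == j => {}
            // The columns live in different classes: merge class j into class i.
            (Some(i), Some(j)) => {
                let merged = self.classes[j].columns.clone();
                self.classes[i].columns.extend(merged);
                self.classes.remove(j);
            }
            (Some(i), None) => {
                self.classes[i].columns.insert(b);
            }
            (None, Some(j)) => {
                self.classes[j].columns.insert(a);
            }
            (None, None) => self.classes.push(EquivalentClass {
                columns: HashSet::from([a, b]),
            }),
        }
    }

    /// Drops columns that do not match the schema (same name at the same index),
    /// then drops classes left with fewer than two members.
    pub fn truncate_not_in_schema(&mut self, schema: &[&str]) {
        for class in self.classes.iter_mut() {
            class
                .columns
                .retain(|c| schema.get(c.index).map_or(false, |name| *name == c.name));
        }
        self.classes.retain(|class| class.columns.len() > 1);
    }
}
```

Keeping the `Vec` private means callers cannot leave a single-member class behind, which the free functions in `utils.rs` have to guard against individually.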



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {

Review Comment:
   This is called `split_conjunction` in the logical optimizer -- perhaps it could be called the same thing in the physical layer. The logical expr implementation also avoids creating quite as many `Vec`s
   
   https://github.com/apache/arrow-datafusion/blob/345234550712173477e7807ba2cf67dd2ffb9ed5/datafusion/optimizer/src/utils.rs#L58-L78
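
For reference, the accumulator style that avoids the intermediate `Vec`s could be sketched like this. The `Expr` enum is a toy stand-in for `Arc<dyn PhysicalExpr>`, not a DataFusion type, and the function names mirror the logical optimizer only by assumption.

```rust
/// Toy expression tree standing in for physical expressions.
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Column(String),
    And(Box<Expr>, Box<Expr>),
    Eq(Box<Expr>, Box<Expr>),
}

/// Splits `a AND b AND c` into `[a, b, c]`, allocating a single output `Vec`.
pub fn split_conjunction(predicate: &Expr) -> Vec<&Expr> {
    let mut out = Vec::new();
    split_conjunction_impl(predicate, &mut out);
    out
}

fn split_conjunction_impl<'a>(expr: &'a Expr, out: &mut Vec<&'a Expr>) {
    match expr {
        // Recurse into both sides of an AND, pushing into the shared accumulator.
        Expr::And(left, right) => {
            split_conjunction_impl(left, out);
            split_conjunction_impl(right, out);
        }
        // Anything else is a single conjunct and is kept as-is.
        other => out.push(other),
    }
}
```

Note that this sketch keeps a conjunct that is not a binary expression, whereas the `None => vec![]` arm in the diff would drop it; which behavior is intended is worth confirming.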



##########
datafusion/core/src/physical_plan/joins/cross_join.rs:
##########
@@ -153,16 +156,27 @@ impl ExecutionPlan for CrossJoinExec {
         )?))
     }
 
+    // TODO optimize CrossJoin implementation to generate M * N partitions
     fn output_partitioning(&self) -> Partitioning {
-        self.right.output_partitioning()
+        let left_columns_len = self.left.schema().fields.len();
+        adjust_right_output_partitioning(
+            self.right.output_partitioning(),
+            left_columns_len,
+        )
     }
 
+    // TODO check the output ordering of CrossJoin

Review Comment:
   Is this still a TODO?



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -186,13 +194,26 @@ impl AggregateExec {
 
         let schema = Arc::new(schema);
 
+        let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();

Review Comment:
   Can you explain what this code is for? It doesn't seem correct to me, as I don't understand the circumstances under which the output would be different 🤔 
   
   It seems like in this case the input logical plan maybe was incorrect?



##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
     }
 }
 
+#[derive(Debug, Clone)]

Review Comment:
   What would you think about moving this into `datafusion/physical-expr/src/equivalence.rs` or something? Then we could move all the code that deals with equivalence classes into that module and keep them and the tests together
   
   



##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -231,6 +246,38 @@ impl RecordBatchStream for FilterExecStream {
     }
 }
 
+/// Return the equals Column-Pairs and Non-equals Column-Pairs
+fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual {

Review Comment:
   Perhaps this would be better in utils.rs



##########
datafusion/core/src/physical_plan/coalesce_batches.rs:
##########
@@ -96,12 +96,15 @@ impl ExecutionPlan for CoalesceBatchesExec {
         self.input.output_partitioning()
     }
 
+    // Depends on how the CoalesceBatches was implemented, it is possible to keep

Review Comment:
   There is also `SortPreservingMerge` that can be used to preserve order but there are tradeoffs there (specifically it takes more effort to keep the sort order than it does to append batches together)
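
The tradeoff can be illustrated on plain integer slices. This is a toy sketch, not DataFusion's operators: `append` and `merge_sorted` are hypothetical names, and each slice stands for one already-sorted input stream.

```rust
/// Appending is cheap (one copy per input) but only keeps the order
/// if the inputs happen to arrive in order.
pub fn append(a: &[i32], b: &[i32]) -> Vec<i32> {
    [a, b].concat()
}

/// Merging two sorted inputs preserves the order, at the cost of
/// one comparison per output row.
pub fn merge_sorted(a: &[i32], b: &[i32]) -> Vec<i32> {
    let mut out = Vec::with_capacity(a.len() + b.len());
    let (mut i, mut j) = (0, 0);
    while i < a.len() && j < b.len() {
        if a[i] <= b[j] {
            out.push(a[i]);
            i += 1;
        } else {
            out.push(b[j]);
            j += 1;
        }
    }
    // One side is exhausted; the rest of the other side is already sorted.
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out
}
```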



##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec {
         true
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        true
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        let sort_keys = self.sort_keys.as_deref();
+        vec![sort_keys]
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        if self
-            .window_expr()
-            .iter()
-            .all(|expr| expr.partition_by().is_empty())
-        {
-            Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.partition_keys.is_empty() {
+            warn!("No partition defined for WindowAggExec!!!");

Review Comment:
   I don't know why this would generate a warning -- can't this occur with a query like `SELECT ROW_NUMBER() OVER () FROM foo` (as in an empty `OVER` clause)?
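
In other words, the empty-key branch could be treated as a normal case. A minimal sketch, assuming a simplified `Distribution` enum with string keys and a hypothetical free function in place of the trait method:

```rust
/// Simplified stand-in for the `Distribution` enum used in the diff above.
#[derive(Debug, PartialEq)]
pub enum Distribution {
    SinglePartition,
    HashPartitioned(Vec<String>),
}

/// Hypothetical free-function version of `required_input_distribution`.
pub fn required_input_distribution(partition_keys: &[String]) -> Vec<Distribution> {
    if partition_keys.is_empty() {
        // An empty `OVER ()` clause is valid SQL: the whole input forms one
        // window partition, so a single partition is required and no warning is logged.
        vec![Distribution::SinglePartition]
    } else {
        vec![Distribution::HashPartitioned(partition_keys.to_vec())]
    }
}
```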



##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec {
         true
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        true
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        let sort_keys = self.sort_keys.as_deref();
+        vec![sort_keys]
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        if self
-            .window_expr()
-            .iter()
-            .all(|expr| expr.partition_by().is_empty())
-        {
-            Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.partition_keys.is_empty() {
+            warn!("No partition defined for WindowAggExec!!!");
+            vec![Distribution::SinglePartition]
         } else {
-            Distribution::UnspecifiedDistribution
+            //TODO support PartitionCollections if there is no common partition columns in the window_expr
+            vec![Distribution::HashPartitioned(self.partition_keys.clone())]

Review Comment:
   👍  I agree this sounds good



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }
+    }
+
+    if idx1 != -1 && idx2 != -1 && idx1 != idx2 {
+        // need to merge the two existing properties
+        let second_properties = eq_properties.get(idx2 as usize).unwrap().clone();
+        let first_properties = eq_properties.get_mut(idx1 as usize).unwrap();
+        for prop in second_properties.iter() {
+            if !first_properties.contains(prop) {
+                first_properties.insert(prop.clone());
+            }
+        }
+        eq_properties.remove(idx2 as usize);
+    } else if idx1 == -1 && idx2 == -1 {
+        // adding new pairs
+        eq_properties.push(EquivalenceProperties::new(
+            new_condition.0.clone(),
+            vec![new_condition.1.clone()],
+        ))
+    }
+}
+
+pub fn remove_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    remove_condition: (&Column, &Column),
+) {
+    let mut match_idx = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(remove_condition.0);
+        let contains_second = prop.contains(remove_condition.1);
+        if contains_first && contains_second {
+            match_idx = idx as i32;
+            break;
+        }
+    }
+    if match_idx >= 0 {
+        let matches = eq_properties.get_mut(match_idx as usize).unwrap();
+        matches.remove(remove_condition.0);
+        matches.remove(remove_condition.1);
+        if matches.len() <= 1 {
+            eq_properties.remove(match_idx as usize);
+        }
+    }
+}
+
+pub fn merge_equivalence_properties_with_alias(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+) {
+    for (column, columns) in alias_map {
+        let mut find_match = false;
+        for (_idx, prop) in eq_properties.iter_mut().enumerate() {
+            if prop.contains(column) {
+                for col in columns {
+                    prop.insert(col.clone());
+                }
+                find_match = true;
+                break;
+            }
+        }
+        if !find_match {
+            eq_properties
+                .push(EquivalenceProperties::new(column.clone(), columns.clone()));
+        }
+    }
+}
+
+pub fn truncate_equivalence_properties_not_in_schema(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    schema: &SchemaRef,
+) {
+    for props in eq_properties.iter_mut() {
+        let mut columns_to_remove = vec![];
+        for column in props.iter() {
+            if let Ok(idx) = schema.index_of(column.name()) {
+                if idx != column.index() {
+                    columns_to_remove.push(column.clone());
+                }
+            } else {
+                columns_to_remove.push(column.clone());
+            }
+        }
+        for column in columns_to_remove {
+            props.remove(&column);
+        }
+    }
+    eq_properties.retain(|props| props.len() > 1);
+}
+
+/// Normalize the output expressions based on Alias Map and SchemaRef.
+///
+/// 1) If there is mapping in Alias Map, replace the Column in the output expressions with the 1st Column in Alias Map
+/// 2) If the Column is invalid for the current Schema, replace the Column with a place holder UnKnownColumn
+///
+pub fn normalize_out_expr_with_alias_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+    schema: &SchemaRef,
+) -> Arc<dyn PhysicalExpr> {
+    let expr_clone = expr.clone();
+    expr_clone
+        .transform(&|expr| {
+            let normalized_form: Option<Arc<dyn PhysicalExpr>> =
+                match expr.as_any().downcast_ref::<Column>() {
+                    Some(column) => {
+                        let out = alias_map
+                            .get(column)
+                            .map(|c| {
+                                let out_col: Arc<dyn PhysicalExpr> =
+                                    Arc::new(c[0].clone());
+                                out_col
+                            })
+                            .or_else(|| match schema.index_of(column.name()) {
+                                // Exactly matching, return None, no need to do the transform
+                                Ok(idx) if column.index() == idx => None,
+                                _ => {
+                                    let out_col: Arc<dyn PhysicalExpr> =
+                                        Arc::new(UnKnownColumn::new(column.name()));
+                                    Some(out_col)
+                                }
+                            });
+                        out
+                    }
+                    None => None,
+                };
+            normalized_form
+        })
+        .unwrap_or(expr)
+}
+
+pub fn normalize_expr_with_equivalence_properties(
+    expr: Arc<dyn PhysicalExpr>,
+    eq_properties: &[EquivalenceProperties],
+) -> Arc<dyn PhysicalExpr> {
+    let mut normalized = expr.clone();
+    if let Some(column) = expr.as_any().downcast_ref::<Column>() {

Review Comment:
   Does this need to recursively rewrite exprs?
   
   Like what if `expr` was `A + B` and you had an equivalence class with `B = C`
   
   Wouldn't you have to rewrite `A + B` into `A + C`? But I don't see this code recursing.
   
   This kind of rewrite could be tested as well I think
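
A recursive version of the rewrite, together with the kind of test meant here, might look like the sketch below. It uses a toy `Expr` enum rather than `Arc<dyn PhysicalExpr>`, represents each equivalence class as a `Vec<String>` of column names, and assumes the first name in a class is its representative; `normalize` is a hypothetical name.

```rust
/// Toy expression tree standing in for physical expressions.
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Column(String),
    Literal(i64),
    Add(Box<Expr>, Box<Expr>),
}

/// Rewrites every column in `expr` to the representative (first member)
/// of its equivalence class, recursing through nested expressions.
pub fn normalize(expr: &Expr, classes: &[Vec<String>]) -> Expr {
    match expr {
        Expr::Column(name) => {
            let representative = classes
                .iter()
                .find(|class| class.contains(name))
                .and_then(|class| class.first());
            // Columns outside every class are left unchanged.
            Expr::Column(representative.unwrap_or(name).clone())
        }
        Expr::Literal(value) => Expr::Literal(*value),
        // The recursive step: normalize both children, then rebuild the node.
        Expr::Add(left, right) => Expr::Add(
            Box::new(normalize(left, classes)),
            Box::new(normalize(right, classes)),
        ),
    }
}
```

With the class `[C, B]`, the expression `A + B` normalizes to `A + C`, which is the case described above.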





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012391732


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;

Review Comment:
   > Looking very impressive @mingmwang -- thank you very much
   > 
   > My biggest question is how are the changes to distribution tested? I see code that verifies partitioning (or rather not partitioning) with UnionExec but there are changes made to all the other physical operators.
   > 
   > For example what about tests for `WindowAggregate` and outer joins and sort merge join?
   > 
   > I saw tests for some of the functions for operating on `EquivalenceProperties` 👍 but not all of them.
   > 
   > I left some style questions about encapsulating `EquivalenceProperties` that might also help
   > 
   > So TLDR is I think the changes to the physical operators need more tests.
   > 
   > Maybe you could break out the equivalence class code into a separate PR?
   
   In Part 3 of this PR series, I will add 7~8 unit tests to verify the distribution of the physical plans. They may be closer to integration tests, but the logic will be carefully verified.








[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012583663


##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -270,6 +271,75 @@ impl ExecutionPlan for HashJoinExec {
         self.schema.clone()
     }
 
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        match self.mode {
+            PartitionMode::CollectLeft => vec![
+                Distribution::SinglePartition,
+                Distribution::UnspecifiedDistribution,
+            ],
+            PartitionMode::Partitioned => {
+                let (left_expr, right_expr) = self
+                    .on
+                    .iter()
+                    .map(|(l, r)| {
+                        (
+                            Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
+                            Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
+                        )
+                    })
+                    .unzip();
+                vec![
+                    Distribution::HashPartitioned(left_expr),
+                    Distribution::HashPartitioned(right_expr),
+                ]
+            }
+        }
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        let left_columns_len = self.left.schema().fields.len();
+        match self.mode {
+            PartitionMode::CollectLeft => match self.join_type {
+                JoinType::Inner | JoinType::Right => adjust_right_output_partitioning(
+                    self.right.output_partitioning(),
+                    left_columns_len,
+                ),
+                JoinType::RightSemi | JoinType::RightAnti => {
+                    self.right.output_partitioning()
+                }
+                JoinType::Left

Review Comment:
   Should these cases exist when the partition mode is CollectLeft?
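
For context on the branch being questioned: in `CollectLeft` mode the diff requires the left input to be a single partition and derives the output partitioning from the right child. The sketch below illustrates what a helper such as `adjust_right_output_partitioning` would have to do, assuming it only offsets column indices by the number of left columns. The `Column` and `Partitioning` types here are simplified, hypothetical stand-ins, not DataFusion's definitions.

```rust
// Hypothetical, simplified stand-ins for DataFusion's `Column` and `Partitioning`.
#[derive(Debug, Clone, PartialEq)]
struct Column {
    name: String,
    index: usize,
}

#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    RoundRobinBatch(usize),
    Hash(Vec<Column>, usize),
    UnknownPartitioning(usize),
}

/// Shift the hash columns of the right input so that they index into the
/// join output schema, where the right columns follow the left columns.
/// Non-hash partitionings carry no column references and pass through as-is.
fn adjust_right_output_partitioning(
    right: Partitioning,
    left_columns_len: usize,
) -> Partitioning {
    match right {
        Partitioning::Hash(cols, partitions) => Partitioning::Hash(
            cols.into_iter()
                .map(|c| Column {
                    name: c.name,
                    index: c.index + left_columns_len,
                })
                .collect(),
            partitions,
        ),
        other => other,
    }
}

fn main() {
    // Right child is hash partitioned on its first column; the left side has 3 columns.
    let right = Partitioning::Hash(
        vec![Column { name: "b1".to_string(), index: 0 }],
        8,
    );
    println!("{:?}", adjust_right_output_partitioning(right, 3));
}
```

The partition count is left unchanged; only the column positions move.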



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012581673


##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -270,6 +271,75 @@ impl ExecutionPlan for HashJoinExec {
         self.schema.clone()
     }
 
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        match self.mode {
+            PartitionMode::CollectLeft => vec![
+                Distribution::SinglePartition,
+                Distribution::UnspecifiedDistribution,
+            ],
+            PartitionMode::Partitioned => {
+                let (left_expr, right_expr) = self
+                    .on
+                    .iter()
+                    .map(|(l, r)| {
+                        (
+                            Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
+                            Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
+                        )
+                    })
+                    .unzip();
+                vec![

Review Comment:
   It is possible, but this PR will not include it. I originally planned to implement such optimizations in Phase 2
   with more dynamic enforcement rules, but that risks introducing skewed joins, and we currently have no good way to handle skewed joins.
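
As a reference for the static rule discussed here, the requirement computed in the quoted diff can be sketched with plain strings standing in for physical expressions. The `Distribution` and `PartitionMode` enums below are simplified, hypothetical copies of the names used in the diff, and the free function is only an illustration of the `unzip` step.

```rust
// Simplified, hypothetical stand-ins for the enums named in the diff.
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
    HashPartitioned(Vec<String>),
}

#[derive(Debug, Clone, Copy)]
enum PartitionMode {
    CollectLeft,
    Partitioned,
}

/// One requirement per child: index 0 is the left input, index 1 the right.
fn required_input_distribution(
    mode: PartitionMode,
    on: &[(String, String)],
) -> Vec<Distribution> {
    match mode {
        // The build side is collected into one partition; the probe side is unconstrained.
        PartitionMode::CollectLeft => vec![
            Distribution::SinglePartition,
            Distribution::UnspecifiedDistribution,
        ],
        // Both sides must be hash partitioned on their half of each join key pair.
        PartitionMode::Partitioned => {
            let (left, right): (Vec<String>, Vec<String>) = on.iter().cloned().unzip();
            vec![
                Distribution::HashPartitioned(left),
                Distribution::HashPartitioned(right),
            ]
        }
    }
}

fn main() {
    let on = vec![
        ("a1".to_string(), "b1".to_string()),
        ("a2".to_string(), "b2".to_string()),
    ];
    println!("{:?}", required_input_distribution(PartitionMode::Partitioned, &on));
    println!("{:?}", required_input_distribution(PartitionMode::CollectLeft, &on));
}
```

Because the rule always asks for the full key list on both sides, it never picks a subset of keys, which is the kind of dynamic choice deferred above.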





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012378962


##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec {
         true
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        true
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        let sort_keys = self.sort_keys.as_deref();
+        vec![sort_keys]
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        if self
-            .window_expr()
-            .iter()
-            .all(|expr| expr.partition_by().is_empty())
-        {
-            Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.partition_keys.is_empty() {
+            warn!("No partition defined for WindowAggExec!!!");

Review Comment:
   Yes, this is a valid case, but the SQL might run very slowly without a "partition by" clause. I can remove the warning if we think it is useless.
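
To illustrate why the missing clause is slow: with no partition keys, the only safe requirement is a single input partition, so the window runs without parallelism. The sketch below assumes the non-empty branch requests hash partitioning on the partition keys, which the truncated diff does not show. The `Distribution` enum and the function name are hypothetical simplifications.

```rust
// Simplified, hypothetical stand-in for the `Distribution` enum in the diff.
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    SinglePartition,
    HashPartitioned(Vec<String>),
}

/// Distribution required of the single child of a window operator.
fn window_input_distribution(partition_keys: &[String]) -> Vec<Distribution> {
    if partition_keys.is_empty() {
        // No PARTITION BY: every row belongs to the same window partition,
        // so all input must be funnelled into one partition.
        vec![Distribution::SinglePartition]
    } else {
        // Assumption: rows with equal partition keys must land together,
        // which hash partitioning on those keys guarantees.
        vec![Distribution::HashPartitioned(partition_keys.to_vec())]
    }
}

fn main() {
    println!("{:?}", window_input_distribution(&[]));
    println!("{:?}", window_input_distribution(&["column2".to_string()]));
}
```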





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1014231076


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }
+    }
+
+    if idx1 != -1 && idx2 != -1 && idx1 != idx2 {
+        // need to merge the two existing properties
+        let second_properties = eq_properties.get(idx2 as usize).unwrap().clone();
+        let first_properties = eq_properties.get_mut(idx1 as usize).unwrap();
+        for prop in second_properties.iter() {
+            if !first_properties.contains(prop) {
+                first_properties.insert(prop.clone());
+            }
+        }
+        eq_properties.remove(idx2 as usize);
+    } else if idx1 == -1 && idx2 == -1 {
+        // adding new pairs
+        eq_properties.push(EquivalenceProperties::new(
+            new_condition.0.clone(),
+            vec![new_condition.1.clone()],
+        ))
+    }
+}
+
+pub fn remove_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    remove_condition: (&Column, &Column),
+) {
+    let mut match_idx = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(remove_condition.0);
+        let contains_second = prop.contains(remove_condition.1);
+        if contains_first && contains_second {
+            match_idx = idx as i32;
+            break;
+        }
+    }
+    if match_idx >= 0 {
+        let matches = eq_properties.get_mut(match_idx as usize).unwrap();
+        matches.remove(remove_condition.0);
+        matches.remove(remove_condition.1);
+        if matches.len() <= 1 {
+            eq_properties.remove(match_idx as usize);
+        }
+    }
+}
+
+pub fn merge_equivalence_properties_with_alias(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+) {
+    for (column, columns) in alias_map {
+        let mut find_match = false;
+        for (_idx, prop) in eq_properties.iter_mut().enumerate() {
+            if prop.contains(column) {
+                for col in columns {
+                    prop.insert(col.clone());
+                }
+                find_match = true;
+                break;
+            }
+        }
+        if !find_match {
+            eq_properties
+                .push(EquivalenceProperties::new(column.clone(), columns.clone()));
+        }
+    }
+}
+
+pub fn truncate_equivalence_properties_not_in_schema(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    schema: &SchemaRef,
+) {
+    for props in eq_properties.iter_mut() {
+        let mut columns_to_remove = vec![];
+        for column in props.iter() {
+            if let Ok(idx) = schema.index_of(column.name()) {
+                if idx != column.index() {
+                    columns_to_remove.push(column.clone());
+                }
+            } else {
+                columns_to_remove.push(column.clone());
+            }
+        }
+        for column in columns_to_remove {
+            props.remove(&column);
+        }
+    }
+    eq_properties.retain(|props| props.len() > 1);
+}
+
+/// Normalize the output expressions based on Alias Map and SchemaRef.
+///
+/// 1) If there is mapping in Alias Map, replace the Column in the output expressions with the 1st Column in Alias Map
+/// 2) If the Column is invalid for the current Schema, replace the Column with a place holder UnKnownColumn
+///
+pub fn normalize_out_expr_with_alias_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+    schema: &SchemaRef,
+) -> Arc<dyn PhysicalExpr> {
+    let expr_clone = expr.clone();
+    expr_clone
+        .transform(&|expr| {
+            let normalized_form: Option<Arc<dyn PhysicalExpr>> =
+                match expr.as_any().downcast_ref::<Column>() {
+                    Some(column) => {
+                        let out = alias_map
+                            .get(column)
+                            .map(|c| {
+                                let out_col: Arc<dyn PhysicalExpr> =
+                                    Arc::new(c[0].clone());
+                                out_col
+                            })
+                            .or_else(|| match schema.index_of(column.name()) {
+                                // Exactly matching, return None, no need to do the transform
+                                Ok(idx) if column.index() == idx => None,
+                                _ => {
+                                    let out_col: Arc<dyn PhysicalExpr> =
+                                        Arc::new(UnKnownColumn::new(column.name()));
+                                    Some(out_col)
+                                }
+                            });
+                        out
+                    }
+                    None => None,
+                };
+            normalized_form
+        })
+        .unwrap_or(expr)
+}
+
+pub fn normalize_expr_with_equivalence_properties(
+    expr: Arc<dyn PhysicalExpr>,
+    eq_properties: &[EquivalenceProperties],
+) -> Arc<dyn PhysicalExpr> {
+    let mut normalized = expr.clone();
+    if let Some(column) = expr.as_any().downcast_ref::<Column>() {

Review Comment:
   >  does DataFusion support Partition by complex exprs ?
   
   Yes, I think so:
   
   ```sql
   DataFusion CLI v13.0.0
   ❯ create table foo as values (1,2), (3,4), (3,2), (2,1), (null, 0);
   
   ❯ select first_value(column1) over (partition by (column2%2) order by column2) from foo;
   +--------------------------+
   | FIRST_VALUE(foo.column1) |
   +--------------------------+
   | 2                        |
   |                          |
   |                          |
   |                          |
   |                          |
   +--------------------------+
   ```
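
The query result above can be checked by hand with a small sketch that evaluates `first_value(column1) over (partition by (column2 % 2) order by column2)` on the same five rows. This is plain Rust, not DataFusion code; the function name is hypothetical, and results are reported in input-row order, which need not match the order the CLI prints.

```rust
use std::collections::BTreeMap;

/// Rows are `(column1, column2)`; `column1` is nullable.
/// Returns, for each input row, the first `column1` value of its window partition.
fn first_value_over_partition(rows: &[(Option<i64>, i64)]) -> Vec<Option<i64>> {
    // Group row positions by the partition expression `column2 % 2`.
    let mut partitions: BTreeMap<i64, Vec<usize>> = BTreeMap::new();
    for (i, (_, c2)) in rows.iter().enumerate() {
        partitions.entry(c2 % 2).or_default().push(i);
    }

    let mut out = vec![None; rows.len()];
    for idxs in partitions.values() {
        // The first row under ORDER BY column2 is the one with the smallest column2.
        let first = idxs
            .iter()
            .copied()
            .min_by_key(|&i| rows[i].1)
            .expect("partitions are never empty");
        for &i in idxs {
            out[i] = rows[first].0;
        }
    }
    out
}

fn main() {
    let foo = [
        (Some(1), 2),
        (Some(3), 4),
        (Some(3), 2),
        (Some(2), 1),
        (None, 0),
    ];
    for v in first_value_over_partition(&foo) {
        println!("{:?}", v);
    }
}
```

The even partition starts with the row `(null, 0)`, so its four rows all get a null, and the single odd row gets 2, matching the one `2` and four empty cells in the output.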





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1014763709


##########
datafusion/core/src/physical_plan/mod.rs:
##########
@@ -122,10 +122,20 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// have any particular output order here
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
 
-    /// Specifies the data distribution requirements of all the
-    /// children for this operator
-    fn required_child_distribution(&self) -> Distribution {
-        Distribution::UnspecifiedDistribution
+    /// Specifies the data distribution requirements for all the
+    /// children for this operator, By default it's [[Distribution::UnspecifiedDistribution]] for each child,
+    fn required_input_distribution(&self) -> Vec<Distribution> {

Review Comment:
   Sure, I will change it in a follow-up PR.





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1014232136


##########
datafusion/core/src/physical_plan/mod.rs:
##########
@@ -122,10 +122,20 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// have any particular output order here
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
 
-    /// Specifies the data distribution requirements of all the
-    /// children for this operator
-    fn required_child_distribution(&self) -> Distribution {
-        Distribution::UnspecifiedDistribution
+    /// Specifies the data distribution requirements for all the
+    /// children for this operator, By default it's [[Distribution::UnspecifiedDistribution]] for each child,
+    fn required_input_distribution(&self) -> Vec<Distribution> {

Review Comment:
   I personally think @yahoNanJing's suggestion of
   
   ```rust
   vec![Distribution::UnspecifiedDistribution; self.children().len()]
   ```
   
   would make the intent clearer.
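
A minimal sketch of that default, using a hypothetical `Plan` trait in place of `ExecutionPlan` and a reduced `Distribution` enum. Note that `vec![value; n]` requires the element type to implement `Clone`.

```rust
// Reduced, hypothetical stand-in for the `Distribution` enum.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
}

// Hypothetical trait standing in for `ExecutionPlan`.
trait Plan {
    fn children(&self) -> Vec<Box<dyn Plan>>;

    /// Default: one "no requirement" entry per child, so the length of the
    /// returned vector always matches the number of children.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::UnspecifiedDistribution; self.children().len()]
    }
}

struct Leaf;
impl Plan for Leaf {
    fn children(&self) -> Vec<Box<dyn Plan>> {
        vec![]
    }
}

struct Join;
impl Plan for Join {
    fn children(&self) -> Vec<Box<dyn Plan>> {
        vec![Box::new(Leaf) as Box<dyn Plan>, Box::new(Leaf) as Box<dyn Plan>]
    }
}

fn main() {
    println!("{:?}", Leaf.required_input_distribution()); // no children, empty vector
    println!("{:?}", Join.required_input_distribution()); // two children, two entries
}
```

Tying the length to `children()` means operators that override neither method stay consistent automatically.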





[GitHub] [arrow-datafusion] yahoNanJing commented on pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

Posted by GitBox <gi...@apache.org>.
yahoNanJing commented on PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#issuecomment-1304691277

   Hi @alamb, should we merge this PR first so that @mingmwang can continue with part 3 of this optimization to remove unnecessary shuffling?

