You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/02 18:48:20 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4043: [Part2] Partition and Sort Enforcement, ExecutionPlan enhancement

alamb commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012139283


##########
datafusion/core/src/dataframe.rs:
##########
@@ -1515,4 +1515,84 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn partition_aware_union() -> Result<()> {
+        let left = test_table().await?.select_columns(&["c1", "c2"])?;
+        let right = test_table_with_name("c2")
+            .await?
+            .select_columns(&["c1", "c3"])?
+            .with_column_renamed("c2.c1", "c2_c1")?;
+
+        let left_rows = left.collect().await?;
+        let right_rows = right.collect().await?;
+        let join1 =
+            left.join(right.clone(), JoinType::Inner, &["c1"], &["c2_c1"], None)?;
+        let join2 = left.join(right, JoinType::Inner, &["c1"], &["c2_c1"], None)?;
+
+        let union = join1.union(join2)?;
+
+        let union_rows = union.collect().await?;
+
+        assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+        assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+        assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());
+
+        let physical_plan = union.create_physical_plan().await?;
+        let default_partition_count =
+            SessionContext::new().copied_config().target_partitions;
+        assert_eq!(
+            physical_plan.output_partitioning().partition_count(),
+            default_partition_count
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn non_partition_aware_union() -> Result<()> {
+        let left = test_table().await?.select_columns(&["c1", "c2"])?;
+        let right = test_table_with_name("c2")
+            .await?
+            .select_columns(&["c1", "c2"])?
+            .with_column_renamed("c2.c1", "c2_c1")?
+            .with_column_renamed("c2.c2", "c2_c2")?;
+
+        let left_rows = left.collect().await?;
+        let right_rows = right.collect().await?;
+        let join1 = left.join(
+            right.clone(),
+            JoinType::Inner,
+            &["c1", "c2"],
+            &["c2_c1", "c2_c2"],
+            None,
+        )?;
+
+        // join key ordering is different

Review Comment:
   👍 



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;

Review Comment:
   I think typically in Rust such a sentinel is signaled using `Option`.
   
   So like
   
   ```rust
       let mut idx1: Option<usize> = None;
       let mut idx2: Option<usize> = None;
   ```



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -255,25 +276,58 @@ impl ExecutionPlan for AggregateExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
+        match &self.mode {
+            AggregateMode::Partial => {
+                // Partial Aggregation will not change the output partitioning but need to respect the Alias
+                let input_partition = self.input.output_partitioning();
+                match input_partition {
+                    Partitioning::Hash(exprs, part) => {
+                        let normalized_exprs = exprs
+                            .into_iter()
+                            .map(|expr| {
+                                normalize_out_expr_with_alias_schema(
+                                    expr,
+                                    &self.alias_map,
+                                    &self.schema,
+                                )
+                            })
+                            .collect::<Vec<_>>();
+                        Partitioning::Hash(normalized_exprs, part)
+                    }
+                    _ => input_partition,
+                }
+            }
+            // Final Aggregation's output partitioning is the same as its real input
+            _ => self.input.output_partitioning(),
+        }
     }
 
+    // TODO check the output ordering of AggregateExec

Review Comment:
   // is it still TODO?



##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct EquivalenceProperties {

Review Comment:
   Having `EquivalenceProperties` would also provide a single location to add docstrings explaining the structures, and their assumptions and what they are good for



##########
datafusion/core/src/dataframe.rs:
##########
@@ -1515,4 +1515,84 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn partition_aware_union() -> Result<()> {
+        let left = test_table().await?.select_columns(&["c1", "c2"])?;
+        let right = test_table_with_name("c2")
+            .await?
+            .select_columns(&["c1", "c3"])?
+            .with_column_renamed("c2.c1", "c2_c1")?;
+
+        let left_rows = left.collect().await?;
+        let right_rows = right.collect().await?;
+        let join1 =

Review Comment:
   Could you possibly add some comments here about what this test is verifying? It seems like perhaps it is verifying that when the joins are on the same key the partitioning is the same and thus union can be done without bringing everything to a single stream?



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }

Review Comment:
   You could also use a match statement here and let the compiler check that all important cases are covered:
   
   ```suggestion
           match (contains_first, contains_second) {
               (true, false) => {
                 prop.insert(new_condition.1.clone());
                 idx1 = idx as i32;
               } 
               (false, true)=> {
                 prop.insert(new_condition.0.clone());
                 idx2 = idx as i32;
               }
               (true, true) =>  {
                 idx1 = idx as i32;
                 idx2 = idx as i32;
                 break;
               }
               (false, false) => {}
           }
   ```



##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct EquivalenceProperties {

Review Comment:
   I wonder if `EquivalenceClass` is a more specific name?
   
   Then you could make
   
   ```rust
   struct EquivalenceProperties {
     classes: Vec<EquivalenceClass>
   }
   ```
   
   And move functions like `truncate_equivalence_properties_not_in_schema` on to 
   
   ```rust
   impl EquivalenceProperties {
     fn truncate_equivalence_properties_not_in_schema(&self, ..)
   }
   ```
   
   I don't think it is required, but it might keep the code easier to reason about / keep it behind an abstraction



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {

Review Comment:
   This is called `split_conjunction` in the logical optimizer -- perhaps it could be called the same thing in the physical layer. The logical expr implementation also avoids creating quite as many `Vec`s
   
   https://github.com/apache/arrow-datafusion/blob/345234550712173477e7807ba2cf67dd2ffb9ed5/datafusion/optimizer/src/utils.rs#L58-L78



##########
datafusion/core/src/physical_plan/joins/cross_join.rs:
##########
@@ -153,16 +156,27 @@ impl ExecutionPlan for CrossJoinExec {
         )?))
     }
 
+    // TODO optimize CrossJoin implementation to generate M * N partitions
     fn output_partitioning(&self) -> Partitioning {
-        self.right.output_partitioning()
+        let left_columns_len = self.left.schema().fields.len();
+        adjust_right_output_partitioning(
+            self.right.output_partitioning(),
+            left_columns_len,
+        )
     }
 
+    // TODO check the output ordering of CrossJoin

Review Comment:
   is this still a todo?



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -186,13 +194,26 @@ impl AggregateExec {
 
         let schema = Arc::new(schema);
 
+        let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();

Review Comment:
   Can you explain what this code is for? It doesn't seem correct to me as I don't understand the circumstances under which the output would be different 🤔 
   
   It seems like in this case the input logical plan maybe was incorrect?



##########
datafusion/physical-expr/src/physical_expr.rs:
##########
@@ -136,6 +138,67 @@ impl PhysicalExprStats for BasicExpressionStats {
     }
 }
 
+#[derive(Debug, Clone)]

Review Comment:
   What would you think about moving this into `datafusion/physical-expr/src/equivalence.rs` or something? Then we could move all the code that deals with equivalence classes into that module and keep them and the tests together
   
   



##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -231,6 +246,38 @@ impl RecordBatchStream for FilterExecStream {
     }
 }
 
+/// Return the equals Column-Pairs and Non-equals Column-Pairs
+fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual {

Review Comment:
   Perhaps this would be better in utils.rs



##########
datafusion/core/src/physical_plan/coalesce_batches.rs:
##########
@@ -96,12 +96,15 @@ impl ExecutionPlan for CoalesceBatchesExec {
         self.input.output_partitioning()
     }
 
+    // Depends on how the CoalesceBatches was implemented, it is possible to keep

Review Comment:
   There is also `SortPreservingMerge` that can be used to preserve order but there are tradeoffs there (specifically it takes more effort to keep the sort order than it does to append batches together)



##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec {
         true
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        true
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        let sort_keys = self.sort_keys.as_deref();
+        vec![sort_keys]
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        if self
-            .window_expr()
-            .iter()
-            .all(|expr| expr.partition_by().is_empty())
-        {
-            Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.partition_keys.is_empty() {
+            warn!("No partition defined for WindowAggExec!!!");

Review Comment:
   I don't know why this would generate a warning -- can't this occur with a query like `SELECT ROW_NUMBER() OVER () from foo` (as in an empty over clause)?



##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -119,22 +129,25 @@ impl ExecutionPlan for WindowAggExec {
         true
     }
 
-    fn relies_on_input_order(&self) -> bool {
-        true
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        let sort_keys = self.sort_keys.as_deref();
+        vec![sort_keys]
     }
 
-    fn required_child_distribution(&self) -> Distribution {
-        if self
-            .window_expr()
-            .iter()
-            .all(|expr| expr.partition_by().is_empty())
-        {
-            Distribution::SinglePartition
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.partition_keys.is_empty() {
+            warn!("No partition defined for WindowAggExec!!!");
+            vec![Distribution::SinglePartition]
         } else {
-            Distribution::UnspecifiedDistribution
+            //TODO support PartitionCollections if there is no common partition columns in the window_expr
+            vec![Distribution::HashPartitioned(self.partition_keys.clone())]

Review Comment:
   👍  I agree this sounds good



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }
+    }
+
+    if idx1 != -1 && idx2 != -1 && idx1 != idx2 {
+        // need to merge the two existing properties
+        let second_properties = eq_properties.get(idx2 as usize).unwrap().clone();
+        let first_properties = eq_properties.get_mut(idx1 as usize).unwrap();
+        for prop in second_properties.iter() {
+            if !first_properties.contains(prop) {
+                first_properties.insert(prop.clone());
+            }
+        }
+        eq_properties.remove(idx2 as usize);
+    } else if idx1 == -1 && idx2 == -1 {
+        // adding new pairs
+        eq_properties.push(EquivalenceProperties::new(
+            new_condition.0.clone(),
+            vec![new_condition.1.clone()],
+        ))
+    }
+}
+
+pub fn remove_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    remove_condition: (&Column, &Column),
+) {
+    let mut match_idx = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(remove_condition.0);
+        let contains_second = prop.contains(remove_condition.1);
+        if contains_first && contains_second {
+            match_idx = idx as i32;
+            break;
+        }
+    }
+    if match_idx >= 0 {
+        let matches = eq_properties.get_mut(match_idx as usize).unwrap();
+        matches.remove(remove_condition.0);
+        matches.remove(remove_condition.1);
+        if matches.len() <= 1 {
+            eq_properties.remove(match_idx as usize);
+        }
+    }
+}
+
+pub fn merge_equivalence_properties_with_alias(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+) {
+    for (column, columns) in alias_map {
+        let mut find_match = false;
+        for (_idx, prop) in eq_properties.iter_mut().enumerate() {
+            if prop.contains(column) {
+                for col in columns {
+                    prop.insert(col.clone());
+                }
+                find_match = true;
+                break;
+            }
+        }
+        if !find_match {
+            eq_properties
+                .push(EquivalenceProperties::new(column.clone(), columns.clone()));
+        }
+    }
+}
+
+pub fn truncate_equivalence_properties_not_in_schema(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    schema: &SchemaRef,
+) {
+    for props in eq_properties.iter_mut() {
+        let mut columns_to_remove = vec![];
+        for column in props.iter() {
+            if let Ok(idx) = schema.index_of(column.name()) {
+                if idx != column.index() {
+                    columns_to_remove.push(column.clone());
+                }
+            } else {
+                columns_to_remove.push(column.clone());
+            }
+        }
+        for column in columns_to_remove {
+            props.remove(&column);
+        }
+    }
+    eq_properties.retain(|props| props.len() > 1);
+}
+
+/// Normalize the output expressions based on Alias Map and SchemaRef.
+///
+/// 1) If there is mapping in Alias Map, replace the Column in the output expressions with the 1st Column in Alias Map
+/// 2) If the Column is invalid for the current Schema, replace the Column with a place holder UnKnownColumn
+///
+pub fn normalize_out_expr_with_alias_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+    schema: &SchemaRef,
+) -> Arc<dyn PhysicalExpr> {
+    let expr_clone = expr.clone();
+    expr_clone
+        .transform(&|expr| {
+            let normalized_form: Option<Arc<dyn PhysicalExpr>> =
+                match expr.as_any().downcast_ref::<Column>() {
+                    Some(column) => {
+                        let out = alias_map
+                            .get(column)
+                            .map(|c| {
+                                let out_col: Arc<dyn PhysicalExpr> =
+                                    Arc::new(c[0].clone());
+                                out_col
+                            })
+                            .or_else(|| match schema.index_of(column.name()) {
+                                // Exactly matching, return None, no need to do the transform
+                                Ok(idx) if column.index() == idx => None,
+                                _ => {
+                                    let out_col: Arc<dyn PhysicalExpr> =
+                                        Arc::new(UnKnownColumn::new(column.name()));
+                                    Some(out_col)
+                                }
+                            });
+                        out
+                    }
+                    None => None,
+                };
+            normalized_form
+        })
+        .unwrap_or(expr)
+}
+
+pub fn normalize_expr_with_equivalence_properties(
+    expr: Arc<dyn PhysicalExpr>,
+    eq_properties: &[EquivalenceProperties],
+) -> Arc<dyn PhysicalExpr> {
+    let mut normalized = expr.clone();
+    if let Some(column) = expr.as_any().downcast_ref::<Column>() {

Review Comment:
   Does this need to recursively rewrite exprs?
   
   Like what if `expr` was `A + B` and you had an equivalence class with `B = C`
   
   Wouldn't you have to rewrite `A + B` into `A + C`? But I don't see this code recursing.
   
   This kind of rewrite could be tested as well I think



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org