You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/12 11:35:35 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4184: [FOLLOWUP] Enforcement Rule: resolve review comments, refactor adjust_input_keys_ordering()

alamb commented on code in PR #4184:
URL: https://github.com/apache/arrow-datafusion/pull/4184#discussion_r1020749712


##########
datafusion/core/src/physical_plan/mod.rs:
##########
@@ -656,3 +657,81 @@ use crate::execution::context::TaskContext;
 pub use datafusion_physical_expr::{
     expressions, functions, hash_utils, type_coercion, udf,
 };
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::datatypes::DataType;
+    use arrow::datatypes::Schema;
+
+    use crate::physical_plan::Distribution;
+    use crate::physical_plan::Partitioning;
+    use crate::physical_plan::PhysicalExpr;
+    use datafusion_physical_expr::expressions::Column;
+
+    use std::sync::Arc;
+
+    #[tokio::test]
+    async fn partitioning_satisfy_distribution() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            arrow::datatypes::Field::new("column_1", DataType::Int64, false),
+            arrow::datatypes::Field::new("column_2", DataType::Utf8, false),
+        ]));
+
+        let partition_exprs1: Vec<Arc<dyn PhysicalExpr>> = vec![
+            Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
+            Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
+        ];
+
+        let partition_exprs2: Vec<Arc<dyn PhysicalExpr>> = vec![
+            Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
+            Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
+        ];
+
+        let distribution_types = vec![
+            Distribution::UnspecifiedDistribution,
+            Distribution::SinglePartition,
+            Distribution::HashPartitioned(partition_exprs1.clone()),
+        ];
+
+        let single_partition = Partitioning::UnknownPartitioning(1);
+        let unspecified_partition = Partitioning::UnknownPartitioning(10);
+        let round_robin_partition = Partitioning::RoundRobinBatch(10);
+        let hash_partition1 = Partitioning::Hash(partition_exprs1, 10);
+        let hash_partition2 = Partitioning::Hash(partition_exprs2, 10);
+
+        for distribution in distribution_types {

Review Comment:
   Thank you



##########
datafusion/core/src/physical_optimizer/merge_exec.rs:
##########
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   `merge_exec` is a strange name for this module -- I am glad we got rid of it!



##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -247,11 +247,16 @@ mod tests {
         eq_properties.add_equal_conditions(new_condition);
         assert_eq!(eq_properties.classes().len(), 1);
         assert_eq!(eq_properties.classes()[0].len(), 2);
+        assert!(eq_properties.classes()[0].contains(&Column::new("a", 0)));
+        assert!(eq_properties.classes()[0].contains(&Column::new("b", 1)));
 
         let new_condition = (&Column::new("b", 1), &Column::new("c", 2));
         eq_properties.add_equal_conditions(new_condition);
         assert_eq!(eq_properties.classes().len(), 1);
         assert_eq!(eq_properties.classes()[0].len(), 3);
+        assert!(eq_properties.classes()[0].contains(&Column::new("a", 0)));

Review Comment:
   Thank you



##########
datafusion/core/src/physical_plan/rewrite.rs:
##########
@@ -149,14 +149,16 @@ pub enum RewriteRecursion {
     Skip,
 }
 
+#[allow(clippy::vtable_address_comparisons)]

Review Comment:
   Do you know why `#[allow(clippy::vtable_address_comparisons)]` is needed here but not in the seemingly very similar other changes in this PR?



##########
datafusion/core/src/execution/context.rs:
##########
@@ -1589,8 +1589,9 @@ impl SessionState {
             )));
         }
         physical_optimizers.push(Arc::new(Repartition::new()));
+        // Repartition rule could introduce additional RepartitionExec with RoundRobin partitioning.
+        // To make sure the SinglePartition is satisfied, run the BasicEnforcement again, originally it was the AddCoalescePartitionsExec here.

Review Comment:
   👍 



##########
datafusion/core/src/physical_optimizer/enforcement.rs:
##########
@@ -885,6 +956,70 @@ struct JoinKeyPairs {
     right_keys: Vec<Arc<dyn PhysicalExpr>>,
 }
 
+#[derive(Debug, Clone)]

Review Comment:
   Nice



##########
datafusion/core/src/physical_optimizer/enforcement.rs:
##########
@@ -99,13 +105,51 @@ impl PhysicalOptimizerRule for BasicEnforcement {
 
 /// When the physical planner creates the Joins, the ordering of join keys is from the original query.
 /// That might not match with the output partitioning of the join node's children
-/// This method runs a top-down process and try to adjust the output partitioning of the children
-/// if the children themselves are Joins or Aggregations.
-fn adjust_input_keys_down_recursively(
-    plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
-    parent_required: Vec<Arc<dyn PhysicalExpr>>,
-) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
-    let plan_any = plan.as_any();
+/// A Top-Down process will use this method to adjust children's output partitioning based on the parent key reordering requirements:

Review Comment:
   ❤️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org