Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/09/07 18:31:30 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #7488: Refactor the EnforceDistribution Rule

alamb commented on code in PR #7488:
URL: https://github.com/apache/arrow-datafusion/pull/7488#discussion_r1318958524


##########
datafusion/sqllogictest/test_files/joins.slt:
##########
@@ -3192,17 +3200,22 @@ Sort: l_table.a ASC NULLS FIRST, l_table.b ASC NULLS LAST, l_table.c ASC NULLS L
 ----------TableScan: annotated_data projection=[a0, a, b, c, d]
 physical_plan
 SortPreservingMergeExec: [a@1 ASC,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST,rn1@11 ASC NULLS LAST]
---SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
-----CoalesceBatchesExec: target_batch_size=4096
-------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
---------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
-----------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
-------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
-----CoalesceBatchesExec: target_batch_size=4096
-------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
---------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
-----------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
-------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+--SortExec: expr=[a@1 ASC,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST,rn1@11 ASC NULLS LAST]
+----SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
+------SortExec: expr=[a@1 ASC]
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1

Review Comment:
   Just a comment (nothing to change in this PR): I feel that this construction of two stacked `RepartitionExec`s is likely less ideal than a single `RepartitionExec` that repartitions using hash partitioning and handles the concurrency internally.
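To illustrate the idea of a single operator that both hash-partitions and handles concurrency internally, here is a minimal sketch. It is not how `RepartitionExec` is implemented. Rows are modeled as `i64` keys, `std`'s `DefaultHasher` stands in for DataFusion's own hashing, and scoped threads stand in for its async tasks. `partition_for` and `hash_repartition` are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::thread;

/// Hypothetical helper: choose an output partition for a key by hashing it.
fn partition_for<K: Hash>(key: &K, num_partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_partitions as u64) as usize
}

/// Hypothetical single-step hash repartition: disjoint chunks of the input are
/// hashed on separate threads, so no separate round-robin step is needed first.
fn hash_repartition(rows: &[i64], num_partitions: usize, num_threads: usize) -> Vec<Vec<i64>> {
    assert!(num_partitions > 0, "need at least one output partition");
    let num_threads = num_threads.max(1);
    let chunk_size = ((rows.len() + num_threads - 1) / num_threads).max(1);

    // Each worker produces its own per-partition buckets for its chunk.
    let per_worker: Vec<Vec<Vec<i64>>> = thread::scope(|s| {
        let handles: Vec<_> = rows
            .chunks(chunk_size)
            .map(|chunk| {
                s.spawn(move || {
                    let mut buckets = vec![Vec::new(); num_partitions];
                    for &row in chunk {
                        buckets[partition_for(&row, num_partitions)].push(row);
                    }
                    buckets
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Merge the workers' buckets partition by partition, in chunk order.
    let mut out = vec![Vec::new(); num_partitions];
    for buckets in per_worker {
        for (p, mut part) in buckets.into_iter().enumerate() {
            out[p].append(&mut part);
        }
    }
    out
}
```

Because each key's partition depends only on its hash, the result is the same whether one thread or several do the hashing. The concurrency is an internal detail of the one operator.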



##########
datafusion/core/src/physical_plan/repartition/mod.rs:
##########
@@ -637,7 +637,8 @@ impl RepartitionExec {
 
     /// Set Order preserving flag
     pub fn with_preserve_order(mut self, preserve_order: bool) -> Self {
-        self.preserve_order = preserve_order;
+        // Set "preserve order" mode only if the operator cannot maintain the order:
+        self.preserve_order = preserve_order && !self.maintains_input_order()[0];

Review Comment:
   I am confused about the need for this change, specifically that `self.preserve_order` now depends partly on its own previous value, since that value is used to calculate `self.maintains_input_order()`.
   
   https://github.com/apache/arrow-datafusion/blob/63e452ace04e8fc704aff99457b5503ebd56116d/datafusion/core/src/physical_plan/repartition/mod.rs#L453-L460
   
   Could we perhaps express the calculation in terms of `self.input().output_partitioning().partition_count()` to make it clearer?
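The concern can be made concrete with a simplified model. This sketch assumes, as the comment indicates, that `maintains_input_order` returns true when `preserve_order` is already set or when the input has at most one partition. `RepartitionSketch` and both builder methods are hypothetical stand-ins, not DataFusion's types.

```rust
/// Hypothetical stand-in for `RepartitionExec`, modeling only the two
/// fields relevant to this discussion.
#[derive(Debug, Clone, Copy)]
struct RepartitionSketch {
    input_partitions: usize,
    preserve_order: bool,
}

impl RepartitionSketch {
    /// Assumed behavior: order is maintained if `preserve_order` is already
    /// set, or if there is at most one input partition.
    fn maintains_input_order(&self) -> bool {
        self.preserve_order || self.input_partitions <= 1
    }

    /// Mirrors the diff: reads the *old* `preserve_order` through
    /// `maintains_input_order`, so repeated calls can flip the flag.
    fn with_preserve_order_as_in_diff(mut self, preserve_order: bool) -> Self {
        self.preserve_order = preserve_order && !self.maintains_input_order();
        self
    }

    /// The alternative suggested above: depend only on the input partition count.
    fn with_preserve_order_by_partition_count(mut self, preserve_order: bool) -> Self {
        self.preserve_order = preserve_order && self.input_partitions > 1;
        self
    }
}
```

With two input partitions, calling the diff's version with `true` twice sets the flag and then clears it, while the partition-count version gives the same result no matter how many times it is called.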



##########
datafusion/sqllogictest/test_files/union.slt:
##########
@@ -466,8 +466,8 @@ UnionExec
 ----AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1)], aggr=[]
 ------CoalesceBatchesExec: target_batch_size=8192
 --------RepartitionExec: partitioning=Hash([Int64(1)@0], 4), input_partitions=4
-----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-------------AggregateExec: mode=Partial, gby=[1 as Int64(1)], aggr=[]
+----------AggregateExec: mode=Partial, gby=[1 as Int64(1)], aggr=[]

Review Comment:
   this seems like a better plan as it will repartition the input (though it probably doesn't matter as the input produces a single row)
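Why the removed round-robin step barely matters for a single-row input: under round-robin batch distribution (modeled here as batch `i` going to output partition `i % N`, an assumption about the scheme), one batch lands in exactly one partition and the others stay empty. `round_robin_batches` is a hypothetical helper, and batches are modeled as `Vec<i64>`.

```rust
/// Hypothetical round-robin *batch* distribution: batch i is sent to
/// output partition i % num_partitions. A batch is modeled as Vec<i64>.
fn round_robin_batches(batches: Vec<Vec<i64>>, num_partitions: usize) -> Vec<Vec<Vec<i64>>> {
    assert!(num_partitions > 0, "need at least one output partition");
    let mut out = vec![Vec::new(); num_partitions];
    for (i, batch) in batches.into_iter().enumerate() {
        out[i % num_partitions].push(batch);
    }
    out
}
```

Assuming the single row arrives as a single batch, three of the four partitions produced by `RoundRobinBatch(4)` would receive nothing.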



##########
datafusion/core/src/physical_optimizer/dist_enforcement.rs:
##########
@@ -16,10 +16,20 @@
 // under the License.
 
 //! EnforceDistribution optimizer rule inspects the physical plan with respect
-//! to distribution requirements and adds [RepartitionExec]s to satisfy them
-//! when necessary.
+//! to distribution requirements and adds [`RepartitionExec`]s to satisfy them
+//! when necessary. If increasing parallelism is beneficial (and also desirable
+//! according to the configuration), this rule increases partition counts in
+//! the physical plan.
+
+use std::fmt;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
 use crate::config::ConfigOptions;
-use crate::error::Result;
+use crate::datasource::physical_plan::{CsvExec, ParquetExec};
+use crate::error::{DataFusionError, Result};
+use crate::physical_optimizer::sort_enforcement::{unbounded_output, ExecTree};

Review Comment:
   It seems like pulling the things that are now shared between `EnforceSorting` and `EnforceDistribution` into a common location would make the dependency structure clearer.
   
   `ExecTree` in particular seems like a valuable concept that is not specific to SortEnforcement.
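One way to act on this suggestion is sketched below using simplified stand-ins. A hypothetical shared `utils` module owns the tree type, and both rule modules import it from there, so neither rule module depends on the other. The `ExecTree` here, including its fields and its `leaves` method, is an assumption for illustration and not DataFusion's definition. Plan nodes are modeled as plain names.

```rust
// Hypothetical layout: shared helpers live in `utils`, not inside either rule.
mod utils {
    /// Simplified stand-in for `ExecTree`: a plan node (modeled as a name),
    /// its index within the parent, and its children.
    #[derive(Debug, Clone)]
    pub struct ExecTree {
        pub idx: usize,
        pub plan: String,
        pub children: Vec<ExecTree>,
    }

    impl ExecTree {
        pub fn new(plan: &str, idx: usize, children: Vec<ExecTree>) -> Self {
            ExecTree { idx, plan: plan.to_string(), children }
        }

        /// Collect the names of all leaf nodes, left to right.
        pub fn leaves(&self) -> Vec<String> {
            if self.children.is_empty() {
                vec![self.plan.clone()]
            } else {
                self.children.iter().flat_map(|c| c.leaves()).collect()
            }
        }
    }
}

// Both rules depend on `utils`, not on each other.
mod enforce_sorting {
    use super::utils::ExecTree;

    pub fn describe(tree: &ExecTree) -> String {
        format!("EnforceSorting sees leaves {:?}", tree.leaves())
    }
}

mod enforce_distribution {
    use super::utils::ExecTree;

    pub fn describe(tree: &ExecTree) -> String {
        format!("EnforceDistribution sees leaves {:?}", tree.leaves())
    }
}
```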



##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -2605,51 +2623,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    // With new change in SortEnforcement EnforceSorting->EnforceDistribution->EnforceSorting

Review Comment:
   What is the reason to remove this test? This PR's description implies that `EnforceDistribution` is now idempotent so I would expect this test to continue to work as before 🤔 
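The idempotence property referred to here can be checked generically: apply a rule once, apply it again to its output, and compare the two results. The sketch below models a plan as a list of operator names from root to leaf. `toy_rule` (which inserts a `RepartitionExec` above the leaf only if one is not already there) and `is_idempotent` are hypothetical, and they do not reflect how `EnforceDistribution` is actually implemented or tested.

```rust
/// Hypothetical toy rule: insert "RepartitionExec" directly above the leaf
/// unless one is already there.
fn toy_rule(plan: &[String]) -> Vec<String> {
    let mut out = plan.to_vec();
    let n = out.len();
    if n >= 1 && (n < 2 || out[n - 2] != "RepartitionExec") {
        out.insert(n - 1, "RepartitionExec".to_string());
    }
    out
}

/// A rule is idempotent on `plan` if a second application changes nothing.
fn is_idempotent(rule: impl Fn(&[String]) -> Vec<String>, plan: &[String]) -> bool {
    let once = rule(plan);
    let twice = rule(once.as_slice());
    once == twice
}
```

A rule that unconditionally inserts a repartition would fail this check, which is the kind of regression a test like the removed one could catch.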



##########
datafusion/core/src/physical_optimizer/dist_enforcement.rs:
##########
@@ -29,29 +39,144 @@ use crate::physical_plan::joins::{
 use crate::physical_plan::projection::ProjectionExec;
 use crate::physical_plan::repartition::RepartitionExec;
 use crate::physical_plan::sorts::sort::SortOptions;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use crate::physical_plan::union::{can_interleave, InterleaveExec, UnionExec};
 use crate::physical_plan::windows::WindowAggExec;
 use crate::physical_plan::Partitioning;
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
+
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
 use datafusion_expr::logical_plan::JoinType;
 use datafusion_physical_expr::equivalence::EquivalenceProperties;
-use datafusion_physical_expr::expressions::Column;
-use datafusion_physical_expr::expressions::NoOp;
-use datafusion_physical_expr::utils::map_columns_before_projection;
+use datafusion_physical_expr::expressions::{Column, NoOp};
+use datafusion_physical_expr::utils::{
+    map_columns_before_projection, ordering_satisfy_requirement_concrete,
+};
 use datafusion_physical_expr::{
     expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, PhysicalExpr,
+    PhysicalSortRequirement,
 };
-use std::sync::Arc;
 
-/// The EnforceDistribution rule ensures that distribution requirements are met
-/// in the strictest way. It might add additional [RepartitionExec] to the plan tree
-/// and give a non-optimal plan, but it can avoid the possible data skew in joins.
+use datafusion_common::internal_err;
+use itertools::izip;
+
+/// The `EnforceDistribution` rule ensures that distribution requirements are
+/// met. In doing so, this rule will increase the parallelism in the plan by
+/// introducing repartitioning operators to the physical plan.
+///
+/// For example, given an input such as:
+///
 ///
-/// For example for a HashJoin with keys(a, b, c), the required Distribution(a, b, c) can be satisfied by
-/// several alternative partitioning ways: [(a, b, c), (a, b), (a, c), (b, c), (a), (b), (c), ( )].
+/// ```text

Review Comment:
   ❤️  thank you for keeping the comments
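The doc lines removed in this hunk note that, for a HashJoin on keys (a, b, c), the required distribution can be satisfied by partitioning on any subset of those keys. This works because rows that agree on all of a, b, and c also agree on any subset of them, so they still end up in the same partition. The hypothetical helper below enumerates these candidates in the same order the removed comment lists them.

```rust
/// Hypothetical helper: every subset of `keys`, largest first. Ties keep
/// bitmask order, which reproduces the order in the removed doc comment.
fn candidate_partitionings<'a>(keys: &[&'a str]) -> Vec<Vec<&'a str>> {
    let n = keys.len();
    let mut subsets: Vec<Vec<&'a str>> = (0..(1u32 << n))
        .map(|mask| {
            keys.iter()
                .enumerate()
                .filter(|(i, _)| mask & (1u32 << *i) != 0)
                .map(|(_, k)| *k)
                .collect()
        })
        .collect();
    // `sort_by` is stable, so subsets of equal size keep their bitmask order.
    subsets.sort_by(|a, b| b.len().cmp(&a.len()));
    subsets
}
```

For `["a", "b", "c"]` this yields the eight alternatives (a, b, c), (a, b), (a, c), (b, c), (a), (b), (c), and ( ).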



##########
datafusion/sqllogictest/test_files/decimal.slt:
##########
@@ -618,7 +618,12 @@ select a / b from foo;
 statement ok
 create table t as values (arrow_cast(123, 'Decimal256(5,2)'));
 
-query error DataFusion error: This feature is not implemented: AvgAccumulator for \(Decimal256\(5, 2\) \-\-> Decimal256\(9, 6\)\)
+# make sure query below runs in single partition

Review Comment:
   👍 



##########
datafusion/core/src/physical_optimizer/dist_enforcement.rs:
##########
@@ -16,10 +16,20 @@
 // under the License.
 
 //! EnforceDistribution optimizer rule inspects the physical plan with respect
-//! to distribution requirements and adds [RepartitionExec]s to satisfy them
-//! when necessary.
+//! to distribution requirements and adds [`RepartitionExec`]s to satisfy them

Review Comment:
   A very minor comment: calling this file `datafusion/core/src/physical_optimizer/enforce_distribution.rs` would be more consistent with the `EnforceDistribution` rule it contains, but I realize this is not a change you made in this PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org