You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/12 15:45:08 UTC
[arrow-datafusion] branch main updated: Improve parallelism of repartition operator (#6310)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d0aadd6bae Improve parallelism of repartition operator (#6310)
d0aadd6bae is described below
commit d0aadd6bae545030ad6e2b5d519a916691225998
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Fri May 12 11:45:01 2023 -0400
Improve parallelism of repartition operator (#6310)
---
.../core/src/physical_plan/repartition/mod.rs | 34 ++++++++++++++++++++--
1 file changed, 31 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index 67fc63d235..32605cd977 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -220,6 +220,14 @@ impl BatchPartitioner {
Ok(it)
}
+
+ // return the number of output partitions
+ fn num_partitions(&self) -> usize {
+ match self.state {
+ BatchPartitionerState::RoundRobin { num_partitions, .. } => num_partitions,
+ BatchPartitionerState::Hash { num_partitions, .. } => num_partitions,
+ }
+ }
}
/// The repartition operator maps N input partitions to M output partitions based on a
@@ -502,6 +510,7 @@ impl RepartitionExec {
// While there are still outputs to send to, keep
// pulling inputs
+ let mut batches_until_yield = partitioner.num_partitions();
while !txs.is_empty() {
// fetch the next batch
let timer = r_metrics.fetch_time.timer();
@@ -532,9 +541,28 @@ impl RepartitionExec {
timer.done();
}
- // If the input stream is endless, we may spin forever and never yield back to tokio. Hence let us yield.
- // See https://github.com/apache/arrow-datafusion/issues/5278.
- tokio::task::yield_now().await;
+ // If the input stream is endless, we may spin forever and
+ // never yield back to tokio. See
+ // https://github.com/apache/arrow-datafusion/issues/5278.
+ //
+ // However, yielding on every batch causes a bottleneck
+ // when running with multiple cores. See
+ // https://github.com/apache/arrow-datafusion/issues/6290
+ //
+ // Thus, heuristically yield after producing num_partitions
+ // batches
+ //
+ // In round robin this is ideal as each input will get a
+ // new batch. In hash partitioning it may yield too often
+ // on uneven distributions even if some partition cannot
+ // make progress, but parallelism is going to be limited
+ // in that case anyway
+ if batches_until_yield == 0 {
+ tokio::task::yield_now().await;
+ batches_until_yield = partitioner.num_partitions();
+ } else {
+ batches_until_yield -= 1;
+ }
}
Ok(())