You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/12 15:45:08 UTC

[arrow-datafusion] branch main updated: Improve parallelism of repartition operator (#6310)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d0aadd6bae Improve parallelism of repartition operator (#6310)
d0aadd6bae is described below

commit d0aadd6bae545030ad6e2b5d519a916691225998
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Fri May 12 11:45:01 2023 -0400

    Improve parallelism of repartition operator (#6310)
---
 .../core/src/physical_plan/repartition/mod.rs      | 34 ++++++++++++++++++++--
 1 file changed, 31 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index 67fc63d235..32605cd977 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -220,6 +220,14 @@ impl BatchPartitioner {
 
         Ok(it)
     }
+
+    // Returns the number of output partitions.
+    fn num_partitions(&self) -> usize {
+        match self.state {
+            BatchPartitionerState::RoundRobin { num_partitions, .. } => num_partitions,
+            BatchPartitionerState::Hash { num_partitions, .. } => num_partitions,
+        }
+    }
 }
 
 /// The repartition operator maps N input partitions to M output partitions based on a
@@ -502,6 +510,7 @@ impl RepartitionExec {
 
         // While there are still outputs to send to, keep
         // pulling inputs
+        let mut batches_until_yield = partitioner.num_partitions();
         while !txs.is_empty() {
             // fetch the next batch
             let timer = r_metrics.fetch_time.timer();
@@ -532,9 +541,28 @@ impl RepartitionExec {
                 timer.done();
             }
 
-            // If the input stream is endless, we may spin forever and never yield back to tokio. Hence let us yield.
-            // See https://github.com/apache/arrow-datafusion/issues/5278.
-            tokio::task::yield_now().await;
+            // If the input stream is endless, we may spin forever and
+            // never yield back to tokio.  See
+            // https://github.com/apache/arrow-datafusion/issues/5278.
+            //
+            // However, yielding on every batch causes a bottleneck
+            // when running with multiple cores. See
+            // https://github.com/apache/arrow-datafusion/issues/6290
+            //
+            // Thus, heuristically yield after producing `num_partitions`
+            // batches.
+            //
+            // In round robin this is ideal as each input will get a
+            // new batch. In hash partitioning it may yield too often
+            // on uneven distributions even if some partition cannot
+            // make progress, but parallelism is going to be limited
+            // in that case anyway.
+            if batches_until_yield == 0 {
+                tokio::task::yield_now().await;
+                batches_until_yield = partitioner.num_partitions();
+            } else {
+                batches_until_yield -= 1;
+            }
         }
 
         Ok(())