Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/05/09 15:16:58 UTC

[GitHub] [arrow-datafusion] alamb opened a new pull request, #6310: Improve parallelism of repartition operator

alamb opened a new pull request, #6310:
URL: https://github.com/apache/arrow-datafusion/pull/6310

   # Which issue does this PR close?
   
   Close https://github.com/apache/arrow-datafusion/issues/6290
   
   # Rationale for this change
   
   I was testing query performance for https://github.com/apache/arrow-datafusion/issues/6278 and noticed that only a single core was being used on a query run entirely in memory. When I spent some time looking into it, the plan looked correct with repartitioning, but for some reason the work wasn't actually being repartitioned.
   
   # What changes are included in this PR?
   
   Don't yield on *every* batch -- yield only after we have made some decent progress (in this case at least `partition_count` batches)
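
   The counting heuristic above can be sketched in plain Rust. This is a hypothetical illustration only: the name `YieldHeuristic` is not from the actual PR, which tracks the counter inline in `RepartitionExec`.

   ```rust
   /// Hypothetical sketch of the heuristic: instead of yielding to the
   /// scheduler after every batch, yield only once `partition_count`
   /// batches have been processed since the last yield.
   struct YieldHeuristic {
       partition_count: usize,
       batches_until_yield: usize,
   }

   impl YieldHeuristic {
       fn new(partition_count: usize) -> Self {
           Self {
               partition_count,
               batches_until_yield: partition_count,
           }
       }

       /// Returns true if the task should yield after the current batch.
       fn should_yield(&mut self) -> bool {
           if self.batches_until_yield <= 1 {
               // Made enough progress: reset the counter and yield.
               self.batches_until_yield = self.partition_count;
               true
           } else {
               self.batches_until_yield -= 1;
               false
           }
       }
   }

   fn main() {
       let mut h = YieldHeuristic::new(3);
       let decisions: Vec<bool> = (0..6).map(|_| h.should_yield()).collect();
       // Yields on every 3rd batch instead of on every batch
       println!("{decisions:?}"); // [false, false, true, false, false, true]
   }
   ```

   In the real operator the yield itself is `tokio::task::yield_now().await`; the sketch only captures the counting logic that decides when to call it.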
   
   # Are these changes tested?
   I manually tested this -- I will add a benchmark for it shortly
   
   My manual test results are:
   
   On main (Keeps only 1 core busy for most of the time)
   ```
   4 rows in set. Query took 10.631 seconds.
   ```
   
   With this PR (keeps the cores much busier)
   ```
   4 rows in set. Query took 3.705 seconds.
   ```
   
   
   
   # Are there any user-facing changes?
   
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] crepererum commented on pull request #6310: Improve parallelism of repartition operator with multiple cores

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on PR #6310:
URL: https://github.com/apache/arrow-datafusion/pull/6310#issuecomment-1545881609

   Side note: when I added the yield statement I was wondering if this would be too much overhead, but my assumption was that the batches would hopefully be big enough that it wouldn't matter. Seems I was wrong 😅




[GitHub] [arrow-datafusion] alamb merged pull request #6310: Improve parallelism of repartition operator with multiple cores

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #6310:
URL: https://github.com/apache/arrow-datafusion/pull/6310




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6310: Improve parallelism of repartition operator with multiple cores

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6310:
URL: https://github.com/apache/arrow-datafusion/pull/6310#discussion_r1188816056


##########
datafusion/core/src/physical_plan/repartition/mod.rs:
##########
@@ -532,9 +541,28 @@ impl RepartitionExec {
                 timer.done();
             }
 
-            // If the input stream is endless, we may spin forever and never yield back to tokio. Hence let us yield.
-            // See https://github.com/apache/arrow-datafusion/issues/5278.
-            tokio::task::yield_now().await;
+            // If the input stream is endless, we may spin forever and

Review Comment:
   I think if the tokio executor has only a single thread and the input stream can provide data infinitely, then without a yield it will buffer the entire input, which seems non-ideal
   
   I agree that https://github.com/apache/arrow-datafusion/issues/5278 as described reads more like "when we used blocking IO with a single tokio thread it blocked everything" -- see https://github.com/apache/arrow-datafusion/issues/5278#issuecomment-1432901387





[GitHub] [arrow-datafusion] ozankabak commented on pull request #6310: Improve parallelism of repartition operator with multiple cores

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #6310:
URL: https://github.com/apache/arrow-datafusion/pull/6310#issuecomment-1542956529

   Hi @alamb, @metesynnada is unavailable this month; he will be back at the end of the month. I checked this PR out and it looks better than the status quo to me.




[GitHub] [arrow-datafusion] alamb commented on pull request #6310: Improve parallelism of repartition operator with multiple cores

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6310:
URL: https://github.com/apache/arrow-datafusion/pull/6310#issuecomment-1545866980

   I merged up from main -- and once CI passes I plan to merge this PR




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6310: Improve parallelism of repartition operator with multiple cores

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6310:
URL: https://github.com/apache/arrow-datafusion/pull/6310#discussion_r1188776479


##########
datafusion/core/src/physical_plan/repartition/mod.rs:
##########
@@ -532,9 +541,28 @@ impl RepartitionExec {
                 timer.done();
             }
 
-            // If the input stream is endless, we may spin forever and never yield back to tokio. Hence let us yield.
-            // See https://github.com/apache/arrow-datafusion/issues/5278.
-            tokio::task::yield_now().await;
+            // If the input stream is endless, we may spin forever and

Review Comment:
   I would love some thoughts from reviewers about better heuristics here -- as the comments say I am happy with this heuristic for round robin partitioning but there may be a better way when hash partitioning (like ensure that all channels have at least one batch 🤔 )





[GitHub] [arrow-datafusion] alamb commented on pull request #6310: Improve parallelism of repartition operator with multiple cores

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6310:
URL: https://github.com/apache/arrow-datafusion/pull/6310#issuecomment-1541059949

   FYI @metesynnada I don't know if you have any thoughts on this approach (or the yield in general, as you reported https://github.com/apache/arrow-datafusion/issues/5278 initially)




[GitHub] [arrow-datafusion] alamb commented on pull request #6310: Improve parallelism of repartition operator

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6310:
URL: https://github.com/apache/arrow-datafusion/pull/6310#issuecomment-1540362038

   cc @crepererum 




[GitHub] [arrow-datafusion] crepererum commented on a diff in pull request #6310: Improve parallelism of repartition operator with multiple cores

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on code in PR #6310:
URL: https://github.com/apache/arrow-datafusion/pull/6310#discussion_r1189155285


##########
datafusion/core/src/physical_plan/repartition/mod.rs:
##########
@@ -532,9 +541,28 @@ impl RepartitionExec {
                 timer.done();
             }
 
-            // If the input stream is endless, we may spin forever and never yield back to tokio. Hence let us yield.
-            // See https://github.com/apache/arrow-datafusion/issues/5278.
-            tokio::task::yield_now().await;
+            // If the input stream is endless, we may spin forever and

Review Comment:
   You can call it a bug or a design issue of DF / tokio, but if you run two spawned tasks and one never returns control to tokio, then the other will never run. Unbounded buffers are NOT avoidable in the current DF design, because you cannot predict tokio scheduling and hash outputs. So the fix here is adequate. `consume_budget` would be the better solution, but it's an unstable tokio feature, so it's not usable.
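
   Conceptually, `consume_budget` decrements a per-task cooperative budget and only actually yields once the budget is exhausted. A minimal sketch of that idea in plain Rust (an illustration of the concept only, not tokio's real implementation, which tracks the budget internally per task):

   ```rust
   // Illustration of cooperative budgeting: spend one unit of budget per
   // operation and yield only when the budget runs out, then refill it.
   // This is NOT tokio's implementation, just the underlying idea.
   struct CoopBudget {
       remaining: u32,
       refill: u32,
   }

   impl CoopBudget {
       fn new(refill: u32) -> Self {
           Self { remaining: refill, refill }
       }

       /// Consume one unit; returns true when the caller should yield.
       fn consume(&mut self) -> bool {
           if self.remaining == 0 {
               self.remaining = self.refill;
               true // budget exhausted: yield back to the scheduler
           } else {
               self.remaining -= 1;
               false // still within budget: keep going
           }
       }
   }

   fn main() {
       let mut budget = CoopBudget::new(128);
       let yields = (0..1000).filter(|_| budget.consume()).count();
       println!("yielded {yields} times over 1000 operations");
   }
   ```

   Compared with counting batches by hand, this amortizes the yield cost the same way but lets the runtime pick the refill size.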





[GitHub] [arrow-datafusion] alamb commented on pull request #6310: Improve parallelism of repartition operator with multiple cores

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6310:
URL: https://github.com/apache/arrow-datafusion/pull/6310#issuecomment-1543884607

   I plan to leave this open for another day or two for comments and then will merge it in




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6310: Improve parallelism of repartition operator with multiple cores

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6310:
URL: https://github.com/apache/arrow-datafusion/pull/6310#discussion_r1188801003


##########
datafusion/core/src/physical_plan/repartition/mod.rs:
##########
@@ -532,9 +541,28 @@ impl RepartitionExec {
                 timer.done();
             }
 
-            // If the input stream is endless, we may spin forever and never yield back to tokio. Hence let us yield.
-            // See https://github.com/apache/arrow-datafusion/issues/5278.
-            tokio::task::yield_now().await;
+            // If the input stream is endless, we may spin forever and

Review Comment:
   You could use https://docs.rs/tokio/latest/tokio/task/fn.consume_budget.html but I'm honestly a little confused by this. "we may spin forever" would imply an issue with unbounded receivers, not an issue with the repartition operator?
   
   TLDR: I'd vote to not yield at all; I don't agree that this fixes #5278 -- it just papers over the underlying issue with a dubious fix





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6310: Improve parallelism of repartition operator with multiple cores

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6310:
URL: https://github.com/apache/arrow-datafusion/pull/6310#discussion_r1188830727


##########
datafusion/core/src/physical_plan/repartition/mod.rs:
##########
@@ -532,9 +541,28 @@ impl RepartitionExec {
                 timer.done();
             }
 
-            // If the input stream is endless, we may spin forever and never yield back to tokio. Hence let us yield.
-            // See https://github.com/apache/arrow-datafusion/issues/5278.
-            tokio::task::yield_now().await;
+            // If the input stream is endless, we may spin forever and

Review Comment:
   > it will buffer the entire input which seems non ideal
   
   That seems like a bug in whatever is using unbounded buffers, which I thought we had removed the last of? Basically we shouldn't be relying on yield_now to return control, but the buffer filling up
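
   The backpressure mechanism described here can be illustrated with the standard library's bounded channel (a generic sketch of the idea, not DataFusion's actual channel code):

   ```rust
   use std::sync::mpsc::sync_channel;
   use std::thread;

   fn main() {
       // A bounded channel with capacity 2: once two items are queued,
       // the sender blocks until the receiver drains the buffer, so a
       // fast producer cannot buffer unbounded amounts of data.
       let (tx, rx) = sync_channel::<i32>(2);

       let producer = thread::spawn(move || {
           for i in 0..10 {
               tx.send(i).unwrap(); // blocks while the buffer is full
           }
           // Dropping tx closes the channel, ending the receive loop.
       });

       let received: Vec<i32> = rx.iter().collect();
       producer.join().unwrap();
       assert_eq!(received, (0..10).collect::<Vec<i32>>());
       println!("received {} items with bounded buffering", received.len());
   }
   ```

   With bounded buffers, a producer that gets ahead is suspended by the full buffer rather than by an explicit `yield_now`, which is the control-flow tustvold is arguing for.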







[GitHub] [arrow-datafusion] alamb commented on pull request #6310: Improve parallelism of repartition operator with multiple cores

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6310:
URL: https://github.com/apache/arrow-datafusion/pull/6310#issuecomment-1545945539

   >  Side note: when I added the yield statement I was wondering if this would be too much overhead, but my assumption was that the batches would hopefully be big enough that it wouldn't matter. Seems I was wrong 😅
   
   I think the fact that the source in this case is a MemoryExec (where all the data is already in memory and can be provided almost instantaneously) hurts us

