Posted to github@beam.apache.org by "jacobtomlinson (via GitHub)" <gi...@apache.org> on 2023/05/11 13:59:20 UTC

[GitHub] [beam] jacobtomlinson opened a new issue, #26669: [Bug]: Dask runner limited to 100 concurrent tasks

jacobtomlinson opened a new issue, #26669:
URL: https://github.com/apache/beam/issues/26669

   ### What happened?
   
   If I call `beam.Create` with an iterable that has 200 or more items, Dask executes only 100 tasks concurrently. If it has 199 or fewer items, it executes all items concurrently.
   
   If my Dask cluster has more than 200 workers, I cannot fully saturate the cluster.
   
   ```python
   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.runners.dask.dask_runner import DaskRunner
   
   
   class NoopDoFn(beam.DoFn):
       def process(self, item):
           import time
           time.sleep(10)
           return [item]
   
   
   def main() -> None:
       # If this is 199 I get one task per item per stage (800 tasks, 199 concurrent)
       # If this is 200 I get max 100 tasks per stage (404 tasks, 100 concurrent)
       n_items = 199
   
       pipeline = beam.Pipeline(
           runner=DaskRunner(),
           options=PipelineOptions(["--dask_client_address", "localhost:8786"]),
       )
   
       (
           pipeline
           | "Create collection" >> beam.Create(range(n_items))
           | "Noop 1" >> beam.ParDo(NoopDoFn())
           | "Noop 2" >> beam.ParDo(NoopDoFn())
           | "Noop 3" >> beam.ParDo(NoopDoFn())
       )
   
       result = pipeline.run()
       result.wait_until_finish()
   
   
   if __name__ == "__main__":
       main()
   ```
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] alxmrs commented on issue #26669: [Bug]: Dask runner limited to 100 concurrent tasks

Posted by "alxmrs (via GitHub)" <gi...@apache.org>.
alxmrs commented on issue #26669:
URL: https://github.com/apache/beam/issues/26669#issuecomment-1581293444

   I put time on your cal for tomorrow :) 


[GitHub] [beam] jacobtomlinson commented on issue #26669: [Bug]: Dask runner limited to 100 concurrent tasks

Posted by "jacobtomlinson (via GitHub)" <gi...@apache.org>.
jacobtomlinson commented on issue #26669:
URL: https://github.com/apache/beam/issues/26669#issuecomment-1552877898

   After exploring this further, there seem to be two separate things going on here.
   
   The first is that Dask Bag targets 100 partitions by default. Because the partition size is computed with integer rounding, the partition count climbs above 100 as items are added and then collapses back to 100 each time the item count reaches a multiple of 100. This is why we see a ~50% performance drop between 199 and 200 items, then a small increase before it drops again at 300, and so on. I've opened https://github.com/dask/dask/pull/10294 to address this so that the number of tasks keeps growing with the number of items, just non-linearly.
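A minimal sketch of the old heuristic, assuming (from the behaviour described above) that `dask.bag.from_sequence` used a partition size of 1 for fewer than 100 items and `n_items // 100` otherwise, then chunked the sequence by that size. `legacy_bag_npartitions` is a hypothetical helper for illustration, not a Dask API.

```python
import math


def legacy_bag_npartitions(n_items: int) -> int:
    """Approximate the pre-2023.5.1 default partition count of a Dask Bag.

    Assumption: partition size is 1 below 100 items, otherwise n_items // 100
    (integer division); the items are then split into chunks of that size.
    """
    partition_size = 1 if n_items < 100 else n_items // 100
    # The last chunk may be shorter, hence the ceiling.
    return math.ceil(n_items / partition_size)


for n in (99, 199, 200, 299, 300, 399, 400):
    print(n, legacy_bag_npartitions(n))
# 99 99
# 199 199
# 200 100
# 299 150
# 300 100
# 399 133
# 400 100
```

Under these assumptions, 199 items give 199 partitions but 200 items give only 100, which matches the drop in concurrency reported in this issue.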
   
   The second is that Beam always uses a Dask Distributed cluster and has an instance of `dask.distributed.Client` at execution time. We could therefore override the default partitioning scheme altogether and explicitly set the number of partitions to match the number of workers in the cluster. This would require Beam to pass that information to the `Create` node when it walks the pipeline.
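One way to picture the proposed override: given the worker count, split the items into `min(len(items), n_workers)` near-equal partitions so that every worker can receive one. This is a hypothetical, pure-Python sketch; `partition_for_workers` is not a Beam or Dask API. In Dask itself the corresponding knob is the `npartitions` argument of `dask.bag.from_sequence`, and the worker count could be read from `len(client.scheduler_info()["workers"])`; Dask's own rounding of the partition size may yield slightly fewer partitions than requested.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def partition_for_workers(items: Sequence[T], n_workers: int) -> List[List[T]]:
    """Split items into min(len(items), n_workers) contiguous, near-equal chunks.

    Hypothetical helper: chunk sizes differ by at most one item, so no worker
    is left idle as long as there are at least as many items as workers.
    """
    if n_workers < 1:
        raise ValueError("n_workers must be at least 1")
    n_parts = min(len(items), n_workers)
    if n_parts == 0:
        return []
    base, extra = divmod(len(items), n_parts)
    parts: List[List[T]] = []
    start = 0
    for i in range(n_parts):
        # The first `extra` chunks take one additional item each.
        size = base + (1 if i < extra else 0)
        parts.append(list(items[start:start + size]))
        start += size
    return parts


parts = partition_for_workers(range(300), n_workers=250)
print(len(parts))  # 250
```

Balancing sizes this way keeps partitions within one item of each other, instead of leaving a short trailing chunk.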


[GitHub] [beam] tvalentyn commented on issue #26669: [Bug]: Dask runner limited to 100 concurrent tasks

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on issue #26669:
URL: https://github.com/apache/beam/issues/26669#issuecomment-1549910895

   cc: @alxmrs 


[GitHub] [beam] jacobtomlinson commented on issue #26669: [Bug]: Dask runner limited to 100 concurrent tasks

Posted by "jacobtomlinson (via GitHub)" <gi...@apache.org>.
jacobtomlinson commented on issue #26669:
URL: https://github.com/apache/beam/issues/26669#issuecomment-1579019317

   Quick update. The latest Dask release (`2023.5.1`) includes the fix to Dask Bag's partitioning heuristic, which means that the number of partitions will keep increasing as more items are added to the bag. This partially works around the problem we are seeing here because, in theory, with enough items in the PCollection you can saturate any number of Dask workers. However, in practice your dataset may not contain enough items, so this will not solve the problem for everyone.
   
   A complete fix would be for Beam to explicitly set the number of partitions in the bag equal to the number of workers in the cluster. Beam knows this at execution time, so it should just be a matter of passing it along. I started looking into a fix, but it seems the bag is created lazily, at a point where we don't yet have a Dask Distributed Client. I think I'm at the point where I need someone more familiar with Beam, like @alxmrs, to point me in the right direction.
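The timing problem can be illustrated with a hypothetical node that records its items while the pipeline is walked but only decides how to partition them once a worker count is available at execution time. `DeferredCreate` and `materialize` are illustrative names, not part of Beam's Dask runner; the round-robin split is just one simple choice and is acceptable here only because a PCollection carries no ordering guarantee.

```python
from dataclasses import dataclass
from typing import Any, List, Sequence


@dataclass
class DeferredCreate:
    """Hypothetical Create node: stores items now, partitions them later."""

    items: Sequence[Any]

    def materialize(self, n_workers: int) -> List[List[Any]]:
        """Round-robin the items over at most n_workers partitions."""
        if n_workers < 1:
            raise ValueError("n_workers must be at least 1")
        n_parts = min(len(self.items), n_workers)
        return [list(self.items[i::n_parts]) for i in range(n_parts)]


# Pipeline construction: no Dask client exists yet, so only the items are kept.
node = DeferredCreate(items=list(range(10)))

# Execution: a runner holding a client could look up the worker count here.
print(node.materialize(n_workers=3))  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```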
   
   cc @rabernat who may have a passing interest in this issue


Re: [I] [Bug]: Dask runner limited to 100 concurrent tasks [beam]

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm closed issue #26669: [Bug]: Dask runner limited to 100 concurrent tasks
URL: https://github.com/apache/beam/issues/26669


[GitHub] [beam] AnandInguva commented on issue #26669: [Bug]: Dask runner limited to 100 concurrent tasks

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on issue #26669:
URL: https://github.com/apache/beam/issues/26669#issuecomment-1544094464

   cc: @pabloem 


[GitHub] [beam] alxmrs commented on issue #26669: [Bug]: Dask runner limited to 100 concurrent tasks

Posted by "alxmrs (via GitHub)" <gi...@apache.org>.
alxmrs commented on issue #26669:
URL: https://github.com/apache/beam/issues/26669#issuecomment-1579601997

   Hey Jacob,
   
   Thanks for looking into this! Are you free this Thursday or Friday after 1pm PST? 


[GitHub] [beam] jacobtomlinson commented on issue #26669: [Bug]: Dask runner limited to 100 concurrent tasks

Posted by "jacobtomlinson (via GitHub)" <gi...@apache.org>.
jacobtomlinson commented on issue #26669:
URL: https://github.com/apache/beam/issues/26669#issuecomment-1580788591

   Hey Alex, I'm afraid I'm in the UK, so that time could be tricky. Feel free to find something in my calendar: https://jacobtomlinson.dev/calendar. If nothing there works, ping me again and we can sort something out.

