Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/30 22:02:07 UTC

[GitHub] [airflow] ArtinSarraf opened a new issue #16739: Queue support for DaskExecutor using Dask Worker Resources

ArtinSarraf opened a new issue #16739:
URL: https://github.com/apache/airflow/issues/16739


   Currently Airflow's DaskExecutor does not support specifying queues for tasks, because Dask has no explicit queue feature. However, queues can be reliably mimicked using Dask worker resources ([details here](https://distributed.dask.org/en/latest/resources.html)). The setup would look something like this:
   
   ```
   # starting dask worker that can service airflow tasks submitted with queue=queue_name_1 or queue_name_2
   $ dask-worker <address> --resources "queue_name_1=9999999, queue_name_2=9999999"
   ```
   Unfortunately, as far as I know, you need to provide a finite resource limit for the workers, so you'd have to choose a large one, but I think it's worth the minor inconvenience to allow queue functionality in the DaskExecutor.
   
   ```
   # airflow/executors/dask_executor.py
   def execute_async(
       self,
       key: TaskInstanceKey,
       command: CommandType,
       queue: Optional[str] = None,
       executor_config: Optional[Any] = None,
   ) -> None:
   
       self.validate_command(command)
   
       def airflow_run():
           return subprocess.check_call(command, close_fds=True)
   
       if not self.client:
           raise AirflowException(NOT_STARTED_MESSAGE)
   
       # Proposed change: request one unit of a Dask resource named after the queue.
       resources = None
       if queue:
           resources = {queue: 1}
   
       future = self.client.submit(airflow_run, pure=False, resources=resources)
       self.futures[future] = key  # type: ignore
   ```
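
On the worker side, the `--resources` value can be generated from a list of queue names. This is only a minimal sketch: `format_worker_resources` is a hypothetical helper, not part of Airflow or Dask, and the default limit simply mirrors the large value used in the command above.

```python
from typing import Iterable


def format_worker_resources(queues: Iterable[str], limit: float = 9999999) -> str:
    """Build a value for `dask-worker --resources` from Airflow queue names.

    Hypothetical helper: each queue name becomes one resource with the
    same limit, joined as comma-separated `name=limit` pairs.
    """
    return ",".join(f"{name}={limit}" for name in queues)


if __name__ == "__main__":
    value = format_worker_resources(["queue_name_1", "queue_name_2"])
    # <address> is left as a placeholder for the scheduler address.
    print(f'dask-worker <address> --resources "{value}"')
```

A worker started this way advertises one resource per queue, so a task submitted with `resources={queue: 1}` can only land on workers that were tagged with that queue name.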
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] aa1371 commented on issue #16739: Queue support for DaskExecutor using Dask Worker Resources

Posted by GitBox <gi...@apache.org>.
aa1371 commented on issue #16739:
URL: https://github.com/apache/airflow/issues/16739#issuecomment-872005508


   I'd want to get feedback from some more experienced dask users/contributors to the airflow DaskExecutor on this before continuing with creating a PR.
   @fjetter @TomAugspurger 





[GitHub] [airflow] aa1371 commented on issue #16739: Queue support for DaskExecutor using Dask Worker Resources

Posted by GitBox <gi...@apache.org>.
aa1371 commented on issue #16739:
URL: https://github.com/apache/airflow/issues/16739#issuecomment-873166328


   Even without the infinite limit, I think that resources are still a good fit for this. Using workers would require keeping some additional mapping of `queue_name -> workers`, which resources essentially already give you implicitly. Also, based on my related [dask repo discussion here](https://github.com/dask/distributed/discussions/5010#discussioncomment-957752), I think that you don't even need an arbitrarily large resource limit, and that a resource limit of 1 should be sufficient.





[GitHub] [airflow] aa1371 edited a comment on issue #16739: Queue support for DaskExecutor using Dask Worker Resources

Posted by GitBox <gi...@apache.org>.
aa1371 edited a comment on issue #16739:
URL: https://github.com/apache/airflow/issues/16739#issuecomment-874953870


   @fjetter thanks for the insight. I think worker resources are the best way forward, since they let you tag your workers at creation time and then dispatch your Airflow tasks based on that tag (i.e. the queue name), without needing to keep track of explicit workers within Airflow. Also, it turns out there is a way to define infinite worker resources in Dask workers (https://github.com/dask/distributed/discussions/5010#discussioncomment-971219), so you can define the resource on the worker without having to provide an arbitrarily large limit or worry about how many tasks could possibly run concurrently on your worker.





[GitHub] [airflow] boring-cyborg[bot] commented on issue #16739: Queue support for DaskExecutor using Dask Worker Resources

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #16739:
URL: https://github.com/apache/airflow/issues/16739#issuecomment-871755672


   Thanks for opening your first issue here! Be sure to follow the issue template!
   





[GitHub] [airflow] fjetter commented on issue #16739: Queue support for DaskExecutor using Dask Worker Resources

Posted by GitBox <gi...@apache.org>.
fjetter commented on issue #16739:
URL: https://github.com/apache/airflow/issues/16739#issuecomment-874578147


   I'm not familiar enough with the queue functionality of Airflow to know what the expected behaviour should be. In Dask we have, broadly speaking, two to three mechanisms to limit concurrency at the task level and/or control assignment to workers.
   
   If you want to limit the number of assigned tasks, i.e. ensure that a task is not assigned to a worker before it is allowed to execute, resources are the way to go.
   
   If you want to control which workers are allowed to work on a given task, the `workers` keyword might be a better fit, but that doesn't control concurrency (other than the intrinsic limit a single worker exposes).
   
   If you want to ensure that only a limited number of tasks are executed, but it is fine for them to be assigned to a worker and possibly even block it, we have a [Semaphore](https://distributed.dask.org/en/latest/api.html#distributed.Semaphore) which could be used.
   
   Which is the best to pick depends on how queuing in Airflow is supposed to work.
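
The first two options above correspond to the `resources=` and `workers=` keywords of `Client.submit`. The sketch below only builds the keyword arguments, so it runs without a cluster. `build_submit_kwargs` and the `queue_to_workers` mapping are hypothetical names used for illustration and exist in neither Airflow nor Dask.

```python
from typing import Any, Dict, List, Optional


def build_submit_kwargs(
    queue: Optional[str],
    queue_to_workers: Optional[Dict[str, List[str]]] = None,
) -> Dict[str, Any]:
    """Build keyword arguments for Client.submit for an Airflow queue.

    With a queue_to_workers mapping, pin the task to explicit worker
    addresses (the `workers` approach). Without one, require one unit of a
    resource named after the queue (the `resources` approach).
    """
    kwargs: Dict[str, Any] = {"pure": False}
    if not queue:
        return kwargs  # no queue given: any worker may run the task
    if queue_to_workers is not None:
        kwargs["workers"] = queue_to_workers[queue]
    else:
        kwargs["resources"] = {queue: 1}
    return kwargs
```

A call would then look like `client.submit(airflow_run, **build_submit_kwargs("queue_name_1"))`. The `workers` variant needs the caller to maintain the queue-to-address mapping, while the `resources` variant leaves that matching to the scheduler.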





[GitHub] [airflow] aa1371 commented on issue #16739: Queue support for DaskExecutor using Dask Worker Resources

Posted by GitBox <gi...@apache.org>.
aa1371 commented on issue #16739:
URL: https://github.com/apache/airflow/issues/16739#issuecomment-872838305


   In the meantime I opened a PR in my own forked repo to expand on the proposed changes above.
   https://github.com/aa1371/airflow/pull/1/files





[GitHub] [airflow] aa1371 edited a comment on issue #16739: Queue support for DaskExecutor using Dask Worker Resources

Posted by GitBox <gi...@apache.org>.
aa1371 edited a comment on issue #16739:
URL: https://github.com/apache/airflow/issues/16739#issuecomment-872005508


   I'd want to get feedback from some more experienced dask users/contributors to the airflow DaskExecutor on this before continuing with creating a PR.
   @fjetter @TomAugspurger @jrbourbeau @jlowin





[GitHub] [airflow] potiuk closed issue #16739: Queue support for DaskExecutor using Dask Worker Resources

Posted by GitBox <gi...@apache.org>.
potiuk closed issue #16739:
URL: https://github.com/apache/airflow/issues/16739


   





[GitHub] [airflow] aa1371 edited a comment on issue #16739: Queue support for DaskExecutor using Dask Worker Resources

Posted by GitBox <gi...@apache.org>.
aa1371 edited a comment on issue #16739:
URL: https://github.com/apache/airflow/issues/16739#issuecomment-873166328


   Even without the infinite limit, I think that resources are still a good fit for this. Using workers would require keeping some additional mapping of `queue_name -> workers`, which resources essentially already give you implicitly. Also, based on my related [dask repo discussion here](https://github.com/dask/distributed/discussions/5010#discussioncomment-957752), I think that you don't even need an arbitrarily large resource limit, and that a resource limit of 1 should be sufficient.
   
   Edit: disregard my point about a limit of 1. That was only true in my specific case, since I always create workers with `nthreads=1`; with more threads it would block tasks that could otherwise run concurrently. In the general case, it turns out you can start your workers with `--resources=RESOURCE_NAME=inf`.
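
The effect of the limit can be illustrated with a simplified model. This is an assumption-laden sketch, not Dask's scheduling code: it supposes every task from the queue requests `per_task` units of the queue resource, and `max_concurrent_queue_tasks` is a hypothetical name.

```python
import math


def max_concurrent_queue_tasks(
    nthreads: int, resource_limit: float, per_task: float = 1
) -> int:
    """Upper bound on tasks from one queue running at once on one worker.

    Simplified model: a task needs both a free thread and `per_task` units
    of the queue resource, so the bound is the smaller of the two limits.
    """
    if math.isinf(resource_limit):
        by_resource = math.inf  # an infinite resource never constrains
    else:
        by_resource = math.floor(resource_limit / per_task)
    return int(min(nthreads, by_resource))
```

Under this model a limit of 1 is harmless for a worker with one thread, but it caps a four-thread worker at one task at a time, whereas `inf` leaves the thread count as the only constraint.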





[GitHub] [airflow] potiuk commented on issue #16739: Queue support for DaskExecutor using Dask Worker Resources

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #16739:
URL: https://github.com/apache/airflow/issues/16739#issuecomment-871976258


   Would you be willing to make a PR for that? I don't think there are many people using DaskExecutor, and since you seem to know what you are doing and use Dask, that might be a great contribution back. The contribution process is nicely described, with some useful guidelines and tools, here: https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst





[GitHub] [airflow] aa1371 edited a comment on issue #16739: Queue support for DaskExecutor using Dask Worker Resources

Posted by GitBox <gi...@apache.org>.
aa1371 edited a comment on issue #16739:
URL: https://github.com/apache/airflow/issues/16739#issuecomment-872005508


   I'd want to get feedback from some more experienced dask users/contributors to the airflow DaskExecutor on this before continuing with creating a PR.
   @fjetter @TomAugspurger @jrbourbeau





[GitHub] [airflow] jrbourbeau commented on issue #16739: Queue support for DaskExecutor using Dask Worker Resources

Posted by GitBox <gi...@apache.org>.
jrbourbeau commented on issue #16739:
URL: https://github.com/apache/airflow/issues/16739#issuecomment-873100045


   Specifying a resource as infinite isn't possible today. Perhaps using the `workers=` keyword for `Client.submit` is a more natural fit for what you want to accomplish? 





[GitHub] [airflow] aa1371 commented on issue #16739: Queue support for DaskExecutor using Dask Worker Resources

Posted by GitBox <gi...@apache.org>.
aa1371 commented on issue #16739:
URL: https://github.com/apache/airflow/issues/16739#issuecomment-874953870


   @fjetter thanks for the insight. I think worker resources are the best way forward, since they let you tag your workers at creation time and then dispatch your Airflow tasks based on that tag (i.e. the queue name), without needing to keep track of explicit workers within Airflow. Also, it turns out there is a way to define infinite worker resources in Dask workers, so you can define the resource on the worker without having to provide an arbitrarily large limit or worry about how many tasks could possibly run concurrently on your worker.

