Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/12/25 19:17:43 UTC

[GitHub] [airflow] lssatvik opened a new issue #20498: Dynamically determine queue for a dag or task by passing a function that takes context as input

lssatvik opened a new issue #20498:
URL: https://github.com/apache/airflow/issues/20498


   ### Description
   
   Right now the queue of a DAG or task is fixed by the queue parameter in the DAG/task definition. I want this parameter to also accept a function. If a function is given, Airflow should call it with the context variable and use the return value as the queue for the DAG/task.
   
   Airflow DAGs and tasks already support callback functions such as on_success_callback, which is given the context variable and executed on success. I want a similar capability for determining the queue.
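   
   A minimal sketch of the requested behaviour, written as plain Python rather than against Airflow itself. The names `QueueSpec`, `resolve_queue` and `per_run_queue`, and the context keys `dag_id` and `run_id`, are assumptions made for illustration and are not part of any existing Airflow API:

```python
from typing import Any, Callable, Mapping, Union

# Hypothetical type for the proposed parameter:
# either a fixed queue name or a function of the context.
QueueSpec = Union[str, Callable[[Mapping[str, Any]], str]]


def resolve_queue(queue: QueueSpec, context: Mapping[str, Any]) -> str:
    """Return the queue name, calling `queue` with the context if it is a function."""
    if callable(queue):
        return queue(context)
    return queue


def per_run_queue(context: Mapping[str, Any]) -> str:
    # Assumed context keys for illustration only: "dag_id" and "run_id".
    return f"{context['dag_id']}-{context['run_id']}"


context = {"dag_id": "etl", "run_id": "manual_1"}
print(resolve_queue("master", context))       # plain string is used as-is
print(resolve_queue(per_run_queue, context))  # function is called with the context
```

   A plain string keeps today's behaviour, so existing DAGs would not need to change under this sketch.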
   
   ### Use case/motivation
   
   I use an independent ec2 instance as a celery worker for every "dagrun". The queue for any dagrun is dag_id-run_id. In all my dags my first task is always an operator working in the "master" queue that sets up an ec2 instance and starts the worker with the custom queue name.
   
   I modified a line in the _enqueue_task_instances_with_queued_state function in scheduler_job.py:
   `queue = ti.queue if ti.queue == "master" else f"{ti.dag_id}-{ti.run_id}"`
   
   As a result, the queue for every task other than the ec2 operator is dag_id-run_id. Because the ec2 operator starts a celery worker with that specific queue name, all tasks not defined with the "master" queue (which has a worker running locally) are executed on that celery worker.
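   
   To make the effect of that one-line change concrete, here is a small standalone sketch. `FakeTaskInstance` is a hypothetical stand-in for Airflow's task instance that carries only the three fields the expression reads, and the DAG and run names are made up:

```python
from dataclasses import dataclass


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; only the fields the rule reads."""
    dag_id: str
    run_id: str
    queue: str


def effective_queue(ti: FakeTaskInstance) -> str:
    # Same expression as the modified scheduler line quoted above:
    # tasks on "master" stay there, everything else goes to the per-run queue.
    return ti.queue if ti.queue == "master" else f"{ti.dag_id}-{ti.run_id}"


setup_task = FakeTaskInstance(dag_id="etl", run_id="run_1", queue="master")
work_task = FakeTaskInstance(dag_id="etl", run_id="run_1", queue="default")

print(effective_queue(setup_task))
print(effective_queue(work_task))
```

   Note that the declared queue of every non-"master" task is ignored under this rule; only the dag_id and run_id matter.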
   
   So my setup required a small modification to the airflow code base. It would be helpful if the scheduler can determine the queue name through a user-defined function using the context variable.
   
   In the present state, I can only define a queue for each dag but not for each dag-run, as the run_id is determined at runtime. Also, the changed queue name is not reflected in the UI, as the queue name is only changed when a task instance is enqueued.
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on issue #20498: Dynamically determine queue for a dag or task by calling a function that takes context as input

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #20498:
URL: https://github.com/apache/airflow/issues/20498#issuecomment-1001560845


   That is unlikely to happen because of security.
   
   Callbacks are executed in the context of the Worker or DagFileProcessor; the Scheduler is not supposed to execute any code provided by the user in the DAG. It's the scheduler that determines which executor can be used, and it sends the prepared task to the executor (sometimes based on the "queue" parameter). And as you mentioned, the celery workers pick the tasks from the queue that they are configured with, so by the time the tasks start, their queue has already determined where they should be run.
   
   The only real place where you can change the queue for the tasks is at DAG parsing time, which effectively means that once the task has been placed in the DAG structure its queue has to be determined. You cannot dynamically change it in the scheduler. The scheduler just schedules whatever is declared in the code that comes "pre-installed" with Airflow. For example, custom triggers or custom timetables have to be pre-installed and DAGs cannot define their logic; they can at most declare and configure which timetable/trigger will be used.
   
   So the only way it could be implemented is by defining some "customizable" mechanism of queue selection, rather than allowing the DAG writer to define it the way callbacks are defined.
   
   I will convert it into a discussion. Maybe it will be picked up by someone who would like to have a similar mechanism, but at the very least it would require extensive discussion on the devlist and an AIP (Airflow Improvement Proposal).
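   
   Purely as an illustration of what such a mechanism might look like, and not an existing Airflow API: the selection logic would be registered by code installed with the deployment, and a DAG would only name which selector to use. Every name below (`register_selector`, `select_queue`, the selector names) is hypothetical:

```python
from typing import Callable, Dict

# A selector maps (dag_id, run_id) to a queue name.
QueueSelector = Callable[[str, str], str]

# Hypothetical registry, populated by deployment code rather than by DAG files.
_SELECTORS: Dict[str, QueueSelector] = {}


def register_selector(name: str, selector: QueueSelector) -> None:
    """Make a selector available under a name that DAGs can refer to."""
    _SELECTORS[name] = selector


def select_queue(selector_name: str, dag_id: str, run_id: str) -> str:
    """Look up the named selector and apply it; unknown names are rejected."""
    try:
        selector = _SELECTORS[selector_name]
    except KeyError:
        raise ValueError(f"unknown queue selector: {selector_name!r}") from None
    return selector(dag_id, run_id)


# Registered once, alongside the installation (not inside a DAG file).
register_selector("static_master", lambda dag_id, run_id: "master")
register_selector("per_run", lambda dag_id, run_id: f"{dag_id}-{run_id}")

# A DAG would only declare the selector name, e.g. "per_run".
print(select_queue("per_run", "etl", "run_1"))
```

   The point of the sketch is the split: the DAG contributes a name and configuration only, while the code that actually runs is pre-installed, in the same way as the timetables and triggers mentioned above.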

