Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/18 07:51:14 UTC

[GitHub] [airflow] hterik opened a new issue #19673: More dynamic pod_mutation_hook, allowing access to task context and inputs

hterik opened a new issue #19673:
URL: https://github.com/apache/airflow/issues/19673


   ### Description
   
   We have some use cases that require fairly complex and dynamic configuration of Kubernetes pods, for example configuring the nodeSelector VM size based on task inputs, or juggling volumes, in particular persistent volume claims. Imagine tasks allocating and preparing volumes that are then passed on as input to other tasks.
   
   The mechanisms for this in Airflow today are:
   * `pod_template_file` - static, global YAML configuration affecting all tasks.
   * `pod_mutation_hook` - Sort of dynamic: it can run code and has full access to the V1Pod object, but it is not aware of which Dag, DagRun, Task or TaskInstance is about to run, and it cannot access `get_current_context`.
   * `executor_config.pod_override` - Static per-task configuration. Limited to what `pod_generator.reconcile_xxx` supports, which is somewhat restrictive and can behave unexpectedly at times. It is not possible to read any DagRun or TaskInstance inputs. (A sketch of this option as it works today follows below.)
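
   For context, a minimal sketch of the third option as it exists today (the DAG id, task, and node selector key/value are illustrative):
   
   ```python
   from datetime import datetime
   
   from kubernetes.client import models as k8s
   
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   
   with DAG(dag_id="pod_override_example", start_date=datetime(2021, 11, 1), schedule_interval=None) as dag:
       build = PythonOperator(
           task_id="build",
           python_callable=lambda: print("building"),
           # Static per-task override: the node selector is fixed at DAG-parse time and
           # cannot depend on dag_run.conf or upstream XCom values.
           executor_config={
               "pod_override": k8s.V1Pod(
                   spec=k8s.V1PodSpec(
                       containers=[k8s.V1Container(name="base")],
                       node_selector={"vm-size": "large"},
                   )
               )
           },
       )
   ```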
   
   Would it be possible to add a new, richer pod mutation function with the following additions (a rough sketch of the idea follows the list):
   * Without the reconcile limitations (like `pod_mutation_hook`).
   * Aware of which dag and task is currently executing (like `pod_override`).
   * Can read runtime information from the DagRun and TaskInstance, most importantly `dag_run.conf` and task inputs, e.g. the XCom `return_value` from upstream tasks, or perhaps even any XCom value.
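
   A rough sketch of what such a function could look like (the hook name, its extra `task_instance` argument, and the config/XCom keys below are purely hypothetical; only the `V1Pod` model and the TaskInstance methods are existing APIs):
   
   ```python
   # airflow_local_settings.py -- hypothetical API, not something Airflow supports today
   from kubernetes.client import models as k8s
   
   
   def pod_mutation_hook_with_context(pod: k8s.V1Pod, task_instance) -> None:
       # Size the node based on a value passed in via dag_run.conf (the key name is made up).
       dag_run = task_instance.get_dagrun()
       vm_size = (dag_run.conf or {}).get("vm_size", "standard")
       pod.spec.node_selector = {"vm-size": vm_size}
   
       # Mount a PVC whose name was returned (via XCom) by a hypothetical upstream task.
       pvc_name = task_instance.xcom_pull(task_ids="pvc_provider")
       if pvc_name:
           pod.spec.volumes = (pod.spec.volumes or []) + [
               k8s.V1Volume(
                   name="data",
                   persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name),
               )
           ]
   ```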
   
   ### Use case/motivation
   
   _No response_
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] easontm commented on issue #19673: More dynamic pod_mutation_hook, allowing access to task context and inputs

Posted by GitBox <gi...@apache.org>.
easontm commented on issue #19673:
URL: https://github.com/apache/airflow/issues/19673#issuecomment-991423383


   FYI @hterik, the pods should have some basic metadata attached to them in the labels section. I modify my pods' `nodeSelector` with this hook based on DAG and task information.
   ```python
   labels = pod.metadata.labels
   dag_id = labels['dag_id']
   task_id = labels['task_id']
   ```
   
   etc
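
   A fuller sketch of that approach in `airflow_local_settings.py` (the DAG/task ids and node-pool values below are illustrative, not from my actual setup):
   
   ```python
   from kubernetes.client import models as k8s
   
   
   def pod_mutation_hook(pod: k8s.V1Pod) -> None:
       labels = pod.metadata.labels or {}
       dag_id = labels.get("dag_id")
       task_id = labels.get("task_id")
   
       # Route pods for a specific DAG/task to a dedicated node pool (names are made up).
       if dag_id == "heavy_dag" and task_id == "train_model":
           pod.spec.node_selector = {"node-pool": "high-memory"}
   ```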



[GitHub] [airflow] shiying2012 commented on issue #19673: More dynamic pod_mutation_hook, allowing access to task context and inputs

Posted by GitBox <gi...@apache.org>.
shiying2012 commented on issue #19673:
URL: https://github.com/apache/airflow/issues/19673#issuecomment-978987381


   We are in a similar situation. We are currently using the TaskFlow API and the KubernetesExecutor.
   
   - task1 executes some SQL queries to get some strings. We would like task2 to use the strings obtained by task1 as labels for Kubernetes affinity scheduling, but unfortunately it is not possible to dynamically generate the pod YAML for task2 (see the sketch after this list).
   
   - We would also like to set the pool for task2 using some other strings output by task1, but for the same reason this doesn't seem to work either.
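
   A minimal sketch of the pattern described above (the task names, the pretend query result, and the label value are all illustrative):
   
   ```python
   import pendulum
   
   from airflow.decorators import dag, task
   
   
   @dag(start_date=pendulum.datetime(2021, 11, 1), schedule_interval=None, catchup=False)
   def affinity_example():
       @task
       def task1():
           # Pretend this runs a SQL query and returns a string, e.g. a node label value.
           return "gpu-pool-a"
   
       @task
       def task2(node_label):
           # Ideally this value would also drive task2's pod affinity/labels and pool,
           # but today there is no supported way to feed it into the pod spec.
           print(node_label)
   
       task2(task1())
   
   
   affinity_example()
   ```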
   
   Any ideas?



[GitHub] [airflow] potiuk commented on issue #19673: More dynamic pod_mutation_hook, allowing access to task context and inputs

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #19673:
URL: https://github.com/apache/airflow/issues/19673#issuecomment-975658761


   Aren't Cluster Policies the exact answer to your question?
   
   https://airflow.apache.org/docs/apache-airflow/stable/concepts/cluster-policies.html
   
   Specifically, `task_instance_mutation_hook` seems to be launched at exactly the time you want, and to have all the information needed from the context?
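
   A minimal sketch of such a hook in `airflow_local_settings.py` (the dag/task ids and the pool/queue values are illustrative; whether the change takes effect early enough for your case is exactly what would need testing):
   
   ```python
   from airflow.models.taskinstance import TaskInstance
   
   
   def task_instance_mutation_hook(task_instance: TaskInstance) -> None:
       # Route a specific task to a different pool/queue based on its identity.
       if task_instance.dag_id == "example_dag" and task_instance.task_id == "example_task":
           task_instance.pool = "volume_pool"
           task_instance.queue = "kubernetes"
   ```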
   


[GitHub] [airflow] potiuk commented on issue #19673: More dynamic pod_mutation_hook, allowing access to task context and inputs

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #19673:
URL: https://github.com/apache/airflow/issues/19673#issuecomment-979234493


   So just to suggest the possible solution again:
   
   > Aren't Cluster Policies the exact answer to your question?
   > 
   > https://airflow.apache.org/docs/apache-airflow/stable/concepts/cluster-policies.html
   
   I am not 100% sure if it will work, and whether you will be able to modify both the POD and the queue this way, but I think there is no reason you should not be able to. Can you try it @shiying2012?
   


[GitHub] [airflow] hterik commented on issue #19673: More dynamic pod_mutation_hook, allowing access to task context and inputs

Posted by GitBox <gi...@apache.org>.
hterik commented on issue #19673:
URL: https://github.com/apache/airflow/issues/19673#issuecomment-984574489


   I've tried out `task_instance_mutation_hook` from Cluster Policies now. It sounds a bit strange to use something called policies for this use case, but if it works the way it's explained, it's probably good enough.
   
   Unfortunately, it does not seem to behave the way it's described in the documentation, i.e. _"Called right before task execution."_
   What happens instead is that the hook is called on all task instances whenever a dagrun is scheduled. Additionally, it's not called at all when a dag is triggered manually, only when it's triggered by schedule.
   
   
   The following example demonstrates this:
   ```py
   import time
   from datetime import datetime, timedelta
   from airflow.decorators import dag, task
   
   # The @dag wrapper, schedule and start_date are added to make this self-contained;
   # the 10-second schedule is inferred from the log output below.
   @dag(dag_id="pvc", schedule_interval=timedelta(seconds=10), start_date=datetime(2021, 12, 1), catchup=False)
   def pvc():
       @task
       def pvc_provider():
           time.sleep(5)
           return "pvc://blablabla"
   
       @task
       def pvc_consumer(inparam):
           time.sleep(5)
           return "build result"
   
       pvc_consumer(pvc_provider())
   
   pvc()
   ```
   
   The only thing my `task_instance_mutation_hook` does is to log the task instances being passed into it.
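   
   Roughly like the following (a reconstruction; the exact message formatting is approximated from the log output below):
   
   ```py
   # airflow_local_settings.py
   import logging
   
   log = logging.getLogger(__name__)
   
   
   def task_instance_mutation_hook(task_instance):
       log.info("In the mutation hook for ti=%r, run_id=%r", task_instance.task_id, task_instance.run_id)
   ```
   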
   Observe the timestamps: in the log we see the mutation _hook being called at the very same moment for both tasks_, when they should be 5 seconds apart.
   
   ```
   [2021-12-02 12:50:03,024] {airflow_local_settings.py:16} INFO - In the mutation hook for ti='\x1b[01mpvc_provider\x1b[22m', run_id='\x1b[01mscheduled__2021-12-02T11:49:52.937626+00:00\x1b[22m'
   [2021-12-02 12:50:03,025] {airflow_local_settings.py:16} INFO - In the mutation hook for ti='\x1b[01mpvc_consumer\x1b[22m', run_id='\x1b[01mscheduled__2021-12-02T11:49:52.937626+00:00\x1b[22m'
   ```
   
   The producer task starts shortly thereafter:
   
   ```
   [2021-12-02 12:50:03,060] {dag.py:2928} INFO - Setting next_dagrun for pvc to 2021-12-02T11:50:02.937626+00:00
   [2021-12-02 12:50:03,097] {scheduler_job.py:288} INFO - 1 tasks up for execution:
   	<TaskInstance: pvc.pvc_provider scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
   [2021-12-02 12:50:03,098] {scheduler_job.py:410} INFO - Setting the following tasks to queued state:
   	<TaskInstance: pvc.pvc_provider scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
   [2021-12-02 12:50:03,099] {scheduler_job.py:450} INFO - Sending TaskInstanceKey(dag_id='pvc', task_id='pvc_provider', run_id='scheduled__2021-12-02T11:49:52.937626+00:00', try_number=1) to executor with priority 2 and queue default
   [2021-12-02 12:50:03,099] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'pvc', 'pvc_provider', 'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir', 'DAGS_FOLDER/dag_pvc.py']
   [2021-12-02 12:50:03,105] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'pvc', 'pvc_provider', 'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir', 'DAGS_FOLDER/dag_pvc.py']
   Running <TaskInstance: pvc.pvc_provider scheduled__2021-12-02T11:49:52.937626+00:00 [queued]> on host 
   [2021-12-02 12:50:09,307] {scheduler_job.py:504} INFO - Executor reports execution of pvc.pvc_provider run_id=scheduled__2021-12-02T11:49:52.937626+00:00 exited with status success for try_number 1
   [2021-12-02 12:50:09,311] {scheduler_job.py:547} INFO - TaskInstance Finished: dag_id=pvc, task_id=pvc_provider, run_id=scheduled__2021-12-02T11:49:52.937626+00:00, run_start_date=2021-12-02 11:50:04.024368+00:00, run_end_date=2021-12-02 11:50:09.107072+00:00, run_duration=5.082704, state=success, executor_state=success, try_number=1, max_tries=0, job_id=288, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator
   ```
   
   Five seconds later the producer finishes and the consumer is scheduled. This is where the mutation is desired to happen:
   
   ```
   [2021-12-02 12:50:09,458] {scheduler_job.py:288} INFO - 1 tasks up for execution:
   	<TaskInstance: pvc.pvc_consumer scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
   [2021-12-02 12:50:09,459] {scheduler_job.py:317} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
   [2021-12-02 12:50:09,459] {scheduler_job.py:345} INFO - DAG pvc has 0/16 running and queued tasks
   [2021-12-02 12:50:09,459] {scheduler_job.py:410} INFO - Setting the following tasks to queued state:
   	<TaskInstance: pvc.pvc_consumer scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
   [2021-12-02 12:50:09,460] {scheduler_job.py:450} INFO - Sending TaskInstanceKey(dag_id='pvc', task_id='pvc_consumer', run_id='scheduled__2021-12-02T11:49:52.937626+00:00', try_number=1) to executor with priority 1 and queue default
   [2021-12-02 12:50:09,460] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'pvc', 'pvc_consumer', 'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir', 'DAGS_FOLDER/dag_pvc.py']
   [2021-12-02 12:50:09,465] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'pvc', 'pvc_consumer', 'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir', 'DAGS_FOLDER/dag_pvc.py']
   Running <TaskInstance: pvc.pvc_consumer scheduled__2021-12-02T11:49:52.937626+00:00 [queued]> on host
   ```
   
   Five seconds later the consumer finishes:
   
   ```
   [2021-12-02 12:50:15,675] {scheduler_job.py:504} INFO - Executor reports execution of pvc.pvc_consumer run_id=scheduled__2021-12-02T11:49:52.937626+00:00 exited with status success for try_number 1
   [2021-12-02 12:50:15,679] {scheduler_job.py:547} INFO - TaskInstance Finished: dag_id=pvc, task_id=pvc_consumer, run_id=scheduled__2021-12-02T11:49:52.937626+00:00, run_start_date=2021-12-02 11:50:10.399690+00:00, run_end_date=2021-12-02 11:50:15.498334+00:00, run_duration=5.098644, state=success, executor_state=success, try_number=1, max_tries=0, job_id=289, pool=default_pool, queue=default, priority_weight=1, operator=_PythonDecoratedOperator
   [2021-12-02 12:50:15,700] {manager.py:1051} INFO - Finding 'running' jobs without a recent heartbeat
   [2021-12-02 12:50:15,700] {manager.py:1055} INFO - Failing jobs without heartbeat after 2021-12-02 11:45:15.700869+00:00
   [2021-12-02 12:50:15,815] {airflow_local_settings.py:16} INFO - In the mutation hook for ti='\x1b[01mpvc_provider\x1b[22m', run_id='\x1b[01mscheduled__2021-12-02T11:50:02.937626+00:00\x1b[22m'
   [2021-12-02 12:50:15,816] {airflow_local_settings.py:16} INFO - In the mutation hook for ti='\x1b[01mpvc_consumer\x1b[22m', run_id='\x1b[01mscheduled__2021-12-02T11:50:02.937626+00:00\x1b[22m'
   ```
   
   This was tried out using the SequentialExecutor and SQLite; maybe other executors behave differently? Though I doubt it: skimming through `DagRun.verify_integrity`, where the hook is called, it appears to always apply to all task instances at the same time.



[GitHub] [airflow] easontm edited a comment on issue #19673: More dynamic pod_mutation_hook, allowing access to task context and inputs

Posted by GitBox <gi...@apache.org>.
easontm edited a comment on issue #19673:
URL: https://github.com/apache/airflow/issues/19673#issuecomment-991423383


   FYI @hterik, the pods should have some basic metadata attached to them in the labels section. I modify my pods' `nodeSelector` with the hook based on DAG and task information.
   ```python
   labels = pod.metadata.labels
   dag_id = labels['dag_id']
   task_id = labels['task_id']
   ```
   
   etc

