Posted to commits@airflow.apache.org by "jossM (via GitHub)" <gi...@apache.org> on 2023/02/11 08:24:15 UTC

[GitHub] [airflow] jossM opened a new issue, #29474: Pool slots > 1 handling with priority weight

jossM opened a new issue, #29474:
URL: https://github.com/apache/airflow/issues/29474

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   We have a high-priority task with pool_slots greater than 1 and many small low-priority tasks with pool_slots of 1.
   We notice that the small tasks usually run before the high-priority task, even though its priority_weight is higher than theirs.
   
   (If you need a concrete example of why we do this, here is the detailed context: we use Redshift to process data, with WLM limiting the number of parallel queries to a given number. Airflow's priority weight is used to steer what we expect from Redshift. To give more power to a given query, we sometimes assign it the slots of several queries in Redshift through the wlm_slot_count parameter, and we represent this in Airflow using pool_slots. We see that these queries are delayed significantly before being scheduled, even though they are high priority.)
   
   ### What you think should happen instead
   
   The high-priority task should run first.
   
   ### How to reproduce
   
   This small DAG reproduces the issue:
   ```
   import time
   
   from airflow.models.dag import DAG
   from airflow.models.baseoperator import BaseOperator
   from airflow.utils.dates import days_ago
   
   pool = "max_of_two_slots"
   
   
   class Sleep(BaseOperator):
       def __init__(self, sleep_time: int, *args, **kwargs):
           self.sleep_time = sleep_time
           super().__init__(*args, **kwargs)
   
       def execute(self, context):
           time.sleep(self.sleep_time)
   
   
   task_duration = 10
   
   def make_low_prio_exec_chain(tasks_ids: [str]):
       steps = [Sleep(task_id=tid, priority_weight=0, pool=pool, sleep_time=task_duration) for tid in tasks_ids]
       for i in range(len(steps) - 1):
           steps[i] >> steps[i+1]
       return steps
   
   
   with DAG(dag_id='priority_weight_issue', default_args={"start_date": days_ago(1)}) as dag:
       initial_delay = Sleep(task_id='prevent_high_prio_from_running_straight_away', pool=pool, sleep_time=int(task_duration/2))
       make_low_prio_exec_chain([f'low_prio_chain_{chain_index}' for chain_index in range(10)])
       delayed_chain = make_low_prio_exec_chain([f'delayed_low_prio_chain_{chain_index}' for chain_index in range(10)])
       initial_delay >> delayed_chain[0]
       initial_delay >> Sleep(task_id='high_prio', priority_weight=1, pool=pool, pool_slots=2, sleep_time=1)
   
   ```
   Gantt chart:
   <img width="1332" alt="Screenshot 2023-02-11 at 09 09 32" src="https://user-images.githubusercontent.com/13378149/218248067-951802b0-5fa1-45a7-a9d1-1d7a1ef4832f.png">
   
   
   ### Operating System
   
   ubuntu
   
   ### Versions of Apache Airflow Providers
   
   N/A
   
   ### Deployment
   
   Composer
   
   ### Deployment details
   
   The example was reproduced using Docker Compose, but we also see it in a Kubernetes Helm chart deployment.
   
   ### Anything else
   
   This happens more than 90% of the time on our stack for tasks with high pool_slots.
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] jossM commented on issue #29474: Pool slots > 1 handling with priority weight

Posted by "jossM (via GitHub)" <gi...@apache.org>.
jossM commented on issue #29474:
URL: https://github.com/apache/airflow/issues/29474#issuecomment-1426764274

   Thanks for the answer!
   With your link, I also found this [thread](https://github.com/apache/airflow/discussions/28809) that details the reason why we have this design and that I missed previously.
   This still makes it rather tricky for us as this means we cannot use pool for our use case and that we have no way of doing this with airflow natively to my knowledge.




[GitHub] [airflow] jossM closed issue #29474: Pool slots > 1 handling with priority weight

Posted by "jossM (via GitHub)" <gi...@apache.org>.
jossM closed issue #29474: Pool slots > 1 handling with priority weight
URL: https://github.com/apache/airflow/issues/29474




[GitHub] [airflow] eladkal commented on issue #29474: Pool slots > 1 handling with priority weight

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on issue #29474:
URL: https://github.com/apache/airflow/issues/29474#issuecomment-1426765019

   > This still makes it rather tricky for us as this means we cannot use pool for our use case and that we have no way of doing this with airflow natively to my knowledge.
   
   You are welcome to open a GitHub discussion or a StackOverflow question describing your specific use case and ask for advice.




[GitHub] [airflow] eladkal commented on issue #29474: Pool slots > 1 handling with priority weight

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on issue #29474:
URL: https://github.com/apache/airflow/issues/29474#issuecomment-1426678945

   Priority is taken into account only when there are more tasks than pool slots.
   
   The reason for this is explained in the docs: https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/scheduler.html




[GitHub] [airflow] jossM commented on issue #29474: Pool slots > 1 handling with priority weight

Posted by "jossM (via GitHub)" <gi...@apache.org>.
jossM commented on issue #29474:
URL: https://github.com/apache/airflow/issues/29474#issuecomment-1427073833

   Thanks for the clarification.
   
   The idea I had in mind was to reserve the free pool slots for the highest-priority task, even if they are not yet enough for it to run right away. In practice, I think this would mean replacing the `continue` with a `break` [here](https://github.com/apache/airflow/blob/6ae0a80cbaf1d33343b763c7f82612b4522afc40/airflow/jobs/scheduler_job.py#L484).
   Basically, this would boil down to Airflow acquiring pool slots for my high-priority task as soon as it can and, once enough slots have been acquired to cover its pool_slots, actually running the task.
   In the previous example, this would mean running the high-priority task after the first two, which I find fairly intuitive and is what I would actually expect.
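
As a toy illustration of the difference between skipping and reserving, one scheduling pass over the ready tasks could be sketched as below. This is plain Python, not Airflow's scheduler code: the function name `select_for_queueing`, its `reserve` flag and the tuple layout are assumptions made up for this example.

```python
def select_for_queueing(ready, free_slots, reserve=False):
    """Run one scheduling pass over (name, priority, pool_slots) tuples.

    Hypothetical helper: it only models the pool check, nothing else.
    """
    picked = []
    # Highest priority first; ties keep their original order.
    for name, _priority, pool_slots in sorted(ready, key=lambda task: -task[1]):
        if pool_slots > free_slots:
            if reserve:
                break  # proposed idea: hold the free slots for this task
            continue  # skip it, so smaller tasks can take the slots
        picked.append(name)
        free_slots -= pool_slots
    return picked


ready = [("delayed_low_prio_chain_0", 0, 1), ("high_prio", 1, 2)]
with_skip = select_for_queueing(ready, free_slots=1)
with_reserve = select_for_queueing(ready, free_slots=1, reserve=True)
both_free = select_for_queueing(ready, free_slots=2, reserve=True)
```

With one free slot, skipping queues the low-priority task, while reserving queues nothing until a second slot frees up and `high_prio` fits.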
   
   There is, however, an unfortunate side effect of this design change:
   One could now completely block every task in a pool simply by having one task with pool_slots higher than the pool capacity. The current implementation prevents this from happening. I believe this would need to be counteracted by some sort of validation when creating the task in the DAG or when modifying the pool. On that note, from personal experience, I would have appreciated Airflow telling me in the UI that my configuration makes the task impossible to run, rather than having to read the scheduler logs to figure out what was going on. (Or maybe there is a way to see this that I don't know of?)
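
The kind of validation mentioned above could look roughly like the following sketch. It works on plain dictionaries rather than on real DAG or pool objects; `find_unrunnable_tasks` and both input structures are hypothetical and only show the comparison that would be needed.

```python
def find_unrunnable_tasks(tasks, pool_sizes):
    """Return (task_id, reason) pairs for tasks that can never get their slots.

    tasks: {task_id: (pool_name, pool_slots)}
    pool_sizes: {pool_name: total number of slots}
    Both structures are hypothetical stand-ins for real DAG and pool metadata.
    """
    problems = []
    for task_id, (pool_name, pool_slots) in tasks.items():
        size = pool_sizes.get(pool_name)
        if size is None:
            problems.append((task_id, f"pool {pool_name!r} is not defined"))
        elif pool_slots > size:
            problems.append(
                (task_id, f"needs {pool_slots} slots but {pool_name!r} has only {size}")
            )
    return problems


problems = find_unrunnable_tasks(
    {"high_prio": ("max_of_two_slots", 3), "low_prio_chain_0": ("max_of_two_slots", 1)},
    {"max_of_two_slots": 2},
)
```

Running such a check both when a DAG is parsed and when a pool is resized would cover the two cases described in the comment.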
   
   Concerning the use case behind our usage of pools, I tried explaining it in the `What happened` section, but it must not have been clear enough. Basically, we drive Redshift processing using Airflow. Redshift has a configured limit on the number of parallel queries it runs. We use Airflow's priority weight and pool logic to make sure Redshift works in accordance with our business SLAs. pool_slots are used to represent the fact that we sometimes assign several query slots to a query to give it more power (mainly RAM, with CPU as a side effect). In Redshift, this is expressed by setting a wlm_slot_count parameter higher than 1 in the query; in Airflow, it is represented using the pool_slots parameter.
   The reason I opened the case is that priority queries using several slots get run last even though they are the highest-priority tasks around, making us miss our SLAs.




[GitHub] [airflow] potiuk commented on issue #29474: Pool slots > 1 handling with priority weight

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on issue #29474:
URL: https://github.com/apache/airflow/issues/29474#issuecomment-1427086005

   > The idea I had in mind was to reserve the free pool slots for the highest-priority task, even if they are not yet enough for it to run right away. In practice, I think this would mean replacing the `continue` with a `break` [here](https://github.com/apache/airflow/blob/6ae0a80cbaf1d33343b763c7f82612b4522afc40/airflow/jobs/scheduler_job.py#L484).
   
   Nope. That's not the right assessment. This is the `task_concurrency_limits` case, corresponding to `max_active_tis_per_dag`. This is very different from pools, and this check is applied at a very different moment. Pools decide how many tasks get converted from `scheduled` to `queued` (and then effectively `running`, if there are enough workers) when the scheduling loop happens. Pools decide how many tasks in total are put in the "queued + running" state, and then the availability of workers decides which ones and how many will be run.
   
   > The reason I opened the case is that priority queries using several slots get run last even though they are the highest-priority tasks around, making us miss our SLAs.
   
   They are not run last. I think this is a misunderstanding of how it works.
   
   They are run when there are enough slots available. Full stop. If you have almost all slots used, and you have a bunch of tasks that take 1 slot and 1 task that takes 10 slots, then of course all the 1-slot tasks will get executed first, because they will continuously take the slots as they become free. This means that yes, they can easily starve the 10-slot task if they keep coming, because the 1-slot tasks will get queued and they will keep taking the slots.
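
The starvation effect can be reproduced with a small discrete-time model of a pool. This is only a sketch in plain Python under simplifying assumptions (one scheduling pass per time unit, no workers, no executor); `Task`, `simulate` and `chain` are hypothetical names, and the scenario is a shortened version of the DAG from the issue, with chains of 3 tasks instead of 10.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Task:
    name: str
    priority: int
    slots: int
    duration: int
    upstream: tuple = ()  # names of tasks that must finish first


def simulate(tasks, capacity):
    """Return {task name: start time} for a pool of `capacity` slots."""
    if any(task.slots > capacity for task in tasks):
        raise ValueError("a task needs more slots than the pool has")
    start, end = {}, {}
    now = 0
    while len(start) < len(tasks):
        done = {name for name, finish in end.items() if finish <= now}
        running = [t for t in tasks if t.name in start and t.name not in done]
        free = capacity - sum(t.slots for t in running)
        ready = [
            t for t in tasks
            if t.name not in start and all(up in done for up in t.upstream)
        ]
        # Highest priority first, but tasks that do not fit are simply skipped.
        for task in sorted(ready, key=lambda t: -t.priority):
            if task.slots <= free:
                start[task.name] = now
                end[task.name] = now + task.duration
                free -= task.slots
        now += 1
    return start


def chain(prefix, length, upstream=()):
    """Build `length` one-slot, low-priority tasks that run one after another."""
    tasks = []
    for index in range(length):
        task = Task(f"{prefix}_{index}", 0, 1, 10, tuple(upstream))
        tasks.append(task)
        upstream = (task.name,)
    return tasks


tasks = [
    Task("initial_delay", priority=1, slots=1, duration=5),
    *chain("low", 3),
    *chain("delayed_low", 3, upstream=("initial_delay",)),
    Task("high_prio", priority=1, slots=2, duration=1, upstream=("initial_delay",)),
]
starts = simulate(tasks, capacity=2)
print(starts["high_prio"])  # 35: only once both chains have drained
```

Because the two chains are offset by 5 time units, only one slot is ever free at a time, so the 2-slot task is ready at time 5 but starts at time 35, after every 1-slot task has run.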
   
   And if you come up with a concrete "logical" design for how to solve all such issues, it is very welcome. This requires a lot of thinking about a number of different cases and writing an Airflow Improvement Proposal (https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals), which you are absolutely welcome to work on. As I wrote above, it most likely involves some trade-offs, as well as assessing the impact of such a change on the existing use cases out there. It's certainly not a matter of changing one `continue` into `break`.




[GitHub] [airflow] boring-cyborg[bot] commented on issue #29474: Pool slots > 1 handling with priority weight

Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on issue #29474:
URL: https://github.com/apache/airflow/issues/29474#issuecomment-1426658701

   Thanks for opening your first issue here! Be sure to follow the issue template!
   




[GitHub] [airflow] potiuk commented on issue #29474: Pool slots > 1 handling with priority weight

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on issue #29474:
URL: https://github.com/apache/airflow/issues/29474#issuecomment-1426834638

   BTW. In practice, what you would like to achieve is extremely hard @jossM. If you look at various prioritisation mechanisms in computer science (starting from priorities of processes in the kernel, and ending with prioritisation of Pods in Kubernetes), there are no "perfect" solutions that satisfy "I want the high priority tasks to always be first, not preempt the low priority ones, and by the way there should be 0 resource overhead for that". This does not work in practice. And there is a very sound math theory behind it.
   
   You have to always give up something:
   * priority
   * resources
   * preempting tasks
   
   It is impossible to have "perfect priority with low latency, no resource overhead, and no preemption of other tasks". Basic math says so.
   
   The setup you have (with fixed pool size) is a choice of giving up priority in exchange for no extra resource overhead, and no preemption. 
   
   You could make another choice, for example preempting low-priority tasks (and freeing slots) when the high priority task is about to start. You would have to do this yourself, because Airflow never preempts running tasks on purpose. If that would be acceptable for you, you could likely add a monitor in your low-priority tasks that exits immediately if a new high priority task is in queued state.
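
A minimal sketch of such a monitor, assuming the low-priority work can be split into short steps, is shown below. `YieldSlot`, `work_unless_preempted` and the `should_yield` callable are hypothetical names; how `should_yield` detects a queued high priority task is deliberately left open, since the thread does not specify it.

```python
import time


class YieldSlot(Exception):
    """Raised when a low-priority task gives up so its slot becomes free."""


def work_unless_preempted(total_seconds, should_yield, poll_seconds=1.0, sleep=time.sleep):
    """Sleep in small steps, checking `should_yield()` before each one.

    `should_yield` is a hypothetical callable supplied by the caller.
    """
    waited = 0.0
    while waited < total_seconds:
        if should_yield():
            raise YieldSlot(f"stopping after {waited:g}s of {total_seconds:g}s")
        step = min(poll_seconds, total_seconds - waited)
        sleep(step)
        waited += step
    return waited


# Usage with a fake sleep so the example finishes instantly.
checks = iter([False, False, True])
try:
    work_unless_preempted(10, lambda: next(checks), sleep=lambda seconds: None)
    outcome = "finished"
except YieldSlot as exc:
    outcome = str(exc)
```

What happens after the task exits (for example, whether it is retried later) is a separate decision that this sketch does not cover.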
   
   Or you could sacrifice resources: for example, rather than using pools to limit the number of parallel tasks, you could use separate queues to run high and low priority tasks. This way, when there are no high priority tasks running, that queue would go idle and you would lose some resources, but it would be ready to pick up high priority tasks immediately when they become available.
   
   If, for example, you could design (and likely implement) a proposal on how to preempt low priority tasks, that would be an interesting feature to consider. But preempting running tasks is not a good idea in general.
   
   But I don't think you have no other way. You have not explained why you are really using pools, but if you decide what you can sacrifice a bit, you can likely achieve what you want with the right combination of queues and pools. You have to decide which of the three is the priority for you. It is the same as with short time, high quality and low cost: managers often think they can have all of those at once, but in practice they need to decide which of the three to give up on.
   
   




[GitHub] [airflow] jossM commented on issue #29474: Pool slots > 1 handling with priority weight

Posted by "jossM (via GitHub)" <gi...@apache.org>.
jossM commented on issue #29474:
URL: https://github.com/apache/airflow/issues/29474#issuecomment-1427706282

   You're absolutely right, thanks for pointing out my mistake. I got confused with the different versions of this file.
   For completeness' sake, I was thinking more of this [one](https://github.com/apache/airflow/blob/6ae0a80cbaf1d33343b763c7f82612b4522afc40/airflow/jobs/scheduler_job.py#L425), though with the latest version of the code this is not straightforward, as you said.
   I may create the improvement proposal too, depending on the results of the [github discussions](https://github.com/apache/airflow/issues/29474#issuecomment-1426765019) on the best way to do this correctly given our use case.

