Posted to dev@airflow.apache.org by Ping Zhang <pi...@umich.edu> on 2022/09/06 03:52:51 UTC

Re: Airflow Pluggable Scheduler

Hi Jarek, Tomasz and Vikram,

Thanks for your comments.

To be clear, we are not proposing to remove the scheduler but to make it
pluggable, just like the different executors in the core and the BackfillJob.

A pluggable scheduler gives users the flexibility to choose or write a
scheduler that meets their infra requirements while still leveraging the
rest of the Airflow protocol (e.g. DAG authoring) without forking Airflow. It
does not make assumptions about how you should use or run Airflow clusters. It
also avoids fitting all use cases into one scheduler, which leads to
convoluted logic that is hard to test.

The pluggable scheduler framework
<https://gist.github.com/pingzh/6717ff99b4ca31d5b02161f7999a9dd8#file-airflowschedulerframework-py-L1-L41>
is a very lightweight class (less than 40 lines of code), which only
defines high-level scheduler contracts. It does not change any features in
the current scheduler. It uses the dependency injection pattern: the
scheduler_cls is injected into the framework, which does not assume any
implementation. It only abstracts common interfaces for the scheduler
(similar to the BaseExecutor). At a very high level, it breaks down
`_do_scheduling()`
<https://github.com/apache/airflow/blob/a2db8fcb7df1a266e82e17b937c9c1cf01a16a42/airflow/jobs/scheduler_job.py#L918-L977>
into three interfaces:

   1. make_scheduling_decisions, which corresponds to this code block
      <https://github.com/apache/airflow/blob/a2db8fcb7df1a266e82e17b937c9c1cf01a16a42/airflow/jobs/scheduler_job.py#L918-L943>.
   2. send_queuable_tasks_to_executor, which corresponds to this code block
      <https://github.com/apache/airflow/blob/a2db8fcb7df1a266e82e17b937c9c1cf01a16a42/airflow/jobs/scheduler_job.py#L945-L973>.
   3. process_system_events, which corresponds to executor events and zombies.
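
To make the shape of this contract concrete, here is a rough sketch of how
the three interfaces and the injected scheduler_cls could fit together. It
is not the code from the gist: BaseScheduler, SchedulerFramework,
FifoScheduler and run_once are hypothetical names, the method signatures are
assumptions, and plain strings stand in for task instances so that the
sketch runs without Airflow installed.

```python
from abc import ABC, abstractmethod
from typing import List, Type


class BaseScheduler(ABC):
    """Hypothetical contract; method names follow the three interfaces above."""

    @abstractmethod
    def make_scheduling_decisions(self) -> List[str]:
        """Return identifiers of task instances that are ready to be queued."""

    @abstractmethod
    def send_queuable_tasks_to_executor(self, task_ids: List[str]) -> int:
        """Hand queuable tasks to the executor; return how many were sent."""

    @abstractmethod
    def process_system_events(self) -> None:
        """Handle executor events and zombie tasks."""


class SchedulerFramework:
    """Drives one scheduling loop over whichever scheduler class is injected."""

    def __init__(self, scheduler_cls: Type[BaseScheduler]) -> None:
        # Dependency injection: the framework only knows the contract.
        self.scheduler = scheduler_cls()

    def run_once(self) -> int:
        task_ids = self.scheduler.make_scheduling_decisions()
        sent = self.scheduler.send_queuable_tasks_to_executor(task_ids)
        self.scheduler.process_system_events()
        return sent


class FifoScheduler(BaseScheduler):
    """Toy implementation: queues pending tasks in arrival order."""

    def __init__(self) -> None:
        self.pending = ["task_a", "task_b"]
        self.queued: List[str] = []
        self.events_processed = 0

    def make_scheduling_decisions(self) -> List[str]:
        # Everything pending is considered ready in this toy example.
        ready, self.pending = self.pending, []
        return ready

    def send_queuable_tasks_to_executor(self, task_ids: List[str]) -> int:
        self.queued.extend(task_ids)
        return len(task_ids)

    def process_system_events(self) -> None:
        self.events_processed += 1
```

Swapping in a different scheduler would then mean passing another
BaseScheduler subclass to SchedulerFramework, with the loop itself left
unchanged.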

Kubernetes is a useful comparison: it was originally designed for web
applications and microservices, but it has since been extended to the
offline data world. The Scheduling Framework
<https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/>
was proposed to make its scheduler more pluggable. Also, new Kubernetes
schedulers, like YuniKorn
<https://yunikorn.apache.org/docs/next/design/architecture/> and Volcano
<https://volcano.sh/en/>, were created for different use cases and plugged
into the Kubernetes ecosystem.

Please let me know your thoughts.

Ping


On Mon, Aug 29, 2022 at 3:44 PM Vikram Koka <vi...@astronomer.io.invalid>
wrote:

> Hi Ping,
>
> Conceptually, I have a similar reaction to Jarek and Tomek above, but I
> really want to understand the problem you have described with (2) before I
> comment further.
>
> Can you please elaborate on the problems:
> Airflow 2.0 treats all DagRuns with the same scheduling priority (see code
> <https://github.com/apache/airflow/blob/6b7a343b25b06ab592f19b7e70843dda2d7e0fdb/airflow/jobs/scheduler_job.py#L923>).
> This means DAGs with more DagRuns could be scheduled more often than others
> and large DAGs might slow down small DAGs scheduling. This may not be
> desired in some cases.
>
> Can you please share a use case for the above issue?
>
> Thanks,
> Vikram
>
>
> On Sun, Aug 28, 2022 at 1:45 AM Tomasz Urbaszek <tu...@apache.org>
> wrote:
>
>> Hey Ping,
>>
>> I somehow agree with Jarek that there's not much detail on what exactly
>> would be the "pluggable" part.
>>
>> Is "new" scheduler the only solution to those problems you have
>> mentioned? As said, maybe we should consider DagRuns prioritisation? If
>> growing tables is a perf problem then maybe you should consider
>> introducing some retention policy (it's done by the others as far as I
>> know)?
>>
>> Personally I think we should focus on how to improve "out of the box"
>> Airflow instead of adding more plug-something components.
>>
>> Cheers,
>> Tomek
>>
>> On Sat, 27 Aug 2022 at 02:05, Jarek Potiuk <ja...@potiuk.com> wrote:
>>
>>> Hey Ping,
>>>
>>> I don't think there is nearly enough information in what you described
>>> on what "pluggable" scheduler means. What I see in the doc and your
>>> description is a problem statement, but not a solution.
>>>
>>> But please don't jump on trying to describe it just yet.
>>>
>>> I am very skeptical about the idea in general. If you remove the scheduler
>>> from Airflow Core, there is not much left. Airflow core is practically
>>> speaking only a scheduler when it comes to the internals. I almost think
>>> that if someone wants to make the "scheduler" pluggable, that calls for
>>> forking Airflow, and forking Airflow (if someone wants to do it) and
>>> developing for themselves will be far more effective than trying to get a
>>> "pluggable" architecture. Also because this is at most a tactical solution
>>> IMHO.
>>>
>>> I believe a lot of this problem statement of yours is "past looking"
>>> rather than "forward looking" and it does not include the fact that not
>>> only Airflow changes but that the environment around it changes. And by the
>>> time the result of any design and development of any such pluggable
>>> solution might be even close to completion, the environment will have
>>> changed already and we will be in a different place - both in terms of what
>>> Airflow will be capable of and what the external environment will be.
>>>
>>> MySQL 5.7 EOL is in a year - October 2023. And we will surely drop it
>>> then. And anyone using it should. Then the single-scheduler-only case will
>>> be gone (we will not support 5.7 anyway). I seriously doubt that within a
>>> year we can develop "another" scheduler, and even if we do, if the only
>>> reason for it is not supporting multiple schedulers, that would be the
>>> first thing to drop in October 2023. And if we have a pluggable scheduler
>>> with 2 implementations by October 2023, I will be the first one to raise
>>> "let's drop this non-locking scheduler as we don't need it any more". Look
>>> back and imagine that in January 2019 we had decided to keep
>>> compatibility with Python 2.7: we would not be where we are now. And we
>>> need to look into the modern, new Airflow future rather than looking at
>>> some bad and discouraged ways of using Airflow. Even more, we should
>>> encourage and help the users that are using Airflow in those
>>> "non-future-proof" ways to switch to the new ways. And add features
>>> that make the current scheduler more appealing for the cases you mentioned.
>>>
>>> Also, Airflow 2.4 brings Datasets and data-driven scheduling. And
>>> as surprising as it might look, it will generally solve the "big DAG/small
>>> DAG" problem. Simply speaking, the DAGs of Airflow will suddenly start
>>> becoming more modular and you will be able to do with 50 20-task DAGs
>>> connected via datasets the same thing you did with huge 1000-task DAGs.
>>> This will be a far better, more modular solution. And rather than
>>> complicating Airflow by designing and implementing multiple schedulers, I
>>> would rather focus on developing tooling that will make distributed DAG
>>> development far more appealing for any users. And those users (like AirBnB
>>> - with huge DAGs) should follow suit in changing their approach - this
>>> will give them far more capabilities, and will enable them to distribute
>>> DAG development and manage it way better than having a huge, simple DAG.
>>>
>>> Maybe instead of adding pluggable schedulers, we should rather (after
>>> 2.4) work on tooling that will help users with huge DAGs to split them.
>>> Maybe we should add a way to prioritise DagRuns? Both of those are much
>>> more forward-looking than trying to "cement" existing (bad) usage patterns
>>> IMHO by making them "blessed" by having a 2nd type of scheduler supporting
>>> those cases that should be solved differently.
>>>
>>> That's how I see it.
>>>
>>> J.
>>>
>>>
>>> On Tue, Aug 23, 2022 at 7:46 AM Ping Zhang <pi...@umich.edu> wrote:
>>>
>>>> Hi Airflow community,
>>>>
>>>> We are proposing to have the Airflow Scheduler adopt a pluggable
>>>> pattern, similar to the executor.
>>>>
>>>> Background:
>>>>
>>>> Airflow 2.0 has introduced a new scheduler in AIP-15 (Scheduler HA +
>>>> performance improvement)
>>>> <https://airflow.apache.org/blog/airflow-two-point-oh-is-here/#massive-scheduler-performance-improvements>.
>>>> The new scheduler leverages the skip-locked feature in the database to
>>>> scale horizontally
>>>> <https://airflow.apache.org/docs/apache-airflow/stable/concepts/scheduler.html#overview>.
>>>> It works well for relatively small clusters (small number of tasks in a dag
>>>> and small number of dag files) as shown in the benchmark results from the
>>>> community:
>>>>
>>>> | Scenario (1000 tasks in total)                   | DAG shape   | 1.10.10 Total Task Lag | 2.0 beta Total Task Lag | Speedup  |
>>>> |--------------------------------------------------|-------------|------------------------|-------------------------|----------|
>>>> | 100 DAG files, 1 DAG per file, 10 Tasks per DAG  | Linear      | 200 seconds            | 11.6 seconds            | 17 times |
>>>> | 10 DAG files, 1 DAG per file, 100 Tasks per DAG  | Linear      | 144 seconds            | 14.3 seconds            | 10 times |
>>>> | 10 DAG files, 10 DAGs per file, 10 Tasks per DAG | Binary Tree | 200 seconds            | 12 seconds              | 16 times |
>>>>
>>>> From: https://www.astronomer.io/blog/airflow-2-scheduler
>>>>
>>>> According to the most recent 2022 Airflow survey
>>>> <https://docs.google.com/document/d/18E3gBbrPI6cHAKRkRIPfju9pOk4EJNd2M-1fRJO4glA/edit#heading=h.yhlzd4j2mpzz>,
>>>> 81% of Airflow users have between 1 and 250 DAGs in their largest
>>>> Airflow instance (4.8% of users have more than 1000 DAGs). 75% of the
>>>> surveyed Airflow users have between 1 and 100 tasks per DAG. The Airflow
>>>> 2.0 scheduler can satisfy these needs.
>>>>
>>>> However, there are cases where the Airflow 2.0 scheduler cannot be
>>>> deployed due to:
>>>>
>>>>    1. The team cannot use more than one scheduler because the company’s
>>>>       database team does not support MySQL 8+ or PostgreSQL 10+. (Arguably,
>>>>       they should be supported, but in reality it can take quite a
>>>>       while for large companies to upgrade to newer db versions.)
>>>>    2. Airflow 2.0 treats all DagRuns with the same scheduling priority (see
>>>>       code
>>>>       <https://github.com/apache/airflow/blob/6b7a343b25b06ab592f19b7e70843dda2d7e0fdb/airflow/jobs/scheduler_job.py#L923>).
>>>>       This means DAGs with more DagRuns could be scheduled more often than
>>>>       others, and large DAGs might slow down the scheduling of small DAGs.
>>>>       This may not be desired in some cases.
>>>>    3. For very large-scale clusters (with more than 10 million rows in
>>>>       the task instance table), the database tends to be the unstable
>>>>       component. The infra team does not want to add extra load to the
>>>>       database with more than one scheduler. However, a single Airflow 2.0
>>>>       scheduler cannot support large-scale clusters, as it has removed
>>>>       multi-processing of dag runs and only uses one core to schedule all
>>>>       dag runs
>>>>       <https://github.com/apache/airflow/blob/6b7a343b25b06ab592f19b7e70843dda2d7e0fdb/airflow/jobs/scheduler_job.py#L886-L976>.
>>>>
>>>> The above limitations hinder Airflow's evolution into a general-purpose
>>>> scheduling platform.
>>>>
>>>> To address the above limitations and avoid making the scheduler core
>>>> code larger and its logic more complex, we propose a pluggable
>>>> scheduler pattern. With that, the Airflow infra team/users can choose the
>>>> scheduler that best satisfies their needs and even swap out the parts that
>>>> need customization.
>>>>
>>>> Please let me know your thoughts about this; I look forward to your
>>>> feedback.
>>>>
>>>> (Here is the google doc link,
>>>> https://docs.google.com/document/d/1njmX3D_9a4TjjG9CYPWJqdkb9EyXkeQPnycYaMTUQ_s/edit?usp=sharing
>>>> feel free to comment in the doc)
>>>>
>>>> Thanks,
>>>>
>>>> Ping
>>>>
>>>>

Re: Airflow Pluggable Scheduler

Posted by Ash Berlin-Taylor <as...@apache.org>.
Yeah, essentially what `airflow scheduler` does is:

```
from airflow.jobs.scheduler_job import SchedulerJob  # module path as linked in this thread

job = SchedulerJob()
job.run()
```

You could even put `if __name__ == "__main__": ...` at the end of your 
custom scheduler and then do `python -m your.custom_scheduler`
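
A rough sketch of what such a module could look like follows. To keep it
runnable without Airflow installed, CustomSchedulerJob here is a stand-in
class, not an Airflow class; in a real module it would presumably build on
the existing job classes. The class name, the max_loops parameter and the
module path are all hypothetical.

```python
# your/custom_scheduler.py (hypothetical module path)
import logging

log = logging.getLogger(__name__)


class CustomSchedulerJob:
    """Stand-in for a custom scheduler job; not an Airflow class."""

    def __init__(self, max_loops: int = 3) -> None:
        # max_loops is a made-up knob so the example terminates.
        self.max_loops = max_loops

    def run(self) -> int:
        """Run a bounded scheduling loop and return the number of iterations."""
        loops = 0
        while loops < self.max_loops:
            log.info("scheduling loop %d", loops)
            loops += 1
        return loops


def main() -> int:
    job = CustomSchedulerJob()
    return job.run()


if __name__ == "__main__":
    main()
```

With the main guard in place, `python -m your.custom_scheduler` would start
the loop directly, without going through the `airflow scheduler` command.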

On Tue, Sep 6 2022 at 12:15:52 +02:00:00, Jarek Potiuk 
<ja...@potiuk.com> wrote:
> Why do you need to make it pluggable?
> 
> One could say that a better approach would be to build your own
> scheduler and run it instead of the Airflow scheduler.
> Basically we are talking about replacing the whole scheduling loop.
> Splitting it into three separate steps actually makes it less
> flexible than replacing the whole loop, with (I think) questionable
> benefits. But maybe I am mistaken :) ?
> 
> I am not saying it is better, or that making it pluggable this way is
> a bad idea. I just want to know what you want to achieve by making it
> pluggable this way, compared to just writing your own scheduler from
> scratch using existing Airflow classes. In the model you proposed
> with the "pluggable scheduler framework", this is basically what
> would happen anyway if you had to write your own scheduler, but IMHO
> you would be bound and limited to implementing those three methods in
> a specific way, while when writing your own scheduler you would be free
> to do anything in any way you want.
> 
> Re: Kubernetes - I think we should look not only at what others did but
> also at why they did it. I think Kubernetes is different
> in that it provides much more than just scheduling - it
> provides resource management, networking, cluster management, and
> scheduling is just a part of it. Airflow is quite a different beast
> altogether. It provides some tools on the "task level" - dependency
> management, APIs that the tasks can use, the way individual tasks
> are executed etc. But on the "whole Airflow" level it does not
> provide any abstractions (resource management etc.) that will remain
> if you remove the scheduler. If you remove the scheduler
> implementation, there is not much left - there are executors, but
> (provided that we clean up their API a bit and make them truly
> standalone) they could be used totally independently of the
> "scheduling loop" implementation. There is no need to have the same
> kind of loop with the three steps that the current scheduler has.
> 
> I think we have another layer of abstraction here: "jobs" that seems 
> more appropriate for what you propose:
> 
> * backfill_job.py
> * triggerer_job.py
> * scheduler_job.py
> * local_task_job.py
> * base_job.py
> 
> What I think you want to do is to replace the whole "SchedulerJob".
> So why not have yourself (or anyone else) implement your own
> "non_locking_scheduler_job.py" and plug it into the CLI?
> 
> I am really curious to see the reasoning why the "pluggable"
> architecture would be better than yourself (or anyone else) writing a
> new "scheduler" job and accompanying scheduler CLI from scratch.
> 
> J.
> 


Re: Airflow Pluggable Scheduler

Posted by Jarek Potiuk <ja...@potiuk.com>.
Why do you need to make it pluggable?

One could say that a better approach would be to build your own scheduler
and run it instead of the Airflow scheduler. Basically we
are talking about replacing the whole scheduling loop. Splitting it into
three separate steps actually makes it less flexible than replacing the
whole loop, with (I think) questionable benefits. But maybe I am mistaken
:) ?

I am not saying it is better, or that making it pluggable this way is a bad
idea. I just want to know what you want to achieve by making it pluggable
this way, compared to just writing your own scheduler from scratch using
existing Airflow classes. In the model you proposed with the "pluggable
scheduler framework", this is basically what would happen anyway if you
had to write your own scheduler, but IMHO you would be bound and limited to
implementing those three methods in a specific way, while when writing your
own scheduler you would be free to do anything in any way you want.

Re: Kubernetes - I think we should look not only at what others did but also
at why they did it. I think Kubernetes is different in that
it provides much more than just scheduling - it provides resource
management, networking, cluster management, and scheduling is just a part of
it. Airflow is quite a different beast altogether. It provides some tools
on the "task level" - dependency management, APIs that the tasks can use,
the way individual tasks are executed etc. But on the "whole Airflow"
level it does not provide any abstractions (resource management etc.) that
will remain if you remove the scheduler. If you remove the scheduler
implementation, there is not much left - there are executors, but
(provided that we clean up their API a bit and make them truly standalone)
they could be used totally independently of the "scheduling loop"
implementation. There is no need to have the same kind of loop with the
three steps that the current scheduler has.

I think we have another layer of abstraction here: "jobs" that seems more
appropriate for what you propose:

* backfill_job.py
* triggerer_job.py
* scheduler_job.py
* local_task_job.py
* base_job.py

What I think you want to do is to replace the whole "SchedulerJob". So why
not have yourself (or anyone else) implement your own
"non_locking_scheduler_job.py" and plug it into the CLI?
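
As a rough illustration of what "plugging a job into a CLI" could mean,
here is a sketch of a small standalone entry point that loads a job class
from a dotted path and runs it. This is not Airflow's CLI: load_class,
build_parser, main and the --job-class flag are hypothetical names, and the
only assumption about the job class is that it can be constructed without
arguments and exposes run().

```python
import argparse
import importlib
from typing import List, Optional


def load_class(dotted_path: str) -> type:
    """Import 'package.module.ClassName' and return the class object."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.ClassName', got {dotted_path!r}")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="custom-scheduler")
    parser.add_argument(
        "--job-class",
        required=True,
        help="dotted path to a job class exposing run()",
    )
    return parser


def main(argv: Optional[List[str]] = None) -> None:
    # argv=None makes argparse fall back to sys.argv[1:].
    args = build_parser().parse_args(argv)
    job_cls = load_class(args.job_class)
    job_cls().run()
```

Calling `main(["--job-class", "my_pkg.non_locking_scheduler_job.NonLockingSchedulerJob"])`
would then import and run that class, where the dotted path is again only a
placeholder.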

I am really curious to see the reasoning why the "pluggable" architecture
would be better than yourself (or anyone else) writing a new "scheduler"
job and accompanying scheduler CLI from scratch.

J.



On Tue, Sep 6, 2022 at 5:53 AM Ping Zhang <pi...@umich.edu> wrote:

> Hi Jarek, Tomasz and Vikram,
>
> Thanks for your comments.
>
> To be clear, we are not removing the scheduler but to make it pluggable,
> just like different executors in the core and the BackfillJob.
>
> Pluggable scheduler can give the flexibility to the users to choose or
> write a scheduler to meet their infra requirements while still leveraging
> the rest of Airflow protocol e.g. dag authoring without forking airflow. It
> does not make assumptions about how you should use/run airflow clusters. It
> also avoids fitting all use cases into one scheduler, leading to convoluted
> logic and being hard to test.
>
> The pluggable scheduler framework
> <https://gist.github.com/pingzh/6717ff99b4ca31d5b02161f7999a9dd8#file-airflowschedulerframework-py-L1-L41>
> is a very lightweight class (less than 40 lines of code), which only
> defines high level scheduler contracts. It does not change any features
> in the current scheduler. It uses the dependency injection pattern, which
> injects the scheduler_cls to the framework. It does not assume any
> implementation. It only abstracts common interfaces for the scheduler
> (similar to the BaseExecutor). On a very high level, it breaks down the `
> _do_scheduling()
> <https://github.com/apache/airflow/blob/a2db8fcb7df1a266e82e17b937c9c1cf01a16a42/airflow/jobs/scheduler_job.py#L918-L977>`
> into three interfaces:
>
>    1.
>
>    make_scheduling_decisions, which corresponds to this code block
>    <https://github.com/apache/airflow/blob/a2db8fcb7df1a266e82e17b937c9c1cf01a16a42/airflow/jobs/scheduler_job.py#L918-L943>
>    .
>    2.
>
>    send_queuable_tasks_to_executor, which corresponds to this code block
>    <https://github.com/apache/airflow/blob/a2db8fcb7df1a266e82e17b937c9c1cf01a16a42/airflow/jobs/scheduler_job.py#L945-L973>
>    .
>    3.
>
>    process_system_events, which corresponds to executor events and
>    zombies.
>
> Kubernetes is a useful analogy: it was originally designed for web
> applications and microservices, but it has since been extended to the
> offline data world. The Scheduling Framework
> <https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/>
> was proposed to make its scheduler more pluggable. Also, new Kubernetes
> schedulers, like YuniKorn
> <https://yunikorn.apache.org/docs/next/design/architecture/> and Volcano
> <https://volcano.sh/en/>, were created for different use cases and plugged
> into the Kubernetes ecosystem.
>
> Please let me know your thoughts.
>
> Ping
>
>
> On Mon, Aug 29, 2022 at 3:44 PM Vikram Koka <vi...@astronomer.io.invalid>
> wrote:
>
>> Hi Ping,
>>
>> Conceptually, I have a similar reaction to Jarek and Tomek above, but I
>> really want to understand the problem you have described with (2) before I
>> comment further.
>>
>> Can you please elaborate on the problems:
>> Airflow 2.0 treats all DagRuns with the same scheduling priority (see
>> code
>> <https://github.com/apache/airflow/blob/6b7a343b25b06ab592f19b7e70843dda2d7e0fdb/airflow/jobs/scheduler_job.py#L923>).
>> This means DAGs with more DagRuns could be scheduled more often than others
>> and large DAGs might slow down small DAGs scheduling. This may not be
>> desired in some cases.
>>
>> Can you please share a use case for the above issue?
>>
>> Thanks,
>> Vikram
>>
>>
>> On Sun, Aug 28, 2022 at 1:45 AM Tomasz Urbaszek <tu...@apache.org>
>> wrote:
>>
>>> Hey Ping,
>>>
>>> I somehow agree with Jarek that there's not much detail on what exactly
>>> would be the "pluggable" part.
>>>
>>> Is a "new" scheduler the only solution to those problems you have
>>> mentioned? As said, maybe we should consider DagRuns prioritisation? If
>>> growing tables is a perf problem then maybe you should consider
>>> introducing some retention policy (it's done by the others as far as I
>>> know)?
>>>
>>> Personally I think we should focus on how to improve "out of the box"
>>> Airflow instead of adding more plug-something components.
>>>
>>> Cheers,
>>> Tomek
>>>
>>> On Sat, 27 Aug 2022 at 02:05, Jarek Potiuk <ja...@potiuk.com> wrote:
>>>
>>>> Hey Ping,
>>>>
>>>> I don't think there is nearly enough information in what you described
>>>> on what "pluggable" scheduler means. What I see in the doc and your
>>>> description is a problem statement, but not a solution.
>>>>
>>>> But please don't jump on trying to describe it just yet.
>>>>
>>>> I am very skeptical about the idea in general. If you remove the scheduler
>>>> from Airflow Core, there is not much left. Airflow core is, practically
>>>> speaking, only a scheduler when it comes to the internals. I almost think
>>>> that if someone wants to make the "scheduler" pluggable, that calls for
>>>> forking Airflow, and forking Airflow (if someone wants to do it) and
>>>> developing for themselves will be far more effective than trying to get a
>>>> "pluggable" architecture. Also because this is at most a tactical solution
>>>> IMHO.
>>>>
>>>> I believe a lot of this problem statement of yours is "past looking"
>>>> rather than "forward looking" and it does not include the fact that not
>>>> only Airflow changes but that the environment around it changes. And by the
>>>> time the result of any design and development of any such pluggable
>>>> solution might be even close to completion, the environment will change
>>>> already and we will be in a different place - both in terms of what Airflow
>>>> will be capable of and what external environment will be.
>>>>
>>>> MySQL 5.7 EOL is in a year - October 2023. And we will surely drop it
>>>> then. And anyone using it should. Then the single scheduler only case will
>>>> be gone (we will not support 5.7 anyway). I seriously doubt within a year
>>>> we can develop "another" scheduler and even if we do, if the only reason
>>>> for it is not supporting multiple schedulers, that would be the first thing
>>>> to drop in October 2023. And if we have a pluggable scheduler with 2
>>>> implementations by October 2023, I will be the first one to raise "let's
>>>> drop this non-locking scheduler as we don't need it any more". If you look
>>>> back - and imagine we are back in January 2019 and we would keep
>>>> compatibility with Python 2.7. We would not be where we are now. And we
>>>> need to look into the modern, new Airflow future rather than looking at
>>>> some bad and discouraged ways of using Airflow. Even more, we should
>>>> encourage and help the users that are using Airflow in those
>>>> "non-future-proof" ways to switch to the new ways. And add features
>>>> that make the current scheduler more appealing for the cases you mentioned.
>>>>
>>>> Also, Airflow 2.4 brings Datasets and data-driven scheduling. And
>>>> as surprising as it might look, it will generally solve the "big DAG/small
>>>> DAG" problem. Simply speaking, the DAGs of Airflow will suddenly start
>>>> becoming more modular, and you will be able to do what you did with huge
>>>> 1000-task DAGs using 50 20-task DAGs connected via datasets.
>>>> This will be a far better, more modular solution. And rather than
>>>> complicating Airflow by designing and implementing multiple schedulers, I
>>>> would rather focus on developing tooling that will make distributed DAG
>>>> development far more appealing for any users. And those users (like AirBnB,
>>>> with huge DAGs) should follow suit in changing their approach. This
>>>> will give them far more capabilities and will enable them to distribute DAG
>>>> development and manage it way better than having a huge, simple DAG.
>>>> Maybe instead of adding pluggable schedulers, we should rather (after
>>>> 2.4) work on tooling that will help users with huge DAGs to split them.
>>>> Maybe we should add a way to prioritise DagRuns? Both of those are much
>>>> more forward-looking than trying to "cement" existing (bad) usage patterns
>>>> IMHO by making them "blessed" by having a 2nd type of scheduler supporting
>>>> those cases that should be solved differently.
>>>>
>>>> That's how I see it.
>>>>
>>>> J.
>>>>
>>>>
>>>> On Tue, Aug 23, 2022 at 7:46 AM Ping Zhang <pi...@umich.edu> wrote:
>>>>
>>>>> Hi Airflow community,
>>>>>
>>>>> We are proposing to have the Airflow Scheduler adopt a pluggable
>>>>> pattern, similar to the executor.
>>>>>
>>>>> Background:
>>>>>
>>>>> Airflow 2.0 has introduced a new scheduler in AIP-15 (Scheduler HA +
>>>>> performance improvement)
>>>>> <https://airflow.apache.org/blog/airflow-two-point-oh-is-here/#massive-scheduler-performance-improvements>.
>>>>> The new scheduler leverages the skip-locked feature in the database
>>>>> to scale horizontally
>>>>> <https://airflow.apache.org/docs/apache-airflow/stable/concepts/scheduler.html#overview>.
>>>>> It works well for relatively small clusters (small number of tasks in a dag
>>>>> and small number of dag files) as shown in the benchmark results from the
>>>>> community:
>>>>>
>>>>> Scenario (1000 tasks in total)                   | DAG shape   | 1.10.10 Total Task Lag | 2.0 beta Total Task Lag | Speedup
>>>>> -------------------------------------------------+-------------+------------------------+-------------------------+---------
>>>>> 100 DAG files, 1 DAG per file, 10 Tasks per DAG  | Linear      | 200 seconds            | 11.6 seconds            | 17 times
>>>>> 10 DAG files, 1 DAG per file, 100 Tasks per DAG  | Linear      | 144 seconds            | 14.3 seconds            | 10 times
>>>>> 10 DAG files, 10 DAGs per file, 10 Tasks per DAG | Binary Tree | 200 seconds            | 12 seconds              | 16 times
>>>>>
>>>>> From: https://www.astronomer.io/blog/airflow-2-scheduler
>>>>>
>>>>> From the most recent 2022 Airflow survey
>>>>> <https://docs.google.com/document/d/18E3gBbrPI6cHAKRkRIPfju9pOk4EJNd2M-1fRJO4glA/edit#heading=h.yhlzd4j2mpzz>,
>>>>> 81% of the Airflow users have between 1 and 250 DAGs in their largest
>>>>> Airflow instance (4.8% of users have more than 1000 DAGs). 75% of the
>>>>> surveyed Airflow users have between 1 and 100 tasks per DAG. The Airflow 2.0
>>>>> scheduler can satisfy these needs.
>>>>>
>>>>> However, there are cases where the Airflow 2.0 scheduler cannot be
>>>>> deployed due to:
>>>>>
>>>>>    1.
>>>>>
>>>>>    The team cannot use more than one scheduler due to the company’s
>>>>>    database team not supporting mysql 8+ or postgresql 10+. (Arguably, it is
>>>>>    true that they should be supported but in reality, it can take quite a
>>>>>    while for large companies to upgrade to newer db versions)
>>>>>    2.
>>>>>
>>>>>    Airflow 2.0 treats all DagRuns with the same scheduling priority (see
>>>>>    code
>>>>>    <https://github.com/apache/airflow/blob/6b7a343b25b06ab592f19b7e70843dda2d7e0fdb/airflow/jobs/scheduler_job.py#L923>).
>>>>>    This means DAGs with more DagRuns could be scheduled more often than others
>>>>>    and large DAGs might slow down small DAGs scheduling. This may not be
>>>>>    desired in some cases.
>>>>>    3.
>>>>>
>>>>>    For very large scale clusters (with more than 10 million rows in
>>>>>    the task instance table), the database tends to be the unstable component.
>>>>>    The infra team does not want to add extra load to the database with more
>>>>>    than one scheduler. However, with only one Airflow 2.0 scheduler, it cannot
>>>>>    support large scale clusters as it has removed the multi-processing dag
>>>>>    runs and only uses one core to schedule all dag runs
>>>>>    <https://github.com/apache/airflow/blob/6b7a343b25b06ab592f19b7e70843dda2d7e0fdb/airflow/jobs/scheduler_job.py#L886-L976>
>>>>>    .
>>>>>
>>>>> The above limitations hinder evolving Airflow as a general purpose
>>>>> scheduling platform.
>>>>>
>>>>> To address the above limitations and avoid making the scheduler core
>>>>> code larger and logic more complex, we propose to have a pluggable
>>>>> scheduler pattern. With that, the Airflow infra team/users can choose the
>>>>> best scheduler to satisfy their needs and even swap parts that need
>>>>> customization to achieve their best interest.
>>>>>
>>>>> Please let me know your thoughts about this. I look forward to your
>>>>> feedback.
>>>>>
>>>>> (Here is the google doc link,
>>>>> https://docs.google.com/document/d/1njmX3D_9a4TjjG9CYPWJqdkb9EyXkeQPnycYaMTUQ_s/edit?usp=sharing
>>>>> feel free to comment in the doc)
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Ping
>>>>>
>>>>>