Posted to dev@airflow.apache.org by Deng Xiaodong <xd...@gmail.com> on 2018/10/31 14:38:25 UTC

A Naive Multi-Scheduler Architecture Experiment of Airflow

Hi Folks,

Previously I initiated a discussion about best practices for setting up Airflow, and a few folks agreed that the scheduler may become a bottleneck (we can only run one scheduler instance, and it can only scale vertically rather than horizontally, etc.). Especially when we have thousands of DAGs, the scheduling latency may be high.

In our team, we have experimented with a naive multi-scheduler architecture. I would like to share it here, and also seek your inputs.

*1. Background*
- Inside DAG_Folder, we can have sub-folders.
- When we start a scheduler instance, we can pass “--subdir” to it, which specifies the directory the scheduler is going to “scan” (https://airflow.apache.org/cli.html#scheduler).

*2. Our Naive Idea*
Say we have 2,000 DAGs. If we run a single scheduler instance, one scheduling loop will traverse all 2K DAGs.

Our idea is:
Step-1: Create multiple sub-directories, say five, under DAG_Folder (subdir1, subdir2, …, subdir5)
Step-2: Distribute the DAGs evenly into these sub-directories (400 DAGs in each)
Step-3: Start a scheduler instance on each of 5 different machines, using the command `airflow scheduler --subdir subdir<i>` on machine <i>.

Hence eventually, each scheduler only needs to take care of 400 DAGs.
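
For illustration, here is a minimal sketch of Step-1 and Step-2 (the paths and the round-robin policy are just placeholders for this example; any even split works):

```python
# Sketch: split a flat DAG_Folder into N sub-directories, round-robin by file.
import shutil
from pathlib import Path

DAG_FOLDER = Path("/opt/airflow/dags")   # placeholder path
NUM_SHARDS = 5

subdirs = [DAG_FOLDER / f"subdir{i}" for i in range(1, NUM_SHARDS + 1)]
for d in subdirs:
    d.mkdir(exist_ok=True)

dag_files = sorted(DAG_FOLDER.glob("*.py"))
for idx, dag_file in enumerate(dag_files):
    target = subdirs[idx % NUM_SHARDS]          # 2,000 files -> ~400 per subdir
    shutil.move(str(dag_file), str(target / dag_file.name))

# Step-3, on machine <i>:
#   airflow scheduler --subdir /opt/airflow/dags/subdir<i>
```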

*3. Test & Results*
- We ran a test using 2,000 DAGs (3 tasks in each DAG).
- DAGs are stored on network-attached storage (the same drive mounted to all nodes), so we don’t need to worry about DAG_Folder synchronization.
- No conflict observed (each DAG file is only parsed & scheduled by one scheduler instance).
- The scheduling speed improves almost linearly, demonstrating that we can scale the scheduler horizontally.

*4. Highlight*
- This naive idea doesn’t address scheduler availability.
- As Kevin Yang shared earlier in another thread, the database may be another bottleneck when the load is high. But this is not considered here yet.


Kindly share your thoughts on this naive idea. Thanks.



Best regards,
XD





Re: A Naive Multi-Scheduler Architecture Experiment of Airflow

Posted by Deng Xiaodong <xd...@gmail.com>.
Thanks Devjyoti for your reply.

To elaborate based on your inputs: 

- *When to add one more shard*:
We have designed some metrics, like "how long the scheduler instance takes to parse & schedule all DAGs (in the subdir it’s taking care of)". When the metric stays above a given threshold for long enough, we may want to add one more shard.

- *Easy Solution to Balance Shard Load*:
Exactly as you point out, we create the initial set of shards by randomly distributing our DAGs across the subdirs. As when building a mathematical model, there are some assumptions we make for convenience, like “the complexity of the DAGs is roughly equal”.
As for new DAGs: we developed an application that creates DAGs based on metadata; it checks the # of files in each subdir and always puts a new DAG into the subdir with the fewest DAGs, as sketched below.
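
As an illustration, a minimal sketch of that placement rule (the path and the metadata-driven generator are placeholders, not our actual application):

```python
# Sketch: pick the shard (subdir) with the fewest DAG files for a new DAG.
from pathlib import Path

DAG_FOLDER = Path("/opt/airflow/dags")   # placeholder path

def least_loaded_subdir() -> Path:
    subdirs = [d for d in DAG_FOLDER.iterdir() if d.is_dir()]
    # Count .py files per subdir and return the emptiest one.
    return min(subdirs, key=lambda d: len(list(d.glob("*.py"))))

# new_dag_source = render_dag_from_metadata(meta)          # hypothetical generator
# (least_loaded_subdir() / f"{meta['dag_id']}.py").write_text(new_dag_source)
```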


XD



Re: A Naive Multi-Scheduler Architecture Experiment of Airflow

Posted by Devjyoti Patra <de...@qubole.com>.
>> 1. “Shard by # of files may not yield same load”: fully agree with you.
This concern was also raised by other co-workers in my team. But given this
is a preliminary trial, we didn’t consider this yet.

One issue here is deciding when to add one more shard. I think if you
monitor the time it takes to parse each source file and log it, you can
use this to find the outliers when your scheduling SLA is breached and move
those outliers to a new shard. Creating the initial set of shards by randomly
putting an equal number of files in each subdir seems like the easiest way
to approach this problem.
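
As a rough sketch of that kind of measurement, something like the following could log per-file parse times (treating the DagBag usage as an approximation that may need adjusting for your Airflow version):

```python
# Sketch: log how long each DAG file takes to parse, to spot outliers per shard.
import time
from pathlib import Path

from airflow.models import DagBag

DAG_FOLDER = Path("/opt/airflow/dags")   # placeholder path

parse_times = {}
for dag_file in sorted(DAG_FOLDER.rglob("*.py")):
    start = time.time()
    DagBag(dag_folder=str(dag_file), include_examples=False)  # parse just this file
    parse_times[str(dag_file)] = time.time() - start

# Slowest files first: candidates to move to a shard of their own.
for path, seconds in sorted(parse_times.items(), key=lambda kv: -kv[1])[:20]:
    print(f"{seconds:8.2f}s  {path}")
```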


Re: A Naive Multi-Scheduler Architecture Experiment of Airflow

Posted by Deng Xiaodong <xd...@gmail.com>.
Thanks Kevin and Max for your inputs!

To Kevin’s questions:
1. “Shard by # of files may not yield same load”: fully agree with you. This concern was also raised by other co-workers in my team, but given this is a preliminary trial, we haven’t considered it yet.
2. We haven’t started to look into how we can dynamically allocate scheduler resources yet, but I think this preliminary trial would be a good starting point.
3. DB: looking forward to your PR on this!
4. “Why do you need to shard the scheduler while the scheduler can scale up pretty high”
There are a few reasons:
4.1 we have a strict SLA on scheduling: we expect one scheduling loop to take < 3 minutes no matter how many DAGs we have.
4.2 we’re containerising the deployment, and our infrastructure team added the restriction that each pod can only use up to 2 cores (which blocks us from scaling vertically).
4.3 even though this naive architecture doesn’t provide HA, it actually partially addresses the availability concern (if one scheduler out of 5 fails, at least 80% of DAGs can still be scheduled properly).

To Max’s questions:
1. I haven’t tested the pools or queues features with this architecture, so I can’t give a very firm answer on this.
2. In the load tests I have done, I haven’t observed such “misfires” yet (I’m running a customised version based on 1.10.0, BTW).
3. This is a very valid point. I haven’t checked the implementation of DAG prioritisation in detail yet. For the scenario in our team, we don’t prioritise DAGs, so we didn’t take this into consideration. On the other hand, this naive architecture doesn’t change anything in Airflow; it simply makes use of the “--subdir” argument of the scheduler command. If we want a more serious multi-scheduler setup natively supported by Airflow, I believe we will certainly need to make significant changes to the code to ensure all features, like cross-DAG prioritisation, are supported.


Kindly let me know your thoughts. Thanks!

XD
 


Re: A Naive Multi-Scheduler Architecture Experiment of Airflow

Posted by Maxime Beauchemin <ma...@gmail.com>.
A few related thoughts:
* there may be hiccups around concurrency (pools, queues), though the
worker should double-check that the constraints are still met when firing
the task, so in theory this should be ok
* there may be more "misfires", meaning the task gets sent to the worker,
but by the time it starts the conditions aren't met anymore because of a
race condition with one of the other schedulers. Here I'm assuming recent
versions of Airflow will simply re-fire the misfires eventually and heal
* cross-DAG prioritization can't really take place anymore, as there's no
shared "ready-to-run" list of task instances that can be sorted by
priority_weight. Whichever scheduler instance fires first is likely to get
the open slots first.

Max



Re: A Naive Multi-Scheduler Architecture Experiment of Airflow

Posted by Kevin Yang <yr...@gmail.com>.
Finally we start to talk about this seriously? Yeah! :D

For your approach, a few thoughts:

   1. Sharding by # of files may not yield the same load--it can even yield very
   different loads, since we may have some framework DAG file that produces 500
   DAGs and takes forever to parse.
   2. I think Alex Guziel <https://github.com/saguziel> had previously
   talked about using Apache Helix to shard the scheduler. I haven't looked a
   lot into it, but it may be something you're interested in. I personally like
   that idea because we don't need to reinvent the wheel for a lot of stuff
   (less code to maintain also ;) ).
   3. About the DB part, I should be contributing back some changes that
   can dramatically drop the DB CPU usage. Afterwards I think we should have
   plenty of headroom (assuming the traffic is ~4,000 DAG files and ~40k
   concurrently running task instances), so we should probably be fine here.

Also I'm kinda curious about your setup and want to understand why you
need to shard the scheduler, since the scheduler can now scale up pretty
high actually.

Thank you for initiating the discussion. I think it can turn out to be a very
valuable and critical one--many people have been thinking/discussing
this and I can't wait to hear the ideas :D

Cheers,
Kevin Y


Re: A Naive Multi-Scheduler Architecture Experiment of Airflow

Posted by Maxime Beauchemin <ma...@gmail.com>.
I feel pretty strongly about not having a ZK dependency/requirement for the
multi-scheduler setup. ZK is a fine piece of tech that provides guarantees
we need, but having to install/maintain it is very prohibitive and another
potential point of failure. I don't know the latest, but I heard that the
Apache Druid community is ripping out their ZK dependency in favor of a
lighter-weight Raft Java implementation.

Also voting against sharding on DAG filename/filepath for the final
solution, as it isn't HA (I do understand it's an easy and interesting hack
in the meantime though). Having any scheduler schedule any DAG is so much
more robust and predictable. 10k DAGs running every minute is <200 locks
per second (10,000 / 60 ≈ 167). My feeling is that MySQL / Postgres can
totally eat up that load. For leader election there's probably a Python
Raft implementation out there we can use.

Max


Re: A Naive Multi-Scheduler Architecture Experiment of Airflow

Posted by "Daniel (Daniel Lamblin) [BDP - Seoul]" <la...@coupang.com>.
On the missing points you brought up: yes, that was one of the reasons it seemed like getting ZooKeeper, or a DB-coordinated procedure, involved to both count and number the schedulers and mark one of them the lead. Locking each DAG file for processing sounds easier, but we were already seeing update transactions fail without adding more pressure on the DB. Locking is another thing ZK can handle. But adding ZK seems like such a deployment overhead that the scheduler type, like the executor type, would need to become a modular option as part of the change.

The ignore-pattern method described earlier was basically adding an entry to a top-level .airflowignore file via a flag or env var instead of creating the file by hand. That was simple, with all the drawbacks mentioned already.
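
As a sketch of that flavour of setup, each scheduler instance could regenerate the top-level .airflowignore from a pattern it is given (the env var name here is a placeholder; Airflow itself only reads the .airflowignore file):

```python
# Sketch: regenerate the top-level .airflowignore for this scheduler instance
# from a pattern supplied via an environment variable (variable name is made up).
import os
from pathlib import Path

DAG_FOLDER = Path("/opt/airflow/dags")                      # placeholder path
ignore_pattern = os.environ.get("SCHEDULER_IGNORE_REGEX", "")

if ignore_pattern:
    # .airflowignore takes one regex per line; matching DAG file paths are skipped.
    (DAG_FOLDER / ".airflowignore").write_text(ignore_pattern + "\n")
```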


Re: A Naive Multi-Scheduler Architecture Experiment of Airflow

Posted by Maxime Beauchemin <ma...@gmail.com>.
I mean at that point it's just as easy (or easier) to do things properly:
get the scheduler subprocesses to take a lock on the DAG file each is about
to process, and release it when done. Add a lock timestamp and a bit of
logic to expire locks (to self-heal if the process ever crashes and fails
to release the lock). Of course, make sure that the
confirm-it's-not-locked-and-take-a-lock step is insulated in a database
transaction, and you're mostly good. That sounds like a very easy thing to
do.
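
As a minimal sketch of that lock-with-expiry idea (the table and column names
are hypothetical, not something Airflow ships, and SQLite just stands in for
the metadata DB):

```python
# Sketch: per-DAG-file lock row with a timestamp, taken inside a transaction.
import datetime

import sqlalchemy as sa

engine = sa.create_engine("sqlite:///:memory:")   # stand-in for the metadata DB
metadata = sa.MetaData()

# Hypothetical lock table: one row per DAG file.
dag_file_lock = sa.Table(
    "dag_file_lock", metadata,
    sa.Column("fileloc", sa.String(700), primary_key=True),
    sa.Column("locked_by", sa.String(100), nullable=True),
    sa.Column("locked_at", sa.DateTime, nullable=True),
)
metadata.create_all(engine)

LOCK_TIMEOUT = datetime.timedelta(minutes=5)   # stale locks self-heal after this

def try_lock(conn, fileloc, scheduler_id):
    """Take the lock if it is free or stale; return True if we got it."""
    now = datetime.datetime.utcnow()
    stale_before = now - LOCK_TIMEOUT
    result = conn.execute(
        dag_file_lock.update()
        .where(dag_file_lock.c.fileloc == fileloc)
        .where(sa.or_(dag_file_lock.c.locked_by.is_(None),
                      dag_file_lock.c.locked_at < stale_before))
        .values(locked_by=scheduler_id, locked_at=now)
    )
    return result.rowcount == 1   # exactly one row updated means we won the lock

def release_lock(conn, fileloc, scheduler_id):
    conn.execute(
        dag_file_lock.update()
        .where(dag_file_lock.c.fileloc == fileloc)
        .where(dag_file_lock.c.locked_by == scheduler_id)
        .values(locked_by=None, locked_at=None)
    )

with engine.begin() as conn:   # check-and-take runs inside one transaction
    conn.execute(dag_file_lock.insert().values(fileloc="/dags/example_dag.py"))
    assert try_lock(conn, "/dags/example_dag.py", "scheduler-1")
    assert not try_lock(conn, "/dags/example_dag.py", "scheduler-2")
    release_lock(conn, "/dags/example_dag.py", "scheduler-1")
```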

The only thing that's missing at that point to fully support
multi-schedulers is to centralize the logic that does the prioritization
and pushing to workers. That's a bit more complicated: it assumes a leader
(and leader election), and it changes the logic of how individual
"DAG-evaluator processes" communicate which task instances are runnable to
that leader (over a message queue? over the database?).

Max


Re: A Naive Multi-Scheduler Architecture Experiment of Airflow

Posted by "Daniel (Daniel Lamblin) [BDP - Seoul]" <la...@coupang.com>.
Since you're discussing multi-scheduler trials:
Based on v1.8 we have also tried something, based on passing a regex to each scheduler; DAG file paths which match it are ignored. This required turning off some logic that deletes DAG data for DAGs that are missing from the DagBag.
It is pretty manual and not evenly distributed, but it allows some 5,000+ DAGs or so with 6 scheduler instances. That said, there's some pain around maintaining such a setup, so we didn't opt for it (yet) in our v1.10 setup.
The lack of cleanup of old DAG names is also not great (it can be done semi-manually). Then there's the work of redefining the patterns for better mixes, and testing that the patterns don't all ignore the same file and that no more than one scheduler includes the same file. I generally wouldn't suggest this approach.

When considering a similar modification for v1.10, we thought it would make sense to instead tell each scheduler which scheduler number it is, and how many total schedulers there are. Then each scheduler can apply some hash (cityhash?) to the whole .py file path, mod it by the scheduler count, and only parse the file if the result matches its scheduler number.
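
For illustration, a minimal sketch of that assignment rule (using md5 from hashlib as a stand-in for cityhash; the numbers and paths are placeholders):

```python
# Sketch: each scheduler knows its own number and the total count, and only
# parses the DAG files whose hashed path maps onto its number.
import hashlib
from pathlib import Path

SCHEDULER_NUMBER = 2       # e.g. passed to this instance via env var or CLI flag
TOTAL_SCHEDULERS = 6

def owned_by_me(path: str) -> bool:
    # md5 of the full .py file path, mod the scheduler count; any stable hash
    # (cityhash included) works, as long as every scheduler agrees on it.
    digest = hashlib.md5(path.encode("utf-8")).hexdigest()
    return int(digest, 16) % TOTAL_SCHEDULERS == SCHEDULER_NUMBER

DAG_FOLDER = Path("/opt/airflow/dags")   # placeholder path
my_files = [str(p) for p in DAG_FOLDER.rglob("*.py") if owned_by_me(str(p))]
```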

This seemed like a good way to keep a fixed number of schedulers balancing new DAG files, but we didn't do it (yet) because we started to think about getting fancier: what if a scheduler needs to be added? Can it be done without stopping the others and updating the total count, or vice-versa for removing a scheduler? If one scheduler drops out, can the others renumber themselves? If that could be solved, then the schedulers could be made into an autoscaling group… For this we thought about wrapping each scheduler instance's process in some watchdog that might coordinate with something like ZooKeeper (or with the existing Airflow DB), but it got to be full of potential loopholes for the schedulers, like needing to stay in sync about refilling the DagBag in concert with each other when there's a change in the total count, and problems when one drops off but isn't really down for the count and pops back in having missed that the others decided to change their numbering, etc.

I bring this up because the basic form of the idea doesn't hinge on which folder a DAG is in, which seems more likely to work nicely with team-based hierarchies that also import reusable modules across DAG files.
-Daniel
P.S. Yeah, we did find there were times when schedulers exited because there was a DB lock on task instances they were trying to update. So the DB needs to be managed by someone who knows how to scale it for that… or possibly the model needs to be made more conducive to minimally-locking updates.
