Posted to dev@airflow.apache.org by Deng Xiaodong <xd...@gmail.com> on 2018/11/01 13:40:52 UTC

Re: A Naive Multi-Scheduler Architecture Experiment of Airflow

Thanks Kevin and Max for your input!

To Kevin’s questions:
1. “Shard by # of files may not yield same load”: I fully agree. Other co-workers on my team raised the same concern, but since this is a preliminary trial we haven’t addressed it yet.
2. We haven’t started looking into how to allocate scheduler resources dynamically yet, but I think this preliminary trial is a good starting point.
3. DB: looking forward to your PR on this!
4. “Why do you need to shard the scheduler while the scheduler can scale up pretty high”
There are a few reasons:
4.1 We have a strict SLA on scheduling: we expect one scheduling loop to take < 3 minutes no matter how many DAGs we have.
4.2 We’re containerising the deployment, and our infrastructure team has restricted each pod to at most 2 cores, which blocks us from scaling vertically.
4.3 Even though this naive architecture doesn’t provide HA, it does partially address the availability concern: if one scheduler out of 5 fails, at least 80% of the DAGs can still be scheduled properly.

To Max’s questions:
1. I haven’t tested the pools or queues features with this architecture, so I can’t give a firm answer on this.
2. In the load tests I have done, I haven’t observed such “misfires” yet (I’m running a customised version based on 1.10.0, BTW).
3. This is a very valid point. I haven’t checked the implementation of DAG prioritisation in detail yet. Our team doesn’t prioritise DAGs, so we didn’t take this into consideration. On the other hand, this naive architecture doesn’t change anything in Airflow; it simply makes use of the “--subdir” argument of the scheduler command. If we want a more serious multi-scheduler setup natively supported by Airflow, I believe we will need significant code changes to ensure that all features, like cross-DAG prioritisation, are supported.
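
To make the “--subdir” setup concrete, here is a minimal Python sketch of the sharding described in this thread: it assigns DAG files round-robin to subdir1..subdirN and builds the scheduler command for each shard. The helper names (assign_shards, scheduler_command), the generated file names and the /path/to/dags location are hypothetical and only for illustration; the only part taken from the actual setup is the `airflow scheduler --subdir` command.

```python
from pathlib import PurePosixPath
from typing import Dict, List


def assign_shards(dag_files: List[str], num_shards: int) -> Dict[str, List[str]]:
    """Distribute DAG files round-robin over subdir1..subdirN (hypothetical helper)."""
    shards: Dict[str, List[str]] = {f"subdir{i}": [] for i in range(1, num_shards + 1)}
    names = list(shards)
    for index, dag_file in enumerate(sorted(dag_files)):
        shards[names[index % num_shards]].append(dag_file)
    return shards


def scheduler_command(dag_folder: str, subdir: str) -> List[str]:
    """Build the command one scheduler instance would run for its shard."""
    return ["airflow", "scheduler", "--subdir", str(PurePosixPath(dag_folder) / subdir)]


if __name__ == "__main__":
    # 2,000 made-up DAG file names, split over 5 shards as in the experiment
    files = [f"dag_{n:04d}.py" for n in range(2000)]
    for name, members in assign_shards(files, 5).items():
        print(name, len(members), " ".join(scheduler_command("/path/to/dags", name)))
```

With 2,000 files and 5 shards, each shard receives 400 files, matching the numbers in the original experiment. The sketch only computes the assignment; it does not move any files.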


Kindly let me know your thoughts. Thanks!

XD
 

> On 1 Nov 2018, at 4:25 AM, Maxime Beauchemin <ma...@gmail.com> wrote:
> 
> A few related thoughts:
> * there may be hiccups around concurrency (pools, queues), though the worker should double-check that the constraints are still met when firing the task, so in theory this should be ok
> * there may be more "misfires" meaning the task gets sent to the worker, but by the time it starts the conditions aren't met anymore because of a race condition with one of the other schedulers. Here I'm assuming recent versions of Airflow will simply eventually re-fire the misfires and heal
> * cross DAG prioritization can't really take place anymore as there's not a shared "ready-to-run" list of task instances that can be sorted by priority_weight. Whichever scheduler instance fires first is likely to get the open slots first.
> 
> Max
> 
> 
> On Wed, Oct 31, 2018 at 1:00 PM Kevin Yang <yrqls21@gmail.com> wrote:
> Finally we start to talk about this seriously? Yeah! :D
> 
> For your approach, a few thoughts:
> 
>    1. Shard by # of files may not yield same load--even very different load,
>    since we may have some framework DAG file producing 500 DAGs and taking
>    forever to parse.
>    2. I think Alex Guziel <https://github.com/saguziel> had previously
>    talked about using Apache Helix to shard the scheduler. I haven't looked
>    into it much, but it may be something you're interested in. I personally
>    like that idea because we don't need to reinvent the wheel for a lot of
>    stuff (less code to maintain also ;) ).
>    3. About the DB part, I should be contributing back some changes that
>    can dramatically reduce the DB CPU usage. Afterwards I think we should have
>    plenty of headroom (assuming the traffic is ~4000 DAG files and ~40k
>    concurrently running task instances), so we should probably be fine here.
> 
> Also I'm kinda curious about your setup and want to understand why you
> need to shard the scheduler, since the scheduler can now scale up pretty
> high actually.
> 
> Thank you for initiating the discussion. I think it can turn out to be a very
> valuable and critical discussion--many people have been thinking about and
> discussing this, and I can't wait to hear the ideas :D
> 
> Cheers,
> Kevin Y
> 
> On Wed, Oct 31, 2018 at 7:38 AM Deng Xiaodong <xd.deng.r@gmail.com> wrote:
> 
> > Hi Folks,
> >
> > Previously I initiated a discussion about best practices for setting up
> > Airflow, and a few folks agreed that the scheduler may become one of the
> > bottleneck components (we can only run one scheduler instance, can
> > only scale vertically rather than horizontally, etc.). Especially when we
> > have thousands of DAGs, the scheduling latency may be high.
> >
> > In our team, we have experimented with a naive multiple-scheduler architecture.
> > We would like to share it here, and also seek your input.
> >
> > **1. Background**
> > - Inside DAG_Folder, we can have sub-folders.
> > - When we start a scheduler instance, we can specify “--subdir” for it,
> > which sets the directory that the scheduler is going to
> > “scan” (https://airflow.apache.org/cli.html#scheduler).
> >
> > **2. Our Naive Idea**
> > Say we have 2,000 DAGs. If we run one single scheduler instance, one
> > scheduling loop will traverse all 2K DAGs.
> >
> > Our idea is:
> > Step-1: Create multiple sub-directories, say five, under DAG_Folder
> > (subdir1, subdir2, …, subdir5)
> > Step-2: Distribute the DAGs evenly into these sub-directories (400 DAGs in
> > each)
> > Step-3: then we can start scheduler instance on 5 different machines,
> > using command `airflow scheduler --subdir subdir<i>` on machine <i>.
> >
> > Hence eventually, each scheduler only needs to take care of 400 DAGs.
> >
> > **3. Test & Results**
> > - We ran a test using 2,000 DAGs (3 tasks in each DAG).
> > - DAGs are stored on network attached storage (the same drive mounted
> > to all nodes), so we don’t need to worry about DAG_Folder synchronization.
> > - No conflicts were observed (each DAG file is parsed & scheduled by only
> > one scheduler instance).
> > - The scheduling speed improves almost linearly, which demonstrates that we can
> > scale the scheduler horizontally.
> >
> > **4. Highlight**
> > - This naive idea doesn’t address scheduler availability.
> > - As Kevin Yang shared earlier in another thread, the database may be
> > another bottleneck when the load is high. But this is not considered here
> > yet.
> >
> >
> > Kindly share your thoughts on this naive idea. Thanks.
> >
> >
> >
> > Best regards,
> > XD
> >
> >
> >
> >
> >


Re: A Naive Multi-Scheduler Architecture Experiment of Airflow

Posted by Deng Xiaodong <xd...@gmail.com>.
Thanks Devjyoti for your reply.

To elaborate based on your inputs: 

- *When to add one more shard*:
We have designed some metrics, such as "how long the scheduler instance takes to parse & schedule all DAGs (in the subdir it’s taking care of)". When a metric stays above a given threshold for long enough, we may want to add one more shard.
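
A minimal sketch of such a check, assuming the metric is a list of recent scheduling-loop durations in seconds. The function name, the 180-second default (taken from the 3-minute SLA mentioned earlier in this thread) and the choice of 5 consecutive loops as "long enough" are illustrative assumptions, not our actual implementation.

```python
from typing import Sequence


def needs_new_shard(loop_seconds: Sequence[float],
                    threshold_seconds: float = 180.0,
                    sustained_loops: int = 5) -> bool:
    """Return True when the most recent `sustained_loops` loops all exceeded the threshold."""
    if len(loop_seconds) < sustained_loops:
        # Not enough history yet to call the breach sustained
        return False
    recent = loop_seconds[-sustained_loops:]
    return all(duration > threshold_seconds for duration in recent)
```

Requiring several consecutive slow loops avoids adding a shard because of a single spike.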

- *Easy Solution to Balance Shard Load*:
Exactly as you point out, we create the initial set of shards by randomly distributing our DAGs across the subdirs. As when building a mathematical model, we have to make some simplifying assumptions, like “the complexity of the DAGs is roughly equal”.
As for new DAGs: we developed an application that creates DAGs from metadata. It checks the # of files in each subdir and always puts the new DAG into the subdir with the fewest DAGs.
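
The placement rule for new DAGs can be sketched as follows. This is only an illustration: the function names are hypothetical, and counting `*.py` files directly inside each subdir is an assumption about how "# of files" is measured.

```python
from pathlib import Path
from typing import Iterable


def count_dag_files(subdir: Path) -> int:
    """Count the .py files directly inside one shard directory."""
    return sum(1 for path in subdir.glob("*.py") if path.is_file())


def least_loaded_subdir(subdirs: Iterable[Path]) -> Path:
    """Pick the shard directory that currently holds the fewest DAG files."""
    # Sorting first makes ties deterministic: the first subdir in sorted order wins
    return min(sorted(subdirs), key=count_dag_files)
```

A new DAG file would then be written into the directory returned by `least_loaded_subdir`.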


XD

> On 2 Nov 2018, at 12:47 AM, Devjyoti Patra <de...@qubole.com> wrote:
> 
>>> 1. “Shard by # of files may not yield same load”: fully agree with you.
> This concern was also raised by other co-workers in my team. But given this
> is a preliminary trial, we didn’t consider this yet.
> 
> One issue here is deciding when to add one more shard. I think if
> you monitor and log the time it takes to parse each source file, you can
> use this to find the outliers when your scheduling SLA is breached and move
> the outliers to a new shard. Creating the initial set of shards by randomly
> putting an equal number of files in each subdir seems like the easiest way
> to approach this problem.
> 


Re: A Naive Multi-Scheduler Architecture Experiment of Airflow

Posted by Devjyoti Patra <de...@qubole.com>.
> 1. “Shard by # of files may not yield same load”: fully agree with you.
> This concern was also raised by other co-workers in my team. But given this
> is a preliminary trial, we didn’t consider this yet.

One issue here is deciding when to add one more shard. I think if
you monitor and log the time it takes to parse each source file, you can
use this to find the outliers when your scheduling SLA is breached and move
the outliers to a new shard. Creating the initial set of shards by randomly
putting an equal number of files in each subdir seems like the easiest way
to approach this problem.
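
As a rough sketch of the outlier idea, assuming the per-file parse times have already been logged into a mapping from file name to seconds: the function name and the "10 times the median" cutoff are arbitrary choices for illustration, and any other outlier rule could be substituted.

```python
from statistics import median
from typing import Dict, List


def parse_time_outliers(parse_seconds: Dict[str, float], factor: float = 10.0) -> List[str]:
    """Return files whose parse time exceeds `factor` times the median, slowest first."""
    if not parse_seconds:
        return []
    cutoff = factor * median(parse_seconds.values())
    slow = [name for name, seconds in parse_seconds.items() if seconds > cutoff]
    # Slowest files first, so they are the first candidates to move to a new shard
    return sorted(slow, key=lambda name: parse_seconds[name], reverse=True)
```

When the scheduling SLA is breached, the files returned here would be the candidates to move into a new subdir.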
