Posted to dev@airflow.apache.org by Deng Xiaodong <xd...@gmail.com> on 2019/03/01 15:25:10 UTC

Multiple Schedulers - "scheduler_lock"

Hi Max,

Following https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E, I’m preparing an AIP for supporting multiple schedulers in Airflow (mainly for HA and higher scheduling performance).

While checking the code, I found that DagModel has an attribute, “scheduler_lock”. It is not used at all in the current implementation, but it was introduced a long time ago (2015) to allow multiple schedulers to work together (https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620).

Since you were its original author, it would be very helpful if you could share why the multiple-schedulers implementation was eventually removed, and what challenges/complexity there were.
(You already shared some valuable input in the earlier discussion https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E, mainly relating to hiccups around concurrency, cross-DAG prioritisation & load on the DB. Other than these, is there anything else you would advise?)

I will also dive into the git history further to understand it better.

Thanks.


XD

Re: Multiple Schedulers - "scheduler_lock"

Posted by Kevin Yang <yr...@gmail.com>.
Sorry I'm late again :)

I agree with most of Bas's ideas, especially that if we decide to do HA,
maybe we shouldn't do it ourselves (Apache Helix? I haven't looked deeply,
just an idea).

Peter, I'm very curious about your setup and how it came to take 2-3 hours
to process 5000+ DAG files (I assume you meant DAG files, not DAGs). Since
we have multiple DAG parsing subprocesses
<https://github.com/apache/airflow/blob/master/airflow/config_templates/default_airflow.cfg#L500>,
if we assume each file takes 5 seconds to parse, with 64 parsing
subprocesses every file can be touched once every ~6.5 mins (or, with a
more aggressive setting like ours, ~3.25 mins with 128 parsing
subprocesses). Plus some more overhead like subprocess management, zombie
detection, etc., it shouldn't take more than 10 mins to touch them all. In
reality most DAG files should be quite simple and take way less than 5
seconds to parse, which dramatically reduces the time needed to cycle
through every file. I noticed you're using the Celery executor; after the
scheduler process has harvested the parsing results, I think they can get
into the Celery executor pretty fast. We had stress tests
<https://github.com/apache/airflow/pull/4234> on the enqueuing part of
Celery and it took less than 5 mins to start 30k+ tasks (worker set task to
running - (execution_date+interval) == ~5 mins). Just trying to provide
some insight on scheduler scaling from our side; the stress test was not
entirely comprehensive and I might be missing some important info on your
side that causes the scheduling to be slow. If that is the case, scratch
the above :P
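
The parsing estimate above can be reproduced with a few lines of arithmetic.
This is only a sketch: the function name is made up for illustration, and it
assumes parsing is perfectly parallel with no overhead, which is why the
10-minute figure above leaves some headroom.

```python
def full_cycle_minutes(num_files: int, seconds_per_file: float, num_processes: int) -> float:
    """Minutes needed for the parsing subprocesses to touch every DAG file once.

    Hypothetical helper: assumes perfectly parallel parsing and ignores
    overhead such as subprocess management and zombie detection.
    """
    total_parse_seconds = num_files * seconds_per_file
    return total_parse_seconds / num_processes / 60


if __name__ == "__main__":
    # Figures from the message: 5000 files, 5 seconds each, 64 or 128 subprocesses.
    for processes in (64, 128):
        minutes = full_cycle_minutes(5000, 5, processes)
        print(f"{processes} parsing subprocesses: ~{minutes:.2f} min per full cycle")
```

With the numbers from the message this gives roughly 6.5 and 3.25 minutes,
matching the estimates quoted above.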

Cheers,
Kevin Y

On Mon, Mar 18, 2019 at 9:45 AM Ash Berlin-Taylor <as...@apache.org> wrote:

> Does anything change about your proposal if you don't assume that workers
> have “quick access” to the DAG files - i.e. what if we are on kube
> executors and the task spin up time plus git sync time is 30-60s?
>
> (Perhaps this is an extreme case, but we are talking about extreme cases)
>
> > On 18 Mar 2019, at 07:58, Bas Harenslak <ba...@godatadriven.com> wrote:
> >
> > Peter,
> >
> > The numbers you mention seem to come out of the blue. I think you’re
> > oversimplifying it and cannot simply state 180/36 = 5 minutes. Throwing in
> > numbers without explanation creates confusion.
> >
> > I have some questions when reading your AIP. I have to make lots of
> > assumptions and think explaining it more in depth would clarify a lot.
> >
> >
> >  *   Should a DagScheduler and task run in the same slots? Should there
> >      be any difference between running the two?
> >  *   How does this work together with e.g. Celery? With the current
> >      Celery setup you push tasks to the queue, which can be run on any
> >      worker. With your setup it seems you push a DagScheduler and/or task.
> >      Does the DagScheduler push tasks itself to the queue so that any task
> >      can run anywhere? And is it correct to assume the DagScheduler polls
> >      the DB to check if a task is finished?
> >  *   “If a cycle is done the MainScheduler should schedule a new
> >      DagScheduler” -> I assume the DagScheduler would set the state of a
> >      DagRun to finished? And the MainScheduler simply checks when the next
> >      interval is finished and starts a new DagScheduler.
> >  *   The naming is confusing to me. How about naming the DagScheduler
> >      “DagRunManager”, because that’s what I believe it does?
> >  *   I’m not convinced this is the way to go. Currently the scheduler
> >      process does a lot more than just scheduling, e.g. also parsing of
> >      the DAGs, which I believe can be optimised a lot. I think splitting
> >      up the responsibilities of the scheduler would be more beneficial,
> >      instead of adding complexity by creating more “schedulers”. Would you
> >      agree?
> >
> > On a final note, to this whole thread: I’m very wary of doing
> > HA/distributed work ourselves. It adds a lot of complexity, and locking
> > is a tricky subject (although well thought out by many others, so not
> > impossible). Before going there, I would suggest putting effort into
> > optimising the (single) scheduler first, e.g. by splitting
> > responsibilities between DAG parsing and actual scheduling.
> >
> > Cheers,
> > Bas
> >
> > On 18 Mar 2019, at 07:18, Peter van t Hof <pjrvanthof@gmail.com> wrote:
> >
> > Hi,
> >
> > My proposal focuses mainly on scalability and indeed not so much on HA.
> > This is mainly because that is also the main issue raised by the original
> > author. Having some form of HA on this MainScheduler would still be nice.
> >
> > The problem is that having a fixed number of schedulers does not scale
> > with the load. At my current client they try to execute 5000+ DAGs at the
> > same time. A single scheduler cycle to touch all DAGs takes 2-3 hours. So
> > to do this within 5 min, 36 of those schedulers with locking would have to
> > be there at all times. After 2 hours 2 schedulers would be enough, which
> > means that in this situation 34 scheduler processes are wasted and only
> > produce overhead.
> >
> > This DagScheduler is a short-lived task, so it is not a persistent
> > worker process. The MainScheduler should resubmit it when required.
> >
> > Gr,
> > Peter
> >
> >
> > On 18 Mar 2019, at 05:32, Maxime Beauchemin <maximebeauchemin@gmail.com> wrote:
> >
> > The proposal reads "Looking at the original AIP-15 the author proposes to
> > use locking to enable the use of multiple schedulers, this might introduce
> > unnecessary complexity"
> >
> > To me, introducing multiple roles (master scheduler + scheduler minions)
> > may actually be more complex than just having "shared nothing" schedulers
> > with locking. The former is also less scalable (whatever workload is done
> > on that master [say serialization] can hit scale issues) and is less HA
> > (as it relies on the orchestrator [k8s] for HA).
> >
> > My personal inclination has always been towards renaming the scheduler to
> > "supervisor" (as it already does significantly more than just triggering
> > tasks), allowing many instances of that role, and using locks where
> > necessary. That way there are just 2 roles in the cluster: supervisor and
> > worker processes. Depending on the executor (say for k8s) you don't even
> > need actual persistent worker processes.
> >
> > Max
> >
> > On Sun, Mar 17, 2019 at 1:52 AM Peter van t Hof <pjrvanthof@gmail.com> wrote:
> >
> > Hi all,
> >
> > I think that scheduler locking may not be the best way of solving this
> > issue. Still, I’m in support of taking a good look at the scheduler
> > because it has some real scaling issues.
> >
> > I wrote an alternative proposal to solve the scalability of the
> > scheduler:
> >
> > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler
> >
> > Any input on this is welcome.
> >
> > Gr,
> > Peter
> >
> >
> > On 3 Mar 2019, at 03:26, Deng Xiaodong <xd...@gmail.com> wrote:
> >
> > Thanks Max.
> >
> > I have documented all the discussions around this topic & the useful input
> > in AIP-15 (Support Multiple-Schedulers for HA & Better Scheduling
> > Performance):
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651
> >
> > More input from folks is welcome.
> >
> > Thanks.
> >
> >
> > XD
> >
> > On 3 Mar 2019, at 6:18 AM, Maxime Beauchemin <maximebeauchemin@gmail.com> wrote:
> >
> > Personally I'd vote against the idea of having certain schedulers handle
> > a subset of the DAGs; that's just not HA.
> >
> > Also if you are in an env where you have a small number of large DAGs, the
> > odds of having wasted work and double-firing get pretty high.
> >
> > With the lock in place, it's just a matter of the scheduler loop selecting
> > (in a db transaction) the DAG that has not been processed for the longest
> > time and is not locked. Flipping the lock flag to true should be part of
> > the same db transaction. We probably need a btree index on lock and last
> > processed time.
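
A minimal sketch of the select-and-lock step described above, using SQLite
from the Python standard library as a stand-in database. The table layout,
the `last_processed` column, the index name and both function names are
assumptions made for illustration and are not Airflow's actual schema or
API; only `scheduler_lock` is the attribute named in this thread. On
PostgreSQL or MySQL the same idea would more likely use
`SELECT ... FOR UPDATE`.

```python
import sqlite3


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory table standing in for the DAG table (hypothetical schema)."""
    # isolation_level=None: we issue BEGIN/COMMIT ourselves.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        "CREATE TABLE dag ("
        " dag_id TEXT PRIMARY KEY,"
        " scheduler_lock INTEGER NOT NULL DEFAULT 0,"
        " last_processed REAL NOT NULL)"
    )
    # Index on (lock, last processed time), as suggested in the message.
    conn.execute("CREATE INDEX idx_dag_lock_last ON dag (scheduler_lock, last_processed)")
    conn.executemany(
        "INSERT INTO dag (dag_id, scheduler_lock, last_processed) VALUES (?, 0, ?)",
        [("a", 30.0), ("b", 10.0), ("c", 20.0)],
    )
    return conn


def claim_next_dag(conn: sqlite3.Connection):
    """Pick the unlocked DAG processed longest ago and lock it, in one transaction."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    try:
        row = conn.execute(
            "SELECT dag_id FROM dag WHERE scheduler_lock = 0 "
            "ORDER BY last_processed ASC LIMIT 1"
        ).fetchone()
        if row is not None:
            conn.execute("UPDATE dag SET scheduler_lock = 1 WHERE dag_id = ?", (row[0],))
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return None if row is None else row[0]


def release_dag(conn: sqlite3.Connection, dag_id: str, now: float) -> None:
    """Unlock a DAG and record when it was last processed."""
    conn.execute(
        "UPDATE dag SET scheduler_lock = 0, last_processed = ? WHERE dag_id = ?",
        (now, dag_id),
    )
```

Each scheduler would call `claim_next_dag` in its loop and `release_dag`
when done. A lock-expiry mechanism, as discussed elsewhere in this thread,
is not included in the sketch.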
> >
> > This way adding scheduler processes increases the scheduling pace, and
> > provides an HA solution. No leader / master / slave or election process,
> > just equal workers that work together.
> >
> > Max
> >
> > On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd...@gmail.com> wrote:
> >
> > I get your point and agree. And your last suggestion, to randomly sort
> > the DAGs, is a great idea to address it. Thanks!
> >
> > XD
> >
> > On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <Ja...@polidea.com> wrote:
> >
> > I think that the probability calculation holds only if there is no
> > correlation between different schedulers. I think however there might be
> > an accidental correlation if you think about typical deployments.
> >
> > Some details on why I think accidental correlation is possible and even
> > likely. Assume that:
> >
> > - we have similar and similarly busy machines running schedulers (likely)
> > - time is synchronised between the machines (likely)
> > - the machines have the same DAG folders mounted (or copied) and the
> >   same filesystem is used (this is exactly what a multiple-scheduler
> >   deployment is all about)
> > - the schedulers start scanning at exactly the same time (crossing the
> >   0:00 second every full five minutes, for example) - this I am not sure
> >   about, but I imagine this might be "typical" behaviour.
> > - they process the list of DAGs in exactly the same sequence (it looks
> >   like this is the case in dag_processing
> >   <https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300>
> >   and models/__init__
> >   <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567>:
> >   we use os.walk, which uses os.listdir, for which the sequence of
> >   processing depends on the filesystem implementation
> >   <https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>,
> >   and then we append files to the list)
> >
> > Then it's rather likely that the schedulers will be competing for the
> > very same DAGs at the very beginning. Locking will change how quickly they
> > process each DAG of course, but if the DAGs are of similar sizes it's also
> > likely that the speed of scanning (DAGs/s) is similar for all schedulers.
> > The schedulers will then catch up with each other and might pretty much
> > continuously compete for the same DAGs almost all the time.
> >
> > It can be mitigated super-easily by randomly sorting the DAG folder list
> > after it is prepared (it's filesystem-dependent now, so we do not rely on
> > a particular order). Then the probability numbers will hold perfectly, I
> > think :)
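
The random-ordering mitigation described above could look roughly like the
sketch below. The function name is hypothetical, and treating every `.py`
file under the folder as a DAG file is a simplification of how DAG files
are actually discovered.

```python
import os
import random
from typing import List, Optional


def shuffled_dag_files(dag_folder: str, rng: Optional[random.Random] = None) -> List[str]:
    """Collect candidate DAG files under dag_folder and return them in random order.

    Hypothetical helper: each scheduler gets its own ordering, so schedulers
    that start at the same moment do not walk the folder in lockstep.
    """
    paths = []
    for root, _dirs, files in os.walk(dag_folder):
        for name in files:
            if name.endswith(".py"):  # simplification: any .py file counts
                paths.append(os.path.join(root, name))
    # Sort first so the result depends only on the random source,
    # not on the filesystem-dependent listing order.
    paths.sort()
    if rng is None:
        rng = random.Random()
    rng.shuffle(paths)
    return paths
```

Passing a seeded `random.Random` makes the order reproducible for testing.
In a real deployment each scheduler would use an unseeded generator so that
the orderings differ between schedulers.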
> >
> > J.
> >
> >
> > On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd...@gmail.com> wrote:
> >
> > I’m thinking about which architecture would be ideal.
> >
> >
> > # Option-1:
> > The master-slave architecture would be one option. But leader selection
> > would be essential to consider, otherwise we have an issue in terms of HA
> > again.
> >
> >
> > # Option-2:
> > Another option we may consider is to simply start multiple scheduler
> > instances (just using the current implementation, after modifying &
> > validating the scheduler_lock on DagModel).
> >
> > - In this case, given we handle everything properly using locking, we
> > don’t need to worry too much about double-scheduling/triggering.
> >
> > - Another potential concern I had earlier is that different schedulers may
> > compete with each other and cause “waste” of scheduler resources.
> > After further thinking, I realise this is a typical Birthday Problem.
> > Given we have m DAGs and n schedulers, at any moment, the probability
> > that all schedulers are working on different DAGs is
> > m!/((m-n)! * (m^n)), and the probability that there are schedulers
> > competing on the same DAG is 1-m!/((m-n)! * (m^n)).
> >
> > Let’s say we have 200 DAGs and we start 2 schedulers. At any moment, the
> > probability that there are schedulers competing on the same DAG is only
> > 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only
> > 0.33%.
> > (This probability will be higher if m/n is low. But users should not start
> > too many schedulers if they don’t have that many DAGs.)
> >
> > Given the probability of schedulers competing is so low, my concern about
> > scheduler resource waste is not really valid.
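
The birthday-problem figures above can be checked numerically. This is a
sketch under the same assumption as the message, namely that each scheduler
picks a DAG uniformly and independently at any given moment. The function
name is made up for illustration.

```python
from math import perm


def collision_probability(m: int, n: int) -> float:
    """Probability that at least two of n schedulers work on the same one of m DAGs.

    Implements 1 - m! / ((m - n)! * m**n), assuming independent uniform picks.
    """
    if n > m:
        return 1.0  # more schedulers than DAGs: a collision is certain
    # perm(m, n) == m! / (m - n)!
    return 1.0 - perm(m, n) / m**n


if __name__ == "__main__":
    for dags, schedulers in ((200, 2), (300, 2)):
        p = collision_probability(dags, schedulers)
        print(f"{schedulers} schedulers, {dags} DAGs: {p:.2%}")
```

For 2 schedulers this reduces to 1/m, which gives the 0.5% and 0.33% quoted
above. The same function can be used to see how the probability grows as
more schedulers are added.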
> >
> >
> >
> > Based on these calculations/assessments, I think we can go for option-2,
> > i.e. we don’t make big changes to the current implementation. Instead, we
> > ensure the scheduler_lock is working well and test running multiple
> > schedulers intensively. Then we should be good to let users know that it’s
> > safe to run multiple schedulers.
> >
> > Please share your thoughts on this and correct me if I’m wrong on any
> > point above. Thanks.
> >
> >
> > XD
> >
> >
> > Reference: https://en.wikipedia.org/wiki/Birthday_problem
> >
> >
> > On 2 Mar 2019, at 3:39 PM, Tao Feng <fe...@gmail.com> wrote:
> >
> > Does the proposal use a master-slave architecture (leader scheduler vs
> > slave scheduler)?
> >
> > On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yr...@gmail.com> wrote:
> >
> > Preventing double-triggering by separating the DAG files that different
> > schedulers parse sounds easier and more intuitive. I actually removed one
> > piece of the double-triggering prevention logic here
> > <https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127>
> > (expensive) and was relying on this lock
> > <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233>
> > to prevent double-firing and safeguard our non-idempotent tasks (btw the
> > insert can be an insert overwrite to be idempotent).
> >
> > Also, though at Airbnb we requeue tasks a lot, we haven't seen
> > double-firing recently.
> >
> > Cheers,
> > Kevin Y
> >
> > On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <maximebeauchemin@gmail.com> wrote:
> >
> > Forgot to mention: the intention was to use the lock, but I never
> > personally got to do the second phase, which would consist of skipping the
> > DAG if the lock is on, and expiring the lock eventually based on a config
> > setting.
> >
> > Max
> >
> > On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <maximebeauchemin@gmail.com> wrote:
> >
> > My original intention with the lock was preventing "double-triggering" of
> > tasks (triggering refers to the scheduler putting the message in the
> > queue). Airflow now has good "double-firing-prevention" of tasks (firing
> > happens when the worker receives the message and starts the task): even if
> > the scheduler were to go rogue or restart and send multiple triggers for a
> > task instance, the worker(s) should only start one task instance. That's
> > done by running the database assertions behind the conditions being met as
> > a read database transaction (no task can alter the rows that validate the
> > assertion while it's getting asserted). In practice it's a little tricky
> > and we've seen rogue double-firing in the past (I have no idea how often
> > that happens).
> >
> > If we do want to prevent double-triggering, we should make sure that 2
> > schedulers aren't processing the same DAG or DagRun at the same time. That
> > would mean having the scheduler not start processing locked DAGs, and
> > providing a mechanism to expire the locks after some time.
> >
> > Has anyone experienced double-firing lately? If it exists we should fix
> > it, but also be careful around multiple-scheduler double-triggering, as it
> > would make that problem potentially much worse.
> >
> > Max
> >
> > On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd.deng.r@gmail.com> wrote:
> >
> > It’s exactly what my team is doing & what I shared here last year
> > (https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E).
> >
> > It’s a somewhat “hacky” solution (and HA is not addressed), and now I’m
> > thinking about how we can make it more proper & robust.
> >
> >
> > XD
> >
> > On 2 Mar 2019, at 12:04 AM, Mario Urquizo <mario.urquizo@gmail.com> wrote:
> >
> > We have been running multiple schedulers for about 3 months. We created
> > multiple services to run airflow schedulers. The only difference is that
> > we have each of the schedulers pointed to a directory one level deeper
> > than the DAG home directory that the workers and webapp use. We have seen
> > much better scheduling performance but this does not yet help with HA.
> >
> > DAGS_HOME:
> > {airflow_home}/dags  (webapp & workers)
> > {airflow_home}/dags/group-a/ (scheduler1)
> > {airflow_home}/dags/group-b/ (scheduler2)
> > {airflow_home}/dags/group-etc/ (scheduler3)
> >
> > Not sure if this helps, just sharing in case it does.
> >
> > Thank you,
> > Mario
> >
> >
> > On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bdbruin@gmail.com> wrote:
> >
> > I have done quite some work on making it possible to run multiple
> > schedulers at the same time. At the moment I don’t think there are
> > actually any real blockers to doing so. We just don’t actively test it.
> >
> > Database locking is mostly in place (DagRuns and TaskInstances). And I
> > think the worst that can happen is that a task is scheduled twice. The
> > task will detect this most of the time and kill one off if they run
> > concurrently; if they run sequentially instead, then it will run again on
> > some occasions. Everyone has idempotent tasks, right, so no harm done? ;-)
> >
> > Have you encountered issues? Maybe work those out?
> >
> > Cheers
> > Bolke.
> >
>
>

Re: Multiple Schedulers - "scheduler_lock"

Posted by Ash Berlin-Taylor <as...@apache.org>.
Does anything change about your proposal if you don't assume that workers have “quick access” to the DAG files - i.e. what if we are on kube executors and the task spin-up time plus git sync time is 30-60s?

(Perhaps this is an extreme case, but we are talking about extreme cases)

> On 18 Mar 2019, at 07:58, Bas Harenslak <ba...@godatadriven.com> wrote:
> 
> Peter,
> 
> The numbers you mention seem to come out of the blue. I think you’re oversimplifying it and cannot simply state 180/36 = 5 minutes. Throwing in numbers without explanation creates confusion.
> 
> I have some questions when reading your AIP. I have to make lots of assumptions and think explaining it more in depth would clarify a lot.
> 
> 
>  *   Should a DagScheduler and task run in the same slots? Should there be any difference between running the two?
>  *   How does this work together with e.g. Celery? With the current Celery setup you push tasks to the queue, which can be run on any worker. With your setup it seems you push a DagScheduler and/or task. Does the DagScheduler push tasks itself to the queue so that any task can run anywhere? And is it correct to assume the DagScheduler polls the DB to check if a task is finished?
>  *   “If a cycle is done the MainScheduler should schedule a new DagScheduler" -> I assume the dagscheduler would set state of a dagrun to finished? And the mainscheduler simply checks when the next interval is finished and to start a new DagScheduler.
>  *   The naming is confusing to me. How about naming the DagScheduler “DagRunManager”, because that’s what I believe it does?
>  *   I’m not convinced this is the way to go. Currently the scheduler process does a lot more than just scheduling. I.e. also parsing of the DAGs, which I believe can be optimised a lot. I think splitting up the responsibilities of the scheduler would be more beneficial, instead of adding complexity by creating more “schedulers”. Would you agree?
> 
> On a final note, to this whole thread: I’m very wary of doing HA/distributed work ourselves. It adds a lot of complexity, locking is a tricky subject (although well thought out by many others, so not impossible). Before going there, I would suggest to put effort into optimising the (single) scheduler first, e.g. by splitting responsibilities between DAG parsing and actual scheduling.
> 
> Cheers,
> Bas
> 
> On 18 Mar 2019, at 07:18, Peter van t Hof <pj...@gmail.com>> wrote:
> 
> Hi,
> 
> My proposal is focusing mainly on scalability and indeed not so much on HA. This mainly because that is also the main issue from the original author. Have a form of HA on this MainScheduler would still be nice to have.
> 
> The problem with is that have a fixed number of scheduler does not scale on the load. On my current client they try to execute 5000+ DAG’s at the same time. A single scheduler cycle to touch all DAG’s takes 2-3 hour. So to do this within 5 min 36 of those schedulers with locking should be there at all time. After 2 hours 2 schedulers would be enough, this means in this situation 34 scheduler processes are wasted and only producing overhead.
> 
> This DagScheduler is a short living task, so this is not a persistent worker process. The MainScheduler should resubmit when it is required.
> 
> Gr,
> Peter
> 
> 
> On 18 Mar 2019, at 05:32, Maxime Beauchemin <ma...@gmail.com>> wrote:
> 
> The proposal reads "Looking at the original AIP-15 the author proposes to
> use locking to enable the use of multiple schedulers, this might introduce
> unnecessary complexity"
> 
> To me introducing multiple roles (master scheduler + scheduler minions),
> may be actually more complex than just having "shared nothing" schedulers
> with locking. The former is also less scalable (whatever workload is done
> on that master [say serialization] can hit scale issues) and is less HA (as
> it relies on the orchestrator [k8s] for HA).
> 
> My personal incline has always been going towards renaming the scheduler to
> "supervisor" (as it already does significantly more than just triggering
> tasks) and allowing many instances of that role, and using locks where
> necessary. That way there are just 2 roles in the cluster: supervisor and
> worker processes. Depending on the executor (say for k8s) you don't even
> need actual persistent worker processes.
> 
> Max
> 
> On Sun, Mar 17, 2019 at 1:52 AM Peter van t Hof <pj...@gmail.com>>
> wrote:
> 
> Hi all,
> 
> I think that scheduling locking is maybe not the best way in solving this
> issue. Still I’m in support of taking a good look at the scheduler because
> it has some real scaling issues.
> 
> I did wrote an alternative proposal to solve the scalability of the
> scheduler:
> 
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler
> <
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler
> 
> 
> Any input on this is welcome.
> 
> Gr,
> Peter
> 
> 
> On 3 Mar 2019, at 03:26, Deng Xiaodong <xd...@gmail.com> wrote:
> 
> Thanks Max.
> 
> I have documented all the discussions around this topic & useful inputs
> into AIP-15 (Support Multiple-Schedulers for HA & Better Scheduling
> Performance)
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651
> <
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651>.
> 
> 
> More inputs from folks are welcomed.
> 
> Thanks.
> 
> 
> XD
> 
> On 3 Mar 2019, at 6:18 AM, Maxime Beauchemin <
> maximebeauchemin@gmail.com> wrote:
> 
> Personally I'd vote against the idea of having certain scheduler
> handling a
> subset of the DAGs, that's just not HA.
> 
> Also if you are in an env where you have a small number of large DAGs,
> the
> odds of having wasted work and double-firing get pretty high.
> 
> With the lock in place, it's just a matter of the scheduler loop to
> select
> (in a db transaction) the dag that's not been processed for the longest
> time that is not locked. Flipping the lock flag to true should be part
> of
> the db transaction. We probably need a btree index on lock and last
> processed time.
> 
> This way adding scheduler processes increases the scheduling pace, and
> provides an HA solution. No leader / master / slave or election process,
> just equal workers that work together.
> 
> Max
> 
> On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd...@gmail.com>
> wrote:
> 
> Get your point and agree. And the suggestion you gave lastly to random
> sort DAGs is a great idea to address it. Thanks!
> 
> XD
> 
> On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <Ja...@polidea.com>
> wrote:
> 
> I think that the probability calculation holds only if there is no
> correlation between different schedulers. I think however there might
> be
> an
> accidental correlation if you think about typical deployments.
> 
> Some details why I think accidental correlation is possible and even
> likely. Assume that:
> 
> - we have similar and similarly busy machines running schedulers
> (likely)
> - time is synchronised between the machines (likely)
> - the machines have the same DAG folders mounted (or copied) and the
> same filesystem is used (this is exactly what multiple schedulers
> deployment is all about)
> - the schedulers start scanning at exactly the same time (crossing
> 0:00
> second every full five minutes for example)  - this I am not sure but
> I
> imagine this might be "typical" behaviour.
> - they process list of DAGs in exactly the same sequence (it looks
> like
> this is the case dag_processing
> <
> 
> https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300
> 
> and models/__init__
> <
> 
> https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567
> :
> we use os.walk which uses os.listdir for which sequence of processing
> depends on the filesystem implementation
> <
> 
> https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>
> and
> then we append files to the list)
> 
> Then it's rather likely that the schedulers will be competing about
> the
> very same DAGs at the very beginning. Locking will change how quickly
> they
> process each DAG of course, but If the DAGs are of similar sizes it's
> also
> likely that the speed of scanning (DAGS/s) is similar for all
> schedulers.
> The schedulers will then catch-up with each other and might pretty
> much
> continuously compete for the same DAGs almost all the time.
> 
> It can be mitigated super-easily by random sorting of the DAGs folder
> list
> after it is prepared (it's file-system dependent now so we do not
> rely on
> particular order) . Then the probability numbers will hold perfectly I
> think :)
> 
> J.
> 
> 
> On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd...@gmail.com> wrote:
> 
> I’m thinking about which architecture would be ideal.
> 
> 
> # Option-1:
> The master-slave architecture would be one option. But leader selection
> would be essential to consider, otherwise we have an issue in terms of HA
> again.
> 
> 
> # Option-2:
> Another option we may consider is to simply start multiple scheduler
> instances (just using the current implementation, after modifying and
> validating the scheduler_lock on DagModel).
> 
> - In this case, given we handle everything properly using locking, we
> don’t need to worry too much about double-scheduling/triggering.
> 
> - Another potential concern I had earlier is that different schedulers may
> compete with each other and “waste” scheduler resources.
> After further thinking, I realise this is a typical Birthday Problem.
> Given m DAGs and n schedulers, at any moment the probability that all
> schedulers are working on different DAGs is m!/((m-n)! * (m^n)), and the
> probability that some schedulers are competing for the same DAG is
> 1-m!/((m-n)! * (m^n)).
> 
> Let’s say we have 200 DAGs and we start 2 schedulers. At any moment, the
> probability that the schedulers are competing for the same DAG is only
> 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only
> 0.33%.
> (This probability will be higher if m/n is low. But users should not start
> too many schedulers if they don’t have that many DAGs.)
> 
> Given that the probability of schedulers competing is so low, my concern
> about wasted scheduler resources is not really valid.
> 
> 
> Based on these calculations, I think we can go for option-2, i.e. we don’t
> make big changes to the current implementation. Instead, we ensure the
> scheduler_lock is working well and intensively test running multiple
> schedulers. Then we should be good to let users know that it’s safe to run
> multiple schedulers.
> 
> Please share your thoughts on this and correct me if I’m wrong on any
> point above. Thanks.
> 
> 
> XD
> 
> 
> Reference: https://en.wikipedia.org/wiki/Birthday_problem
> 
> 
> On 2 Mar 2019, at 3:39 PM, Tao Feng <fe...@gmail.com> wrote:
> 
> Does the proposal use a master-slave architecture (leader scheduler vs
> slave scheduler)?
> 
> On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yr...@gmail.com> wrote:
> 
> Preventing double-triggering by separating the DAG files that different
> schedulers parse sounds easier and more intuitive. I actually removed one
> piece of the double-triggering prevention logic here
> <https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127>
> (expensive) and was relying on this lock
> <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233>
> to prevent double-firing and safeguard our non-idempotent tasks (btw the
> insert can be an insert overwrite to be idempotent).
> 
> Also, though at Airbnb we requeue tasks a lot, we haven't seen
> double-firing recently.
> 
> Cheers,
> Kevin Y
> 
> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <maximebeauchemin@gmail.com> wrote:
> 
> Forgot to mention: the intention was to use the lock, but I never
> personally got to do the second phase, which would consist of skipping the
> DAG if the lock is on, and expiring the lock eventually based on a config
> setting.
> 
> Max
> 
> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <maximebeauchemin@gmail.com> wrote:
> 
> My original intention with the lock was preventing "double-triggering" of
> tasks (triggering refers to the scheduler putting the message in the
> queue). Airflow now has good "double-firing prevention" of tasks (firing
> happens when the worker receives the message and starts the task): even if
> the scheduler was to go rogue or restart and send multiple triggers for a
> task instance, the worker(s) should only start one task instance. That's
> done by running the database assertions behind the conditions being met as
> a read database transaction (no task can alter the rows that validate the
> assertion while it's getting asserted). In practice it's a little tricky
> and we've seen rogue double-firing in the past (I have no idea how often
> that happens).
> 
> If we do want to prevent double-triggering, we should make sure that 2
> schedulers aren't processing the same DAG or DagRun at the same time. That
> would mean having the scheduler not start processing locked DAGs, and
> providing a mechanism to expire the locks after some time.
> 
> Has anyone experienced double-firing lately? If that exists we should fix
> it, but also be careful around multiple-scheduler double-triggering, as it
> would make that problem potentially much worse.
> 
> Max
> 
> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd.deng.r@gmail.com> wrote:
> 
> It’s exactly what my team is doing & what I shared here last year
> (https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E).
> 
> It’s a somewhat “hacky” solution (and HA is not addressed), and now I’m
> thinking about how we can make it more proper & robust.
> 
> 
> XD
> 
> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <mario.urquizo@gmail.com> wrote:
> 
> We have been running multiple schedulers for about 3 months. We created
> multiple services to run Airflow schedulers. The only difference is that
> we have each of the schedulers pointed to a directory one level deeper
> than the DAG home directory that the workers and webapp use. We have seen
> much better scheduling performance, but this does not yet help with HA.
> 
> DAGS_HOME:
> {airflow_home}/dags  (webapp & workers)
> {airflow_home}/dags/group-a/ (scheduler1)
> {airflow_home}/dags/group-b/ (scheduler2)
> {airflow_home}/dags/group-etc/ (scheduler3)
> 
> Not sure if this helps, just sharing in case it does.
> 
> Thank you,
> Mario
> 
> 
> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bdbruin@gmail.com> wrote:
> 
> I have done quite some work on making it possible to run multiple
> schedulers at the same time. At the moment I don’t think there are real
> blockers to doing so. We just don’t actively test it.
> 
> Database locking is mostly in place (DagRuns and TaskInstances). And I
> think the worst that can happen is that a task is scheduled twice. The
> task will detect this most of the time and kill one off if they are
> concurrent; if they are sequential rather than concurrent, it will run
> again on some occasions. Everyone has idempotent tasks, right, so no harm
> done? ;-)
> 
> Have you encountered issues? Maybe work those out?
> 
> Cheers
> Bolke.
> 
> Sent from my iPad
> 
> On 1 Mar 2019, at 16:25, Deng Xiaodong <xd.deng.r@gmail.com> wrote:
> 
> Hi Max,
> 
> Following
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E,
> I’m trying to prepare an AIP for supporting multiple schedulers in Airflow
> (mainly for HA and higher scheduling performance).
> 
> While checking the code, I found that there is one attribute of DagModel,
> “scheduler_lock”. It’s not used at all in the current implementation, but
> it was introduced a long time back (2015) to allow multiple schedulers to
> work together
> (https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620).
> 
> Since you were the original author of it, it would be very helpful if you
> could kindly share why the multiple-schedulers implementation was
> eventually removed, and what challenges/complexity there were.
> (You already shared a few valuable inputs in the earlier discussion
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E,
> mainly relating to hiccups around concurrency, cross-DAG prioritisation &
> load on DB. Other than these, anything else you would like to advise?)
> 
> I will also dive into the git history further to understand it better.
> 
> Thanks.
> 
> 
> XD
> 
> --
> 
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
> 
> M: +48 660 796 129 <+48660796129>
> E: jarek.potiuk@polidea.com


Re: Multiple Schedulers - "scheduler_lock"

Posted by Bas Harenslak <ba...@godatadriven.com>.
Peter,

The numbers you mention seem to come out of the blue. I think you’re oversimplifying and cannot simply state that 180/36 = 5 minutes. Throwing in numbers without explanation creates confusion.

I have some questions after reading your AIP. I had to make lots of assumptions, and I think explaining it in more depth would clarify a lot.


  *   Should a DagScheduler and a task run in the same slots? Should there be any difference between running the two?
  *   How does this work together with e.g. Celery? With the current Celery setup you push tasks to the queue, which can be run on any worker. With your setup it seems you push a DagScheduler and/or a task. Does the DagScheduler push tasks to the queue itself, so that any task can run anywhere? And is it correct to assume the DagScheduler polls the DB to check whether a task is finished?
  *   “If a cycle is done the MainScheduler should schedule a new DagScheduler” -> I assume the DagScheduler would set the state of a DagRun to finished? And the MainScheduler simply checks when the next interval is finished and then starts a new DagScheduler?
  *   The naming is confusing to me. How about naming the DagScheduler “DagRunManager”, because that’s what I believe it does?
  *   I’m not convinced this is the way to go. Currently the scheduler process does a lot more than just scheduling, e.g. also parsing the DAGs, which I believe can be optimised a lot. I think splitting up the responsibilities of the scheduler would be more beneficial than adding complexity by creating more “schedulers”. Would you agree?

On a final note, to this whole thread: I’m very wary of doing HA/distributed work ourselves. It adds a lot of complexity, and locking is a tricky subject (although well thought out by many others, so not impossible). Before going there, I would suggest putting effort into optimising the (single) scheduler first, e.g. by splitting responsibilities between DAG parsing and actual scheduling.

Cheers,
Bas

On 18 Mar 2019, at 07:18, Peter van t Hof <pj...@gmail.com> wrote:

Hi,

My proposal focuses mainly on scalability and indeed not so much on HA, mainly because that is also the main issue raised by the original author. Having some form of HA for this MainScheduler would still be nice.

The problem is that a fixed number of schedulers does not scale with the load. At my current client they try to execute 5000+ DAGs at the same time. A single scheduler cycle that touches all DAGs takes 2-3 hours. So to do this within 5 minutes, 36 of those schedulers with locking would have to be running at all times. After 2 hours, 2 schedulers would be enough, which means that in this situation 34 scheduler processes are wasted and only produce overhead.

This DagScheduler is a short-lived task, so it is not a persistent worker process. The MainScheduler should resubmit it when required.

Gr,
Peter


On 18 Mar 2019, at 05:32, Maxime Beauchemin <ma...@gmail.com> wrote:

The proposal reads "Looking at the original AIP-15 the author proposes to
use locking to enable the use of multiple schedulers, this might introduce
unnecessary complexity"

To me, introducing multiple roles (master scheduler + scheduler minions)
may actually be more complex than just having "shared nothing" schedulers
with locking. The former is also less scalable (whatever workload is done
on that master [say serialization] can hit scale issues) and is less HA (as
it relies on the orchestrator [k8s] for HA).

My personal inclination has always been towards renaming the scheduler to
"supervisor" (as it already does significantly more than just triggering
tasks), allowing many instances of that role, and using locks where
necessary. That way there are just 2 roles in the cluster: supervisor and
worker processes. Depending on the executor (say for k8s) you don't even
need actual persistent worker processes.

Max

On Sun, Mar 17, 2019 at 1:52 AM Peter van t Hof <pj...@gmail.com> wrote:

Hi all,

I think that scheduler locking is maybe not the best way to solve this
issue. Still, I’m in support of taking a good look at the scheduler because
it has some real scaling issues.

I wrote an alternative proposal to solve the scalability of the
scheduler:

https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler


Any input on this is welcome.

Gr,
Peter


On 3 Mar 2019, at 03:26, Deng Xiaodong <xd...@gmail.com> wrote:

Thanks Max.

I have documented all the discussions around this topic & useful inputs
in AIP-15 (Support Multiple-Schedulers for HA & Better Scheduling
Performance):
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651

More input from folks is welcome.

Thanks.


XD

On 3 Mar 2019, at 6:18 AM, Maxime Beauchemin <maximebeauchemin@gmail.com> wrote:

Personally I'd vote against the idea of having certain schedulers handle a
subset of the DAGs; that's just not HA.

Also, if you are in an env where you have a small number of large DAGs, the
odds of having wasted work and double-firing get pretty high.

With the lock in place, it's just a matter of the scheduler loop selecting
(in a db transaction) the DAG that has not been processed for the longest
time and that is not locked. Flipping the lock flag to true should be part
of the same db transaction. We probably need a btree index on lock and last
processed time.

This way adding scheduler processes increases the scheduling pace, and
provides an HA solution. No leader / master / slave or election process,
just equal workers that work together.

Max

On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd...@gmail.com> wrote:

I get your point and agree. And your last suggestion, to randomly sort the
DAGs, is a great idea to address it. Thanks!

XD

On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <Ja...@polidea.com> wrote:

I think that the probability calculation holds only if there is no
correlation between different schedulers. I think, however, that there might
be an accidental correlation if you think about typical deployments.

Some details on why I think accidental correlation is possible and even
likely. Assume that:

- we have similar and similarly busy machines running schedulers (likely)
- time is synchronised between the machines (likely)
- the machines have the same DAG folders mounted (or copied) and the
same filesystem is used (this is exactly what a multiple-scheduler
deployment is all about)
- the schedulers start scanning at exactly the same time (crossing the 0:00
second every full five minutes, for example) - this I am not sure about, but
I imagine this might be "typical" behaviour.
- they process the list of DAGs in exactly the same sequence (it looks like
this is the case in dag_processing
<https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300>
and models/__init__
<https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567>:
we use os.walk, which uses os.listdir, for which the sequence of processing
depends on the filesystem implementation
<https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>,
and then we append files to the list)

Then it's rather likely that the schedulers will be competing for the
very same DAGs at the very beginning. Locking will of course change how
quickly they process each DAG, but if the DAGs are of similar sizes it's
also likely that the speed of scanning (DAGs/s) is similar for all
schedulers. The schedulers will then catch up with each other and might
pretty much continuously compete for the same DAGs.

It can be mitigated super-easily by randomly sorting the DAGs folder list
after it is prepared (the order is filesystem-dependent now, so we do not
rely on a particular order). Then the probability numbers will hold
perfectly, I think :)
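
A minimal sketch of the mitigation described above, assuming DAG files are simply the `.py` files under a folder. The function name `list_dag_files` and its `rng` parameter are hypothetical and do not correspond to Airflow's real file-collection code.

```python
import os
import random


def list_dag_files(dag_folder, rng=None):
    """Collect .py files under dag_folder and return them in random order."""
    if rng is None:
        rng = random.Random()
    paths = []
    for root, _dirs, files in os.walk(dag_folder):
        for name in files:
            if name.endswith(".py"):
                paths.append(os.path.join(root, name))
    # Sort first so the result does not depend on the filesystem's order,
    # then shuffle so that each scheduler walks the list in its own order.
    paths.sort()
    rng.shuffle(paths)
    return paths
```

Because every scheduler process draws its own permutation, two schedulers started at the same moment are unlikely to step through the DAGs in lockstep, which is the independence the probability estimate relies on.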

J.


On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd...@gmail.com> wrote:

I’m thinking about which architecture would be ideal.


# Option-1:
The master-slave architecture would be one option. But leader selection
would be essential to consider, otherwise we have an issue in terms of HA
again.


# Option-2:
Another option we may consider is to simply start multiple scheduler
instances (just using the current implementation, after modifying and
validating the scheduler_lock on DagModel).

- In this case, given we handle everything properly using locking, we
don’t need to worry too much about double-scheduling/triggering.

- Another potential concern I had earlier is that different schedulers may
compete with each other and “waste” scheduler resources.
After further thinking, I realise this is a typical Birthday Problem.
Given m DAGs and n schedulers, at any moment the probability that all
schedulers are working on different DAGs is m!/((m-n)! * (m^n)), and the
probability that some schedulers are competing for the same DAG is
1-m!/((m-n)! * (m^n)).

Let’s say we have 200 DAGs and we start 2 schedulers. At any moment, the
probability that the schedulers are competing for the same DAG is only
0.5%. If we run 2 schedulers against 300 DAGs, this probability is only
0.33%.
(This probability will be higher if m/n is low. But users should not start
too many schedulers if they don’t have that many DAGs.)
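
The formula above can be checked with a few lines of Python. This is only a sketch of the stated calculation: the function name `collision_probability` is made up here, and it assumes each scheduler picks a DAG uniformly and independently at random, which is the assumption questioned elsewhere in this thread.

```python
from math import perm


def collision_probability(m, n):
    """Probability that at least two of n schedulers pick the same DAG,
    out of m DAGs, assuming uniform and independent choices.

    Implements 1 - m! / ((m - n)! * m**n).
    """
    if n > m:
        return 1.0  # more schedulers than DAGs: a collision is certain
    return 1.0 - perm(m, n) / m ** n


print(f"{collision_probability(200, 2):.2%}")  # 200 DAGs, 2 schedulers
print(f"{collision_probability(300, 2):.2%}")  # 300 DAGs, 2 schedulers
```

For two schedulers the expression reduces to 1/m, which gives the 0.5% and 0.33% figures quoted above.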

Given that the probability of schedulers competing is so low, my concern
about wasted scheduler resources is not really valid.


Based on these calculations, I think we can go for option-2, i.e. we don’t
make big changes to the current implementation. Instead, we ensure the
scheduler_lock is working well and intensively test running multiple
schedulers. Then we should be good to let users know that it’s safe to run
multiple schedulers.

Please share your thoughts on this and correct me if I’m wrong on any
point above. Thanks.


XD


Reference: https://en.wikipedia.org/wiki/Birthday_problem


Re: Multiple Schedulers - "scheduler_lock"

Posted by Peter van t Hof <pj...@gmail.com>.
Hi,

My proposal focuses mainly on scalability and indeed not so much on HA, mainly because that is also the main issue raised by the original author. Having some form of HA for this MainScheduler would still be nice.

The problem is that a fixed number of schedulers does not scale with the load. At my current client they try to execute 5000+ DAGs at the same time. A single scheduler cycle that touches all DAGs takes 2-3 hours. So to do this within 5 minutes, 36 of those schedulers with locking would have to be running at all times. After 2 hours, 2 schedulers would be enough, which means that in this situation 34 scheduler processes are wasted and only produce overhead.
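
The figure of 36 schedulers appears to come from dividing a 180-minute cycle by the 5-minute target. A small sketch of that back-of-the-envelope estimate follows; the helper name `schedulers_needed` is hypothetical, and it assumes the work splits perfectly evenly with no locking or database overhead, which is a simplification that is questioned elsewhere in this thread.

```python
import math


def schedulers_needed(cycle_minutes, target_minutes):
    """Schedulers required to finish one full cycle within the target time,
    assuming the work divides evenly and scales linearly."""
    return math.ceil(cycle_minutes / target_minutes)


# A 3-hour (180-minute) single-scheduler cycle squeezed into 5 minutes.
print(schedulers_needed(180, 5))
```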

This DagScheduler is a short-lived task, so it is not a persistent worker process. The MainScheduler should resubmit it when required.

Gr,
Peter


> On 18 Mar 2019, at 05:32, Maxime Beauchemin <ma...@gmail.com> wrote:
> 
> The proposal reads "Looking at the original AIP-15 the author proposes to
> use locking to enable the use of multiple schedulers, this might introduce
> unnecessary complexity"
> 
> To me introducing multiple roles (master scheduler + scheduler minions),
> may be actually more complex than just having "shared nothing" schedulers
> with locking. The former is also less scalable (whatever workload is done
> on that master [say serialization] can hit scale issues) and is less HA (as
> it relies on the orchestrator [k8s] for HA).
> 
> My personal inclination has always been towards renaming the scheduler to
> "supervisor" (as it already does significantly more than just triggering
> tasks) and allowing many instances of that role, and using locks where
> necessary. That way there are just 2 roles in the cluster: supervisor and
> worker processes. Depending on the executor (say for k8s) you don't even
> need actual persistent worker processes.
> 
> Max
> 
> On Sun, Mar 17, 2019 at 1:52 AM Peter van t Hof <pj...@gmail.com>
> wrote:
> 
>> Hi all,
>> 
>> I think that scheduling locking is maybe not the best way in solving this
>> issue. Still I’m in support of taking a good look at the scheduler because
>> it has some real scaling issues.
>> 
>> I wrote an alternative proposal to solve the scalability of the
>> scheduler:
>> 
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler
>> <
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler
>>> 
>> 
>> Any input on this is welcome.
>> 
>> Gr,
>> Peter
>> 
>> 
>>> On 3 Mar 2019, at 03:26, Deng Xiaodong <xd...@gmail.com> wrote:
>>> 
>>> Thanks Max.
>>> 
>>> I have documented all the discussions around this topic & useful inputs
>> into AIP-15 (Support Multiple-Schedulers for HA & Better Scheduling
>> Performance)
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651
>> <
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651>.
>> 
>>> 
>>> More inputs from folks are welcomed.
>>> 
>>> Thanks.
>>> 
>>> 
>>> XD
>>> 
>>>> On 3 Mar 2019, at 6:18 AM, Maxime Beauchemin <
>> maximebeauchemin@gmail.com> wrote:
>>>> 
>>>> Personally I'd vote against the idea of having certain scheduler
>> handling a
>>>> subset of the DAGs, that's just not HA.
>>>> 
>>>> Also if you are in an env where you have a small number of large DAGs,
>> the
>>>> odds of having wasted work and double-firing get pretty high.
>>>> 
>>>> With the lock in place, it's just a matter of the scheduler loop to
>> select
>>>> (in a db transaction) the dag that's not been processed for the longest
>>>> time that is not locked. Flipping the lock flag to true should be part
>> of
>>>> the db transaction. We probably need a btree index on lock and last
>>>> processed time.
>>>> 
>>>> This way adding scheduler processes increases the scheduling pace, and
>>>> provides an HA solution. No leader / master / slave or election process,
>>>> just equal workers that work together.
>>>> 
>>>> Max
>>>> 
>>>> On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd...@gmail.com>
>> wrote:
>>>> 
>>>>> Get your point and agree. And the suggestion you gave lastly to random
>>>>> sort DAGs is a great idea to address it. Thanks!
>>>>> 
>>>>> XD
>>>>> 
>>>>>> On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <Ja...@polidea.com>
>>>>> wrote:
>>>>>> 
>>>>>> I think that the probability calculation holds only if there is no
>>>>>> correlation between different schedulers. I think however there might
>> be
>>>>> an
>>>>>> accidental correlation if you think about typical deployments.
>>>>>> 
>>>>>> Some details why I think accidental correlation is possible and even
>>>>>> likely. Assume that:
>>>>>> 
>>>>>> - we have similar and similarly busy machines running schedulers
>>>>> (likely)
>>>>>> - time is synchronised between the machines (likely)
>>>>>> - the machines have the same DAG folders mounted (or copied) and the
>>>>>> same filesystem is used (this is exactly what multiple schedulers
>>>>>> deployment is all about)
>>>>>> - the schedulers start scanning at exactly the same time (crossing
>> 0:00
>>>>>> second every full five minutes for example)  - this I am not sure but
>> I
>>>>>> imagine this might be "typical" behaviour.
>>>>>> - they process list of DAGs in exactly the same sequence (it looks
>> like
>>>>>> this is the case dag_processing
>>>>>> <
>>>>> 
>> https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300
>>>>>> 
>>>>>> and models/__init__
>>>>>> <
>>>>> 
>> https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567
>>>>>> :
>>>>>> we use os.walk which uses os.listdir for which sequence of processing
>>>>>> depends on the filesystem implementation
>>>>>> <
>>>>> 
>> https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>
>>>>>> and
>>>>>> then we append files to the list)
>>>>>> 
>>>>>> Then it's rather likely that the schedulers will be competing about
>> the
>>>>>> very same DAGs at the very beginning. Locking will change how quickly
>>>>> they
>>>>>> process each DAG of course, but If the DAGs are of similar sizes it's
>>>>> also
>>>>>> likely that the speed of scanning (DAGS/s) is similar for all
>> schedulers.
>>>>>> The schedulers will then catch-up with each other and might pretty
>> much
>>>>>> continuously compete for the same DAGs almost all the time.
>>>>>> 
>>>>>> It can be mitigated super-easily by random sorting of the DAGs folder
>>>>> list
>>>>>> after it is prepared (it's file-system dependent now so we do not
>> rely on
>>>>>> particular order) . Then the probability numbers will hold perfectly I
>>>>>> think :)
>>>>>> 
>>>>>> J.
>>>>>> 
>>>>>> 
>>>>>> On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd...@gmail.com>
>>>>> wrote:
>>>>>> 
>>>>>>> I’m thinking of which architecture would be ideal.
>>>>>>> 
>>>>>>> 
>>>>>>> # Option-1:
>>>>>>> The master-slave architecture would be one option. But
>> leader-selection
>>>>>>> will be very essential to consider, otherwise we have issue in terms
>> of
>>>>> HA
>>>>>>> again.
>>>>>>> 
>>>>>>> 
>>>>>>> # Option-2:
>>>>>>> Another option we may consider is to simply start multiple scheduler
>>>>>>> instances (just using the current implementation, after modify &
>>>>>>> validate the scheduler_lock on DagModel).
>>>>>>> 
>>>>>>> - In this case, given we handle everything properly using locking, we
>>>>>>> don’t need to worry too much about double-scheduling/triggering.
>>>>>>> 
>>>>>>> - Another potential concern I had earlier is that different schedulers
>>>>>>> may compete with each other and cause “waste” of scheduler resource.
>>>>>>> After further thinking, I realise this is a typical Birthday Problem.
>>>>>>> Given we have m DAGs, and n schedulers, at any moment, the probability
>>>>>>> that all schedulers are working on different DAGs is
>>>>>>> m!/((m-n)! * (m^n)), and the probability that there are schedulers
>>>>>>> competing on the same DAG will be 1-m!/((m-n)! * (m^n)).
>>>>>>> 
>>>>>>> Let’s say we have 200 DAGs and we start 2 schedulers. At any moment, the
>>>>>>> probability that there is schedulers competing on the same DAG is only
>>>>>>> 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only
>>>>>>> 0.33%.
>>>>>>> (This probability will be higher if m/n is low. But users should not
>>>>>>> start too many schedulers if they don’t have that many DAGs).
>>>>>>> 
>>>>>>> Given the probability of schedulers competing is so low, my concern on
>>>>>>> scheduler resource waste is not really valid.
>>>>>>> 
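To make the birthday-problem estimate above easy to check, here is a small Python sketch that evaluates 1 - m!/((m-n)! * m^n). It relies on the same assumption as the estimate itself, namely that at any moment each scheduler is on a DAG picked uniformly and independently of the others. The function name collision_probability is made up for this illustration and is not part of Airflow.

```python
from math import perm


def collision_probability(num_dags: int, num_schedulers: int) -> float:
    """Chance that at least two schedulers are on the same DAG at one moment.

    Assumes every scheduler picks one of the DAGs uniformly at random and
    independently of the others (the classic birthday-problem setting).
    """
    if num_dags <= 0 or num_schedulers < 0:
        raise ValueError("need at least one DAG and a non-negative scheduler count")
    # perm(m, n) == m! / (m - n)!, and it is 0 when n > m, which gives 1.0 below.
    all_distinct = perm(num_dags, num_schedulers) / num_dags ** num_schedulers
    return 1.0 - all_distinct


if __name__ == "__main__":
    for m, n in [(200, 2), (300, 2)]:
        print(f"{m} DAGs, {n} schedulers: {collision_probability(m, n):.2%}")
```

For 2 schedulers this reduces to 1/m, which is where the 0.5% (200 DAGs) and 0.33% (300 DAGs) figures come from. The result only holds while the independence assumption does.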
>>>>>>> 
>>>>>>> 
>>>>>>> Based on these calculations/assessment, I think we can go for option-2,
>>>>>>> i.e. we don’t make big change in the current implementation. Instead, we
>>>>>>> ensure the scheduler_lock is working well and test intensively on
>>>>>>> running multiple schedulers. Then we should be good to let users know
>>>>>>> that it’s safe to run multiple schedulers.
>>>>>>> 
>>>>>>> Please share your thoughts on this and correct me if I’m wrong in any
>>>>>>> point above. Thanks.
>>>>>>> 
>>>>>>> 
>>>>>>> XD
>>>>>>> 
>>>>>>> 
>>>>>>> Reference: https://en.wikipedia.org/wiki/Birthday_problem
>>>>>>> 
>>>>>>> 
>>>>>>>> On 2 Mar 2019, at 3:39 PM, Tao Feng <fe...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> Does the proposal use master-slave architecture(leader scheduler vs
>>>>>>>> slave scheduler)?
>>>>>>>> 
>>>>>>>> On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yr...@gmail.com>
>> wrote:
>>>>>>>> 
>>>>>>>>> Preventing double-triggering by separating DAG files different
>>>>>>>>> schedulers parse sounds easier and more intuitive. I actually removed
>>>>>>>>> one of the double-triggering prevention logic here
>>>>>>>>> <https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127>
>>>>>>>>> (expensive) and was relying on this lock
>>>>>>>>> <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233>
>>>>>>>>> to prevent double-firing and safe-guard our non-idempotent tasks( btw
>>>>>>>>> the insert can be insert overwrite to be idempotent).
>>>>>>>>> 
>>>>>>>>> Also tho in Airbnb we requeue tasks a lot, we haven't see double-firing
>>>>>>>>> recently.
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Kevin Y
>>>>>>>>> 
>>>>>>>>> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <
>>>>>>>>> maximebeauchemin@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Forgot to mention: the intention was to use the lock, but I never
>>>>>>>>>> personally got to do the second phase which would consist of skipping
>>>>>>>>>> the DAG if the lock is on, and expire the lock eventually based on a
>>>>>>>>>> config setting.
>>>>>>>>>> 
>>>>>>>>>> Max
>>>>>>>>>> 
>>>>>>>>>> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <
>>>>>>>>>> maximebeauchemin@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> My original intention with the lock was preventing "double-triggering"
>>>>>>>>>>> of task (triggering refers to the scheduler putting the message in the
>>>>>>>>>>> queue). Airflow now has good "double-firing-prevention" of tasks
>>>>>>>>>>> (firing happens when the worker receives the message and starts the
>>>>>>>>>>> task), even if the scheduler was to go rogue or restart and send
>>>>>>>>>>> multiple triggers for a task instance, the worker(s) should only start
>>>>>>>>>>> one task instance. That's done by running the database assertions
>>>>>>>>>>> behind the conditions being met as read database transaction (no task
>>>>>>>>>>> can alter the rows that validate the assertion while it's getting
>>>>>>>>>>> asserted). In practice it's a little tricky and we've seen rogue
>>>>>>>>>>> double-firing in the past (I have no idea how often that happens).
>>>>>>>>>>> 
>>>>>>>>>>> If we do want to prevent double-triggerring, we should make sure that 2
>>>>>>>>>>> schedulers aren't processing the same DAG or DagRun at the same time.
>>>>>>>>>>> That would mean for the scheduler to not start the process of locked
>>>>>>>>>>> DAGs, and by providing a mechanism to expire the locks after some time.
>>>>>>>>>>> 
>>>>>>>>>>> Has anyone experienced double firing lately? If that exist we should
>>>>>>>>>>> fix it, but also be careful around multiple scheduler double-triggering
>>>>>>>>>>> as it would make that problem potentially much worse.
>>>>>>>>>>> 
>>>>>>>>>>> Max
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <
>> xd.deng.r@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> It’s exactly what my team is doing & what I shared here earlier last
>>>>>>>>>>>> year
>>>>>>>>>>>> (https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E)
>>>>>>>>>>>> 
>>>>>>>>>>>> It’s somehow a “hacky” solution (and HA is not addressed), and now I’m
>>>>>>>>>>>> thinking how we can have it more proper & robust.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> XD
>>>>>>>>>>>> 
>>>>>>>>>>>>> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <
>>>>> mario.urquizo@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> We have been running multiple schedulers for about 3 months. We
>>>>>>>>>>>>> created multiple services to run airflow schedulers.  The only
>>>>>>>>>>>>> difference is that we have each of the schedulers pointed to a
>>>>>>>>>>>>> directory one level deeper than the DAG home directory that the
>>>>>>>>>>>>> workers and webapp use. We have seen much better scheduling
>>>>>>>>>>>>> performance but this does not yet help with HA.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> DAGS_HOME:
>>>>>>>>>>>>> {airflow_home}/dags  (webapp & workers)
>>>>>>>>>>>>> {airflow_home}/dags/group-a/ (scheduler1)
>>>>>>>>>>>>> {airflow_home}/dags/group-b/ (scheduler2)
>>>>>>>>>>>>> {airflow_home}/dags/group-etc/ (scheduler3)
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Not sure if this helps, just sharing in case it does.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>> Mario
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <
>> bdbruin@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I have done quite some work on making it possible to run multiple
>>>>>>>>>>>>>> schedulers at the same time.  At the moment I don’t think there are
>>>>>>>>>>>>>> real blockers actually to do so. We just don’t actively test it.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Database locking is mostly in place (DagRuns and TaskInstances). And I
>>>>>>>>>>>>>> think the worst that can happen is that a task is scheduled twice. The
>>>>>>>>>>>>>> task will detect this most of the time and kill one off if concurrent
>>>>>>>>>>>>>> if not sequential then I will run again in some occasions. Everyone is
>>>>>>>>>>>>>> having idempotent tasks right so no harm done? ;-)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Have you encountered issues? Maybe work those out?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>> Bolke.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Sent from my iPad
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 1 Mar 2019, at 16:25, Deng Xiaodong <xd.deng.r@gmail.com> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi Max,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Following
>>>>>>>>>>>>>>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E,
>>>>>>>>>>>>>>> I’m trying to prepare an AIP for supporting multiple-scheduler in
>>>>>>>>>>>>>>> Airflow (mainly for HA and Higher scheduling performance).
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Along the process of code checking, I found that there is one
>>>>>>>>>>>>>>> attribute of DagModel, “scheduler_lock”. It’s not used at all in
>>>>>>>>>>>>>>> current implementation, but it was introduced long time back (2015)
>>>>>>>>>>>>>>> to allow multiple schedulers to work together
>>>>>>>>>>>>>>> (https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620).
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Since you were the original author of it, it would be very helpful
>>>>>>>>>>>>>>> if you can kindly share why the multiple-schedulers implementation
>>>>>>>>>>>>>>> was removed eventually, and what challenges/complexity there were.
>>>>>>>>>>>>>>> (You already shared a few valuable inputs in the earlier discussion
>>>>>>>>>>>>>>> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E,
>>>>>>>>>>>>>>> mainly relating to hiccups around concurrency, cross DAG
>>>>>>>>>>>>>>> prioritisation & load on DB. Other than these, anything else you
>>>>>>>>>>>>>>> would like to advise?)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I will also dive into the git history further to understand it
>>>>>>>>>>>>>>> better.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> XD
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> 
>>>>>> Jarek Potiuk
>>>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>>>> 
>>>>>> M: +48 660 796 129 <+48660796129>
>>>>>> E: jarek.potiuk@polidea.com
>>>>> 
>>>>> 
>>> 
>> 
>> 


Re: Multiple Schedulers - "scheduler_lock"

Posted by Maxime Beauchemin <ma...@gmail.com>.
The proposal reads "Looking at the original AIP-15 the author proposes to
use locking to enable the use of multiple schedulers, this might introduce
unnecessary complexity"

To me, introducing multiple roles (master scheduler + scheduler minions)
may actually be more complex than just having "shared nothing" schedulers
with locking. The former is also less scalable (whatever workload is done
on that master [say serialization] can hit scale issues) and is less HA (as
it relies on the orchestrator [k8s] for HA).

My personal inclination has always been towards renaming the scheduler to
"supervisor" (as it already does significantly more than just triggering
tasks), allowing many instances of that role, and using locks where
necessary. That way there are just 2 roles in the cluster: supervisor and
worker processes. Depending on the executor (say for k8s) you don't even
need actual persistent worker processes.

Max

On Sun, Mar 17, 2019 at 1:52 AM Peter van t Hof <pj...@gmail.com>
wrote:

> Hi all,
>
> I think that scheduling locking is maybe not the best way in solving this
> issue. Still I’m in support of taking a good look at the scheduler because
> it has some real scaling issues.
>
> I did wrote an alternative proposal to solve the scalability of the
> scheduler:
>
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler
>
> Any input on this is welcome.
>
> Gr,
> Peter

Re: Multiple Schedulers - "scheduler_lock"

Posted by Peter van t Hof <pj...@gmail.com>.
Hi all,

I think that scheduler locking may not be the best way to solve this issue. Still, I’m in support of taking a good look at the scheduler, because it has some real scaling issues.

I wrote an alternative proposal to address the scalability of the scheduler:
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler

Any input on this is welcome.

Regards,
Peter


> On 3 Mar 2019, at 03:26, Deng Xiaodong <xd...@gmail.com> wrote:
> 
> Thanks Max.
> 
> I have documented all the discussions around this topic & useful inputs into AIP-15 (Support Multiple-Schedulers for HA & Better Scheduling Performance): https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651
> 
> More inputs from folks are welcomed.
> 
> Thanks.
> 
> 
> XD
> 
>> On 3 Mar 2019, at 6:18 AM, Maxime Beauchemin <ma...@gmail.com> wrote:
>> 
>> Personally I'd vote against the idea of having certain scheduler handling a
>> subset of the DAGs, that's just not HA.
>> 
>> Also if you are in an env where you have a small number of large DAGs, the
>> odds of having wasted work and double-firing get pretty high.
>> 
>> With the lock in place, it's just a matter of the scheduler loop to select
>> (in a db transaction) the dag that's not been processed for the longest
>> time that is not locked. Flipping the lock flag to true should be part of
>> the db transaction. We probably need a btree index on lock and last
>> processed time.
>> 
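The claim step described in the paragraph above can be sketched roughly as follows. This is only an illustration under stated assumptions, not Airflow's implementation: it uses SQLite so that it runs on its own, the table layout and the last_processed column are hypothetical (only the scheduler_lock name comes from DagModel), and claim_next_dag / release_dag are made-up helper names. BEGIN IMMEDIATE takes SQLite's write lock up front, so two scheduler processes cannot both claim the same row. On PostgreSQL or MySQL 8 a comparable effect is usually obtained with SELECT ... FOR UPDATE SKIP LOCKED. Lock expiry, which was mentioned earlier in this thread, is not covered here.

```python
import sqlite3

# Hypothetical schema: one row per DAG, a lock flag, and the time of the
# last scheduling pass. The composite index mirrors the "index on lock and
# last processed time" idea from the message above.
SCHEMA = """
CREATE TABLE dag (
    dag_id         TEXT PRIMARY KEY,
    scheduler_lock INTEGER NOT NULL DEFAULT 0,  -- 0 = free, 1 = claimed
    last_processed REAL    NOT NULL DEFAULT 0
);
CREATE INDEX idx_dag_lock_last ON dag (scheduler_lock, last_processed);
"""


def claim_next_dag(conn):
    """Lock and return the unlocked DAG that has waited longest, or None."""
    conn.execute("BEGIN IMMEDIATE")  # select + lock flip in one transaction
    try:
        row = conn.execute(
            "SELECT dag_id FROM dag WHERE scheduler_lock = 0 "
            "ORDER BY last_processed LIMIT 1"
        ).fetchone()
        if row is not None:
            conn.execute(
                "UPDATE dag SET scheduler_lock = 1 WHERE dag_id = ?", (row[0],)
            )
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return row[0] if row is not None else None


def release_dag(conn, dag_id, processed_at):
    """Unlock a DAG and record when it was processed."""
    conn.execute(
        "UPDATE dag SET scheduler_lock = 0, last_processed = ? WHERE dag_id = ?",
        (processed_at, dag_id),
    )


if __name__ == "__main__":
    # isolation_level=None: no implicit transactions, we issue BEGIN ourselves.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO dag (dag_id, last_processed) VALUES (?, ?)",
        [("a", 30.0), ("b", 10.0), ("c", 20.0)],
    )
    print(claim_next_dag(conn))  # the DAG with the oldest last_processed
```

Each scheduler would call claim_next_dag in its loop, process the returned DAG, then call release_dag. Because every instance runs the same code against the same table, there is no leader to elect.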
>> This way adding scheduler processes increases the scheduling pace, and
>> provides an HA solution. No leader / master / slave or election process,
>> just equal workers that work together.
>> 
>> Max
>> 
>> On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd...@gmail.com> wrote:
>> 
>>> Get your point and agree. And the suggestion you gave lastly to random
>>> sort DAGs is a great idea to address it. Thanks!
>>> 
>>> XD
>>> 
>>>> On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <Ja...@polidea.com>
>>> wrote:
>>>> 
>>>> I think that the probability calculation holds only if there is no
>>>> correlation between different schedulers. I think however there might be
>>>> an accidental correlation if you think about typical deployments.
>>>> 
>>>> Some details why I think accidental correlation is possible and even
>>>> likely. Assume that:
>>>> 
>>>> - we have similar and similarly busy machines running schedulers (likely)
>>>> - time is synchronised between the machines (likely)
>>>> - the machines have the same DAG folders mounted (or copied) and the
>>>> same filesystem is used (this is exactly what multiple schedulers
>>>> deployment is all about)
>>>> - the schedulers start scanning at exactly the same time (crossing 0:00
>>>> second every full five minutes for example)  - this I am not sure but I
>>>> imagine this might be "typical" behaviour.
>>>> - they process list of DAGs in exactly the same sequence (it looks like
>>>> this is the case dag_processing
>>>> <https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300>
>>>> and models/__init__
>>>> <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567>:
>>>> we use os.walk which uses os.listdir for which sequence of processing
>>>> depends on the filesystem implementation
>>>> <https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>
>>>> and then we append files to the list)
>>>> 
>>>> Then it's rather likely that the schedulers will be competing about the
>>>> very same DAGs at the very beginning. Locking will change how quickly
>>>> they process each DAG of course, but If the DAGs are of similar sizes
>>>> it's also likely that the speed of scanning (DAGS/s) is similar for all
>>>> schedulers. The schedulers will then catch-up with each other and might
>>>> pretty much continuously compete for the same DAGs almost all the time.
>>>> 
>>>> It can be mitigated super-easily by random sorting of the DAGs folder
>>>> list after it is prepared (it's file-system dependent now so we do not
>>>> rely on particular order). Then the probability numbers will hold
>>>> perfectly I think :)
>>>> 
>>>> J.
>>>> 
>>>> 
>>>> On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd...@gmail.com>
>>> wrote:
>>>> 
>>>>> I’m thinking of which architecture would be ideal.
>>>>> 
>>>>> 
>>>>> # Option-1:
>>>>> The master-slave architecture would be one option. But leader-selection
>>>>> will be very essential to consider, otherwise we have issue in terms of
>>> HA
>>>>> again.
>>>>> 
>>>>> 
>>>>> # Option-2:
>>>>> Another option we may consider is to simply start multiple scheduler
>>>>> instances (just using the current implementation, after modify &
>>> validate
>>>>> the scheduler_lock on DagModel).
>>>>> 
>>>>> - In this case, given we handle everything properly using locking, we
>>>>> don’t need to worry too much about double-scheduling/triggering.
>>>>> 
>>>>> - Another potential concern I had earlier is that different schedulers
>>> may
>>>>> compete with each other and cause “waste” of scheduler resource.
>>>>> After further thinking, I realise this is a typical Birthday Problem.
>>>>> Given we have m DAGs, and n schedulers, at any moment, the probability
>>>>> that all schedulers are working on different DAGs is m!/((m-n)! *
>>> (m^n)),
>>>>> and the probability that there are schedulers competing on the same DAG
>>>>> will be 1-m!/((m-n)! * (m^n)).
>>>>> 
>>>>> Let’s say we have 200 DAGs and we start 2 schedulers. At any moment, the
>>>>> probability that there is schedulers competing on the same DAG is only
>>>>> 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only
>>>>> 0.33%.
>>>>> (This probability will be higher if m/n is low. But users should not
>>> start
>>>>> too many schedulers if they don’t have that many DAGs).
>>>>> 
>>>>> Given the probability of schedulers competing is so low, my concern on
>>>>> scheduler resource waste is not really valid.
>>>>> 
>>>>> 
>>>>> 
>>>>> Based on these calculations/assessment, I think we can go for option-2,
>>>>> i.e. we don’t make big change in the current implementation. Instead, we
>>>>> ensure the scheduler_lock is working well and test intensively on
>>> running
>>>>> multiple schedulers. Then we should be good to let users know that it’s
>>>>> safe to run multiple schedulers.
>>>>> 
>>>>> Please share your thoughts on this and correct me if I’m wrong in any
>>>>> point above. Thanks.
>>>>> 
>>>>> 
>>>>> XD
>>>>> 
>>>>> 
>>>>> Reference: https://en.wikipedia.org/wiki/Birthday_problem <
>>>>> https://en.wikipedia.org/wiki/Birthday_problem>
>>>>> 
>>>>> 
>>>>>> On 2 Mar 2019, at 3:39 PM, Tao Feng <fe...@gmail.com> wrote:
>>>>>> 
>>>>>> Does the proposal use master-slave architecture(leader scheduler vs
>>> slave
>>>>>> scheduler)?
>>>>>> 
>>>>>> On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yr...@gmail.com> wrote:
>>>>>> 
>>>>>>> Preventing double-triggering by separating DAG files different
>>>>> schedulers
>>>>>>> parse sounds easier and more intuitive. I actually removed one of the
>>>>>>> double-triggering prevention logic here
>>>>>>> <
>>>>>>> 
>>>>> 
>>> https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127
>>>>>>>> (expensive)
>>>>>>> and
>>>>>>> was relying on this lock
>>>>>>> <
>>>>>>> 
>>>>> 
>>> https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233
>>>>>>>> 
>>>>>>> to
>>>>>>> prevent double-firing and safe-guard our non-idempotent tasks( btw the
>>>>>>> insert can be insert overwrite to be idempotent).
>>>>>>> 
>>>>>>> Also tho in Airbnb we requeue tasks a lot, we haven't see
>>> double-firing
>>>>>>> recently.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Kevin Y
>>>>>>> 
>>>>>>> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <
>>>>>>> maximebeauchemin@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Forgot to mention: the intention was to use the lock, but I never
>>>>>>>> personally got to do the second phase which would consist of skipping
>>>>> the
>>>>>>>> DAG if the lock is on, and expire the lock eventually based on a
>>> config
>>>>>>>> setting.
>>>>>>>> 
>>>>>>>> Max
>>>>>>>> 
>>>>>>>> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <
>>>>>>>> maximebeauchemin@gmail.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> My original intention with the lock was preventing
>>> "double-triggering"
>>>>>>> of
>>>>>>>>> task (triggering refers to the scheduler putting the message in the
>>>>>>>> queue).
>>>>>>>>> Airflow now has good "double-firing-prevention" of tasks (firing
>>>>>>> happens
>>>>>>>>> when the worker receives the message and starts the task), even if
>>> the
>>>>>>>>> scheduler was to go rogue or restart and send multiple triggers for
>>> a
>>>>>>>> task
>>>>>>>>> instance, the worker(s) should only start one task instance. That's
>>>>>>> done
>>>>>>>> by
>>>>>>>>> running the database assertions behind the conditions being met as
>>>>> read
>>>>>>>>> database transaction (no task can alter the rows that validate the
>>>>>>>>> assertion while it's getting asserted). In practice it's a little
>>>>>>> tricky
>>>>>>>>> and we've seen rogue double-firing in the past (I have no idea how
>>>>>>> often
>>>>>>>>> that happens).
>>>>>>>>> 
>>>>>>>>> If we do want to prevent double-triggerring, we should make sure
>>> that
>>>>> 2
>>>>>>>>> schedulers aren't processing the same DAG or DagRun at the same
>>> time.
>>>>>>>> That
>>>>>>>>> would mean for the scheduler to not start the process of locked
>>> DAGs,
>>>>>>> and
>>>>>>>>> by providing a mechanism to expire the locks after some time.
>>>>>>>>> 
>>>>>>>>> Has anyone experienced double firing lately? If that exist we should
>>>>>>> fix
>>>>>>>>> it, but also be careful around multiple scheduler double-triggering
>>> as
>>>>>>> it
>>>>>>>>> would make that problem potentially much worse.
>>>>>>>>> 
>>>>>>>>> Max
>>>>>>>>> 
>>>>>>>>> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> It’s exactly what my team is doing & what I shared here earlier
>>> last
>>>>>>>> year
>>>>>>>>>> (
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>>>>>>>>>> <
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>>>>>>>>> 
>>>>>>>>>> )
>>>>>>>>>> 
>>>>>>>>>> It’s somehow a “hacky” solution (and HA is not addressed), and now
>>>>> I’m
>>>>>>>>>> thinking how we can have it more proper & robust.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> XD
>>>>>>>>>> 
>>>>>>>>>>> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <
>>> mario.urquizo@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> We have been running multiple schedulers for about 3 months.  We
>>>>>>>> created
>>>>>>>>>>> multiple services to run airflow schedulers.  The only difference
>>> is
>>>>>>>>>> that
>>>>>>>>>>> we have each of the schedulers pointed to a directory one level
>>>>>>> deeper
>>>>>>>>>> than
>>>>>>>>>>> the DAG home directory that the workers and webapp use. We have
>>> seen
>>>>>>>>>> much
>>>>>>>>>>> better scheduling performance but this does not yet help with HA.
>>>>>>>>>>> 
>>>>>>>>>>> DAGS_HOME:
>>>>>>>>>>> {airflow_home}/dags  (webapp & workers)
>>>>>>>>>>> {airflow_home}/dags/group-a/ (scheduler1)
>>>>>>>>>>> {airflow_home}/dags/group-b/ (scheduler2)
>>>>>>>>>>> {airflow_home}/dags/group-etc/ (scheduler3)
>>>>>>>>>>> 
>>>>>>>>>>> Not sure if this helps, just sharing in case it does.
>>>>>>>>>>> 
>>>>>>>>>>> Thank you,
>>>>>>>>>>> Mario
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bd...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> I have done quite some work on making it possible to run multiple
>>>>>>>>>>>> schedulers at the same time.  At the moment I don’t think there
>>> are
>>>>>>>>>> real
>>>>>>>>>>>> blockers actually to do so. We just don’t actively test it.
>>>>>>>>>>>> 
>>>>>>>>>>>> Database locking is mostly in place (DagRuns and TaskInstances).
>>>>>>> And
>>>>>>>> I
>>>>>>>>>>>> think the worst that can happen is that a task is scheduled
>>> twice.
>>>>>>>> The
>>>>>>>>>> task
>>>>>>>>>>>> will detect this most of the time and kill one off if concurrent
>>> if
>>>>>>>> not
>>>>>>>>>>>> sequential then I will run again in some occasions. Everyone is
>>>>>>>> having
>>>>>>>>>>>> idempotent tasks right so no harm done? ;-)
>>>>>>>>>>>> 
>>>>>>>>>>>> Have you encountered issues? Maybe work those out?
>>>>>>>>>>>> 
>>>>>>>>>>>> Cheers
>>>>>>>>>>>> Bolke.
>>>>>>>>>>>> 
>>>>>>>>>>>> Verstuurd vanaf mijn iPad
>>>>>>>>>>>> 
>>>>>>>>>>>>> Op 1 mrt. 2019 om 16:25 heeft Deng Xiaodong <
>>> xd.deng.r@gmail.com>
>>>>>>>> het
>>>>>>>>>>>> volgende geschreven:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Max,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Following
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>>>>>>>>>>>> <
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>>>>>>>>>>> ,
>>>>>>>>>>>> I’m trying to prepare an AIP for supporting multiple-scheduler in
>>>>>>>>>> Airflow
>>>>>>>>>>>> (mainly for HA and Higher scheduling performance).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Along the process of code checking, I found that there is one
>>>>>>>>>> attribute
>>>>>>>>>>>> of DagModel, “scheduler_lock”. It’s not used at all in current
>>>>>>>>>>>> implementation, but it was introduced long time back (2015) to
>>>>>>> allow
>>>>>>>>>>>> multiple schedulers to work together (
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
>>>>>>>>>>>> <
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
>>>>>>>>>>> 
>>>>>>>>>>>> ).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Since you were the original author of it, it would be very
>>> helpful
>>>>>>>> if
>>>>>>>>>>>> you can kindly share why the multiple-schedulers implementation
>>> was
>>>>>>>>>> removed
>>>>>>>>>>>> eventually, and what challenges/complexity there were.
>>>>>>>>>>>>> (You already shared a few valuable inputs in the earlier
>>>>>>> discussion
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
>>>>>>>>>>>> <
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
>>>>>>>>>>> 
>>>>>>>>>>>> , mainly relating to hiccups around concurrency, cross DAG
>>>>>>>>>> prioritisation &
>>>>>>>>>>>> load on DB. Other than these, anything else you would like to
>>>>>>>> advise?)
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I will also dive into the git history further to understand it
>>>>>>>> better.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> XD
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> --
>>>> 
>>>> Jarek Potiuk
>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>> 
>>>> M: +48 660 796 129 <+48660796129>
>>>> E: jarek.potiuk@polidea.com
>>> 
>>> 
> 


Re: Multiple Schedulers - "scheduler_lock"

Posted by Deng Xiaodong <xd...@gmail.com>.
Thanks Max.

I have documented all the discussions around this topic & useful inputs in AIP-15 (Support Multiple-Schedulers for HA & Better Scheduling Performance): https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651

More input from folks is welcome.

Thanks.


XD

> On 3 Mar 2019, at 6:18 AM, Maxime Beauchemin <ma...@gmail.com> wrote:
> 
> Personally I'd vote against the idea of having certain scheduler handling a
> subset of the DAGs, that's just not HA.
> 
> Also if you are in an env where you have a small number of large DAGs, the
> odds of having wasted work and double-firing get pretty high.
> 
> With the lock in place, it's just a matter of the scheduler loop to select
> (in a db transaction) the dag that's not been processed for the longest
> time that is not locked. Flipping the lock flag to true should be part of
> the db transaction. We probably need a btree index on lock and last
> processed time.
> 
> This way adding scheduler processes increases the scheduling pace, and
> provides an HA solution. No leader / master / slave or election process,
> just equal workers that work together.
> 
> Max
> 


Re: Multiple Schedulers - "scheduler_lock"

Posted by Maxime Beauchemin <ma...@gmail.com>.
Personally I'd vote against the idea of having certain schedulers handle a
subset of the DAGs; that's just not HA.

Also if you are in an env where you have a small number of large DAGs, the
odds of having wasted work and double-firing get pretty high.
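
As a rough illustration of why a small number of DAGs raises the odds of
contention, here is a minimal sketch that evaluates the birthday-problem
estimate quoted further down in this thread, 1 - m!/((m-n)! * m^n). It assumes
every scheduler picks a DAG uniformly and independently, which a real
deployment may not satisfy. collision_probability is a hypothetical helper
written for this example, not Airflow code.

```python
from math import perm


def collision_probability(m: int, n: int) -> float:
    """Chance that at least two of n schedulers pick the same DAG out of m.

    Assumption: each scheduler picks uniformly at random and independently.
    """
    if n > m:
        # More schedulers than DAGs: some DAG must be shared.
        return 1.0
    # perm(m, n) equals m! / (m - n)!
    return 1.0 - perm(m, n) / m ** n


# Many small DAGs versus a handful of large ones.
for m, n in [(200, 2), (300, 2), (5, 2), (5, 3)]:
    print(f"{m} DAGs, {n} schedulers: {collision_probability(m, n):.2%}")
```

Under these assumptions the estimate is 0.5% for 200 DAGs and 2 schedulers,
but 20% for 5 DAGs and 2 schedulers, and 52% for 5 DAGs and 3 schedulers.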

With the lock in place, it's just a matter of having the scheduler loop
select (in a db transaction) the unlocked DAG that has gone unprocessed for
the longest time. Flipping the lock flag to true should be part of the same
db transaction. We probably need a btree index on the lock flag and the last
processed time.
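
The select-and-lock step could be sketched roughly as follows. This is only an
illustration using SQLite from the Python standard library, not the Airflow
implementation: the dag table, the last_processed column, the index name and
the helpers claim_next_dag / release_dag are all hypothetical; only the
scheduler_lock name comes from this thread.

```python
import sqlite3
import time


def claim_next_dag(conn):
    """Pick the unlocked DAG that has waited longest and lock it atomically.

    Returns the claimed dag_id, or None if every DAG is currently locked.
    """
    # BEGIN IMMEDIATE takes SQLite's write lock up front, so the SELECT and
    # the UPDATE cannot interleave with another scheduler's claim.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT dag_id FROM dag WHERE scheduler_lock = 0 "
            "ORDER BY last_processed ASC LIMIT 1"
        ).fetchone()
        if row is not None:
            conn.execute(
                "UPDATE dag SET scheduler_lock = 1 WHERE dag_id = ?", (row[0],)
            )
        conn.execute("COMMIT")
        return None if row is None else row[0]
    except Exception:
        conn.execute("ROLLBACK")
        raise


def release_dag(conn, dag_id):
    """Unlock a DAG and record that it has just been processed."""
    conn.execute("BEGIN IMMEDIATE")
    conn.execute(
        "UPDATE dag SET scheduler_lock = 0, last_processed = ? WHERE dag_id = ?",
        (time.time(), dag_id),
    )
    conn.execute("COMMIT")


# isolation_level=None lets us issue BEGIN/COMMIT explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE dag (dag_id TEXT PRIMARY KEY, "
    "scheduler_lock INTEGER NOT NULL DEFAULT 0, last_processed REAL NOT NULL)"
)
# SQLite indexes are B-trees; this one covers the lock flag and the timestamp.
conn.execute("CREATE INDEX idx_dag_lock_last ON dag (scheduler_lock, last_processed)")
conn.executemany(
    "INSERT INTO dag (dag_id, last_processed) VALUES (?, ?)",
    [("dag_a", 30.0), ("dag_b", 10.0), ("dag_c", 20.0)],
)

first = claim_next_dag(conn)   # oldest unlocked DAG
second = claim_next_dag(conn)  # next oldest, since the first is now locked
print(first, second)
```

On PostgreSQL or MySQL the same idea would more likely rely on row-level
locking (for example SELECT ... FOR UPDATE, possibly with SKIP LOCKED) rather
than a database-wide write lock. Expiring stale locks is not covered here.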

This way adding scheduler processes increases the scheduling pace, and
provides an HA solution. No leader / master / slave or election process,
just equal workers that work together.

Max

On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd...@gmail.com> wrote:

> Get your point and agree. And the suggestion you gave lastly to random
> sort DAGs is a great idea to address it. Thanks!
>
> XD
>
> > On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <Ja...@polidea.com> wrote:
> >
> > I think that the probability calculation holds only if there is no
> > correlation between different schedulers. I think, however, there might be
> > an accidental correlation if you think about typical deployments.
> >
> > Some details why I think accidental correlation is possible and even
> > likely. Assume that:
> >
> >   - we have similar and similarly busy machines running schedulers (likely)
> >   - time is synchronised between the machines (likely)
> >   - the machines have the same DAG folders mounted (or copied) and the
> >   same filesystem is used (this is exactly what multiple schedulers
> >   deployment is all about)
> >   - the schedulers start scanning at exactly the same time (crossing 0:00
> >   second every full five minutes for example)  - this I am not sure but I
> >   imagine this might be "typical" behaviour.
> >   - they process the list of DAGs in exactly the same sequence (it looks like
> >   this is the case in dag_processing
> >   <https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300>
> >   and models/__init__
> >   <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567>:
> >   we use os.walk, which uses os.listdir, whose ordering
> >   depends on the filesystem implementation
> >   <https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>
> >   and then we append files to the list)
> >
> > Then it's rather likely that the schedulers will be competing for the
> > very same DAGs at the very beginning. Locking will change how quickly they
> > process each DAG of course, but if the DAGs are of similar sizes it's also
> > likely that the speed of scanning (DAGs/s) is similar for all schedulers.
> > The schedulers will then catch up with each other and might pretty much
> > continuously compete for the same DAGs.
> >
> > It can be mitigated super-easily by randomly shuffling the DAGs folder list
> > after it is prepared (the order is filesystem-dependent now, so we do not
> > rely on any particular order). Then the probability numbers will hold
> > perfectly, I think :)
> >
> > J.
> >
> >
> > On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd...@gmail.com> wrote:
> >
> >> I’m thinking of which architecture would be ideal.
> >>
> >>
> >> # Option-1:
> >> The master-slave architecture would be one option. But leader election
> >> would be essential to consider; otherwise we have an HA issue
> >> again.
> >>
> >>
> >> # Option-2:
> >> Another option we may consider is to simply start multiple scheduler
> >> instances (just using the current implementation, after modifying &
> >> validating the scheduler_lock on DagModel).
> >>
> >> - In this case, given we handle everything properly using locking, we
> >> don’t need to worry too much about double-scheduling/triggering.
> >>
> >> - Another potential concern I had earlier is that different schedulers may
> >> compete with each other and “waste” scheduler resources.
> >> After further thinking, I realise this is a typical Birthday Problem.
> >> Given we have m DAGs and n schedulers, at any moment, the probability
> >> that all schedulers are working on different DAGs is m!/((m-n)! * (m^n)),
> >> and the probability that there are schedulers competing on the same DAG
> >> will be 1-m!/((m-n)! * (m^n)).
> >>
> >> Let’s say we have 200 DAGs and we start 2 schedulers. At any moment, the
> >> probability that there is schedulers competing on the same DAG is only
> >> 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only
> >> 0.33%.
> >> (This probability will be higher if m/n is low. But users should not
> start
> >> too many schedulers if they don’t have that many DAGs).
> >>
> >> Given the probability of schedulers competing is so low, my concern on
> >> scheduler resource waste is not really valid.
> >>
> >>
> >>
> >> Based on these calculations/assessment, I think we can go for option-2,
> >> i.e. we don’t make big change in the current implementation. Instead, we
> >> ensure the scheduler_lock is working well and test intensively on
> running
> >> multiple schedulers. Then we should be good to let users know that it’s
> >> safe to run multiple schedulers.
> >>
> >> Please share your thoughts on this and correct me if I’m wrong in any
> >> point above. Thanks.
> >>
> >>
> >> XD
> >>
> >>
> >> Reference: https://en.wikipedia.org/wiki/Birthday_problem <
> >> https://en.wikipedia.org/wiki/Birthday_problem>
> >>
> >>
> >>> On 2 Mar 2019, at 3:39 PM, Tao Feng <fe...@gmail.com> wrote:
> >>>
> >>> Does the proposal use master-slave architecture(leader scheduler vs
> slave
> >>> scheduler)?
> >>>
> >>> On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yr...@gmail.com> wrote:
> >>>
> >>>> Preventing double-triggering by separating DAG files different
> >> schedulers
> >>>> parse sounds easier and more intuitive. I actually removed one of the
> >>>> double-triggering prevention logic here
> >>>> <
> >>>>
> >>
> https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127
> >>>>> (expensive)
> >>>> and
> >>>> was relying on this lock
> >>>> <
> >>>>
> >>
> https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233
> >>>>>
> >>>> to
> >>>> prevent double-firing and safe-guard our non-idempotent tasks( btw the
> >>>> insert can be insert overwrite to be idempotent).
> >>>>
> >>>> Also tho in Airbnb we requeue tasks a lot, we haven't see
> double-firing
> >>>> recently.
> >>>>
> >>>> Cheers,
> >>>> Kevin Y
> >>>>
> >>>> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <
> >>>> maximebeauchemin@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Forgot to mention: the intention was to use the lock, but I never
> >>>>> personally got to do the second phase which would consist of skipping
> >> the
> >>>>> DAG if the lock is on, and expire the lock eventually based on a
> config
> >>>>> setting.
> >>>>>
> >>>>> Max
> >>>>>
> >>>>> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <
> >>>>> maximebeauchemin@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> My original intention with the lock was preventing
> "double-triggering"
> >>>> of
> >>>>>> task (triggering refers to the scheduler putting the message in the
> >>>>> queue).
> >>>>>> Airflow now has good "double-firing-prevention" of tasks (firing
> >>>> happens
> >>>>>> when the worker receives the message and starts the task), even if
> the
> >>>>>> scheduler was to go rogue or restart and send multiple triggers for
> a
> >>>>> task
> >>>>>> instance, the worker(s) should only start one task instance. That's
> >>>> done
> >>>>> by
> >>>>>> running the database assertions behind the conditions being met as
> >> read
> >>>>>> database transaction (no task can alter the rows that validate the
> >>>>>> assertion while it's getting asserted). In practice it's a little
> >>>> tricky
> >>>>>> and we've seen rogue double-firing in the past (I have no idea how
> >>>> often
> >>>>>> that happens).
> >>>>>>
> >>>>>> If we do want to prevent double-triggerring, we should make sure
> that
> >> 2
> >>>>>> schedulers aren't processing the same DAG or DagRun at the same
> time.
> >>>>> That
> >>>>>> would mean for the scheduler to not start the process of locked
> DAGs,
> >>>> and
> >>>>>> by providing a mechanism to expire the locks after some time.
> >>>>>>
> >>>>>> Has anyone experienced double firing lately? If that exist we should
> >>>> fix
> >>>>>> it, but also be careful around multiple scheduler double-triggering
> as
> >>>> it
> >>>>>> would make that problem potentially much worse.
> >>>>>>
> >>>>>> Max
> >>>>>>
> >>>>>> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd...@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>>> It’s exactly what my team is doing & what I shared here earlier
> last
> >>>>> year
> >>>>>>> (
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >>>>>>> <
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >>>>>>
> >>>>>>> )
> >>>>>>>
> >>>>>>> It’s somehow a “hacky” solution (and HA is not addressed), and now
> >> I’m
> >>>>>>> thinking how we can have it more proper & robust.
> >>>>>>>
> >>>>>>>
> >>>>>>> XD
> >>>>>>>
> >>>>>>>> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <
> mario.urquizo@gmail.com>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>> We have been running multiple schedulers for about 3 months.  We
> >>>>> created
> >>>>>>>> multiple services to run airflow schedulers.  The only difference
> is
> >>>>>>> that
> >>>>>>>> we have each of the schedulers pointed to a directory one level
> >>>> deeper
> >>>>>>> than
> >>>>>>>> the DAG home directory that the workers and webapp use. We have
> seen
> >>>>>>> much
> >>>>>>>> better scheduling performance but this does not yet help with HA.
> >>>>>>>>
> >>>>>>>> DAGS_HOME:
> >>>>>>>> {airflow_home}/dags  (webapp & workers)
> >>>>>>>> {airflow_home}/dags/group-a/ (scheduler1)
> >>>>>>>> {airflow_home}/dags/group-b/ (scheduler2)
> >>>>>>>> {airflow_home}/dags/group-etc/ (scheduler3)
> >>>>>>>>
> >>>>>>>> Not sure if this helps, just sharing in case it does.
> >>>>>>>>
> >>>>>>>> Thank you,
> >>>>>>>> Mario
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bd...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> I have done quite some work on making it possible to run multiple
> >>>>>>>>> schedulers at the same time.  At the moment I don’t think there
> are
> >>>>>>> real
> >>>>>>>>> blockers actually to do so. We just don’t actively test it.
> >>>>>>>>>
> >>>>>>>>> Database locking is mostly in place (DagRuns and TaskInstances).
> >>>> And
> >>>>> I
> >>>>>>>>> think the worst that can happen is that a task is scheduled
> twice.
> >>>>> The
> >>>>>>> task
> >>>>>>>>> will detect this most of the time and kill one off if concurrent
> if
> >>>>> not
> >>>>>>>>> sequential then I will run again in some occasions. Everyone is
> >>>>> having
> >>>>>>>>> idempotent tasks right so no harm done? ;-)
> >>>>>>>>>
> >>>>>>>>> Have you encountered issues? Maybe work those out?
> >>>>>>>>>
> >>>>>>>>> Cheers
> >>>>>>>>> Bolke.
> >>>>>>>>>
> >>>>>>>>> Sent from my iPad
> >>>>>>>>>
> >>>>>>>>>> On 1 Mar 2019, at 16:25, Deng Xiaodong <xd.deng.r@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi Max,
> >>>>>>>>>>
> >>>>>>>>>> Following
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >>>>>>>>> <
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >>>>>>>> ,
> >>>>>>>>> I’m trying to prepare an AIP for supporting multiple-scheduler in
> >>>>>>> Airflow
> >>>>>>>>> (mainly for HA and Higher scheduling performance).
> >>>>>>>>>>
> >>>>>>>>>> Along the process of code checking, I found that there is one
> >>>>>>> attribute
> >>>>>>>>> of DagModel, “scheduler_lock”. It’s not used at all in current
> >>>>>>>>> implementation, but it was introduced long time back (2015) to
> >>>> allow
> >>>>>>>>> multiple schedulers to work together (
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
> >>>>>>>>> <
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
> >>>>>>>>
> >>>>>>>>> ).
> >>>>>>>>>>
> >>>>>>>>>> Since you were the original author of it, it would be very
> helpful
> >>>>> if
> >>>>>>>>> you can kindly share why the multiple-schedulers implementation
> was
> >>>>>>> removed
> >>>>>>>>> eventually, and what challenges/complexity there were.
> >>>>>>>>>> (You already shared a few valuable inputs in the earlier
> >>>> discussion
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
> >>>>>>>>> <
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
> >>>>>>>>
> >>>>>>>>> , mainly relating to hiccups around concurrency, cross DAG
> >>>>>>> prioritisation &
> >>>>>>>>> load on DB. Other than these, anything else you would like to
> >>>>> advise?)
> >>>>>>>>>>
> >>>>>>>>>> I will also dive into the git history further to understand it
> >>>>> better.
> >>>>>>>>>>
> >>>>>>>>>> Thanks.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> XD
> >>>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> >>
> >
> > --
> >
> > Jarek Potiuk
> > Polidea <https://www.polidea.com/> | Principal Software Engineer
> >
> > M: +48 660 796 129 <+48660796129>
> > E: jarek.potiuk@polidea.com
>
>

Re: Multiple Schedulers - "scheduler_lock"

Posted by Deng Xiaodong <xd...@gmail.com>.
I get your point and agree. Your final suggestion to randomly sort the DAGs is a great way to address it. Thanks!

XD

> On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <Ja...@polidea.com> wrote:
> 
> I think that the probability calculation holds only if there is no
> correlation between different schedulers. I think however there might be an
> accidental correlation if you think about typical deployments.
> 
> Some details why I think accidental correlation is possible and even
> likely. Assume that:
> 
>   - we have similar and similarly busy machines running schedulers (likely)
>   - time is synchronised between the machines (likely)
>   - the machines have the same DAG folders mounted (or copied) and the
>   same filesystem is used (this is exactly what multiple schedulers
>   deployment is all about)
>   - the schedulers start scanning at exactly the same time (crossing 0:00
>   second every full five minutes for example)  - this I am not sure but I
>   imagine this might be "typical" behaviour.
>   - they process list of DAGs in exactly the same sequence (it looks like
>   this is the case dag_processing
>   <https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300>
>   and models/__init__
>   <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567>:
>   we use os.walk which uses os.listdir for which sequence of processing
>   depends on the filesystem implementation
>   <https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>
> and
>   then we append files to the list)
> 
> Then it's rather likely that the schedulers will be competing about the
> very same DAGs at the very beginning. Locking will change how quickly they
> process each DAG of course, but If the DAGs are of similar sizes it's also
> likely that the speed of scanning (DAGS/s) is similar for all schedulers.
> The schedulers will then catch-up with each other and might pretty much
> continuously compete for the same DAGs almost all the time.
> 
> It can be mitigated super-easily by random sorting of the DAGs folder list
> after it is prepared (it's file-system dependent now so we do not rely on
> particular order) . Then the probability numbers will hold perfectly I
> think :)
> 
> J.
> 
> 
> On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd...@gmail.com> wrote:
> 
>> I’m thinking of which architecture would be ideal.
>> 
>> 
>> # Option-1:
>> The master-slave architecture would be one option. But leader-selection
>> will be very essential to consider, otherwise we have issue in terms of HA
>> again.
>> 
>> 
>> # Option-2:
>> Another option we may consider is to simply start multiple scheduler
>> instances (just using the current implementation, after modify & validate
>> the scheduler_lock on DagModel).
>> 
>> - In this case, given we handle everything properly using locking, we
>> don’t need to worry too much about double-scheduling/triggering.
>> 
>> - Another potential concern I had earlier is that different schedulers may
>> compete with each other and cause “waste” of scheduler resource.
>> After further thinking, I realise this is a typical Birthday Problem.
>> Given we have m DAGs, and n schedulers, at any moment, the probability
>> that all schedulers are working on different DAGs is m!/((m-n)! * (m^n)),
>> and the probability that there are schedulers competing on the same DAG
>> will be 1-m!/((m-n)! * (m^n)).
>> 
>> Let’s say we have 200 DAGs and we start 2 schedulers. At any moment, the
>> probability that there is schedulers competing on the same DAG is only
>> 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only
>> 0.33%.
>> (This probability will be higher if m/n is low. But users should not start
>> too many schedulers if they don’t have that many DAGs).
>> 
>> Given the probability of schedulers competing is so low, my concern on
>> scheduler resource waste is not really valid.
>> 
>> 
>> 
>> Based on these calculations/assessment, I think we can go for option-2,
>> i.e. we don’t make big change in the current implementation. Instead, we
>> ensure the scheduler_lock is working well and test intensively on running
>> multiple schedulers. Then we should be good to let users know that it’s
>> safe to run multiple schedulers.
>> 
>> Please share your thoughts on this and correct me if I’m wrong in any
>> point above. Thanks.
>> 
>> 
>> XD
>> 
>> 
>> Reference: https://en.wikipedia.org/wiki/Birthday_problem <
>> https://en.wikipedia.org/wiki/Birthday_problem>
>> 
>> 
>>> On 2 Mar 2019, at 3:39 PM, Tao Feng <fe...@gmail.com> wrote:
>>> 
>>> Does the proposal use master-slave architecture(leader scheduler vs slave
>>> scheduler)?
>>> 
>>> On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yr...@gmail.com> wrote:
>>> 
>>>> Preventing double-triggering by separating DAG files different
>> schedulers
>>>> parse sounds easier and more intuitive. I actually removed one of the
>>>> double-triggering prevention logic here
>>>> <
>>>> 
>> https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127
>>>>> (expensive)
>>>> and
>>>> was relying on this lock
>>>> <
>>>> 
>> https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233
>>>>> 
>>>> to
>>>> prevent double-firing and safe-guard our non-idempotent tasks( btw the
>>>> insert can be insert overwrite to be idempotent).
>>>> 
>>>> Also tho in Airbnb we requeue tasks a lot, we haven't see double-firing
>>>> recently.
>>>> 
>>>> Cheers,
>>>> Kevin Y
>>>> 
>>>> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <
>>>> maximebeauchemin@gmail.com>
>>>> wrote:
>>>> 
>>>>> Forgot to mention: the intention was to use the lock, but I never
>>>>> personally got to do the second phase which would consist of skipping
>> the
>>>>> DAG if the lock is on, and expire the lock eventually based on a config
>>>>> setting.
>>>>> 
>>>>> Max
>>>>> 
>>>>> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <
>>>>> maximebeauchemin@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> My original intention with the lock was preventing "double-triggering"
>>>> of
>>>>>> task (triggering refers to the scheduler putting the message in the
>>>>> queue).
>>>>>> Airflow now has good "double-firing-prevention" of tasks (firing
>>>> happens
>>>>>> when the worker receives the message and starts the task), even if the
>>>>>> scheduler was to go rogue or restart and send multiple triggers for a
>>>>> task
>>>>>> instance, the worker(s) should only start one task instance. That's
>>>> done
>>>>> by
>>>>>> running the database assertions behind the conditions being met as
>> read
>>>>>> database transaction (no task can alter the rows that validate the
>>>>>> assertion while it's getting asserted). In practice it's a little
>>>> tricky
>>>>>> and we've seen rogue double-firing in the past (I have no idea how
>>>> often
>>>>>> that happens).
>>>>>> 
>>>>>> If we do want to prevent double-triggerring, we should make sure that
>> 2
>>>>>> schedulers aren't processing the same DAG or DagRun at the same time.
>>>>> That
>>>>>> would mean for the scheduler to not start the process of locked DAGs,
>>>> and
>>>>>> by providing a mechanism to expire the locks after some time.
>>>>>> 
>>>>>> Has anyone experienced double firing lately? If that exist we should
>>>> fix
>>>>>> it, but also be careful around multiple scheduler double-triggering as
>>>> it
>>>>>> would make that problem potentially much worse.
>>>>>> 
>>>>>> Max
>>>>>> 
>>>>>> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd...@gmail.com>
>>>>> wrote:
>>>>>> 
>>>>>>> It’s exactly what my team is doing & what I shared here earlier last
>>>>> year
>>>>>>> (
>>>>>>> 
>>>>> 
>>>> 
>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>>>>>>> <
>>>>>>> 
>>>>> 
>>>> 
>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>>>>>> 
>>>>>>> )
>>>>>>> 
>>>>>>> It’s somehow a “hacky” solution (and HA is not addressed), and now
>> I’m
>>>>>>> thinking how we can have it more proper & robust.
>>>>>>> 
>>>>>>> 
>>>>>>> XD
>>>>>>> 
>>>>>>>> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <ma...@gmail.com>
>>>>>>> wrote:
>>>>>>>> 
>>>>>>>> We have been running multiple schedulers for about 3 months.  We
>>>>> created
>>>>>>>> multiple services to run airflow schedulers.  The only difference is
>>>>>>> that
>>>>>>>> we have each of the schedulers pointed to a directory one level
>>>> deeper
>>>>>>> than
>>>>>>>> the DAG home directory that the workers and webapp use. We have seen
>>>>>>> much
>>>>>>>> better scheduling performance but this does not yet help with HA.
>>>>>>>> 
>>>>>>>> DAGS_HOME:
>>>>>>>> {airflow_home}/dags  (webapp & workers)
>>>>>>>> {airflow_home}/dags/group-a/ (scheduler1)
>>>>>>>> {airflow_home}/dags/group-b/ (scheduler2)
>>>>>>>> {airflow_home}/dags/group-etc/ (scheduler3)
>>>>>>>> 
>>>>>>>> Not sure if this helps, just sharing in case it does.
>>>>>>>> 
>>>>>>>> Thank you,
>>>>>>>> Mario
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bd...@gmail.com>
>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> I have done quite some work on making it possible to run multiple
>>>>>>>>> schedulers at the same time.  At the moment I don’t think there are
>>>>>>> real
>>>>>>>>> blockers actually to do so. We just don’t actively test it.
>>>>>>>>> 
>>>>>>>>> Database locking is mostly in place (DagRuns and TaskInstances).
>>>> And
>>>>> I
>>>>>>>>> think the worst that can happen is that a task is scheduled twice.
>>>>> The
>>>>>>> task
>>>>>>>>> will detect this most of the time and kill one off if concurrent if
>>>>> not
>>>>>>>>> sequential then I will run again in some occasions. Everyone is
>>>>> having
>>>>>>>>> idempotent tasks right so no harm done? ;-)
>>>>>>>>> 
>>>>>>>>> Have you encountered issues? Maybe work those out?
>>>>>>>>> 
>>>>>>>>> Cheers
>>>>>>>>> Bolke.
>>>>>>>>> 
>>>>>>>>> Sent from my iPad
>>>>>>>>> 
>>>>>>>>>> On 1 Mar 2019, at 16:25, Deng Xiaodong <xd...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi Max,
>>>>>>>>>> 
>>>>>>>>>> Following
>>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>>>>>>>>> <
>>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>>>>>>>> ,
>>>>>>>>> I’m trying to prepare an AIP for supporting multiple-scheduler in
>>>>>>> Airflow
>>>>>>>>> (mainly for HA and Higher scheduling performance).
>>>>>>>>>> 
>>>>>>>>>> Along the process of code checking, I found that there is one
>>>>>>> attribute
>>>>>>>>> of DagModel, “scheduler_lock”. It’s not used at all in current
>>>>>>>>> implementation, but it was introduced long time back (2015) to
>>>> allow
>>>>>>>>> multiple schedulers to work together (
>>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
>>>>>>>>> <
>>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
>>>>>>>> 
>>>>>>>>> ).
>>>>>>>>>> 
>>>>>>>>>> Since you were the original author of it, it would be very helpful
>>>>> if
>>>>>>>>> you can kindly share why the multiple-schedulers implementation was
>>>>>>> removed
>>>>>>>>> eventually, and what challenges/complexity there were.
>>>>>>>>>> (You already shared a few valuable inputs in the earlier
>>>> discussion
>>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
>>>>>>>>> <
>>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
>>>>>>>> 
>>>>>>>>> , mainly relating to hiccups around concurrency, cross DAG
>>>>>>> prioritisation &
>>>>>>>>> load on DB. Other than these, anything else you would like to
>>>>> advise?)
>>>>>>>>>> 
>>>>>>>>>> I will also dive into the git history further to understand it
>>>>> better.
>>>>>>>>>> 
>>>>>>>>>> Thanks.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> XD
>>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> 
>> 
> 
> -- 
> 
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
> 
> M: +48 660 796 129 <+48660796129>
> E: jarek.potiuk@polidea.com


Re: Multiple Schedulers - "scheduler_lock"

Posted by Jarek Potiuk <Ja...@polidea.com>.
I think that the probability calculation holds only if there is no
correlation between different schedulers. I think however there might be an
accidental correlation if you think about typical deployments.
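
To make the no-correlation assumption concrete, here is a minimal Python sketch of the calculation under discussion. It assumes that, at any moment, each of n schedulers is working on one of m DAGs picked uniformly and independently of the others; the function name is made up for illustration and is not part of Airflow.

```python
from math import perm


def collision_probability(m: int, n: int) -> float:
    """Probability that at least two of n schedulers pick the same DAG
    out of m, assuming independent uniform picks (the birthday problem)."""
    if n > m:
        return 1.0  # pigeonhole: more schedulers than DAGs
    # m! / ((m - n)! * m^n) is the chance that all picks are distinct.
    return 1.0 - perm(m, n) / m**n


for m, n in [(200, 2), (300, 2), (200, 5)]:
    print(f"m={m}, n={n}: {collision_probability(m, n):.4%}")
```

For m=200, n=2 and m=300, n=2 this reproduces the 0.5% and 0.33% figures quoted below, but only because the picks are modelled as independent, which is exactly the assumption in question.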

Some details on why I think accidental correlation is possible and even
likely. Assume that:

   - we have similar and similarly busy machines running schedulers (likely)
   - time is synchronised between the machines (likely)
   - the machines have the same DAG folders mounted (or copied) and the
   same filesystem is used (this is exactly what multiple schedulers
   deployment is all about)
   - the schedulers start scanning at exactly the same time (for example,
   at second 0:00 of every full five minutes). I am not sure about this,
   but I imagine it might be "typical" behaviour.
   - they process the list of DAGs in exactly the same sequence. This looks
   like the case in dag_processing
   <https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300>
   and models/__init__
   <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567>:
   we use os.walk, which uses os.listdir, whose ordering depends on the
   filesystem implementation
   <https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>,
   and then we append the files to the list.

Then it's rather likely that the schedulers will be competing for the
very same DAGs at the very beginning. Locking will of course change how
quickly they process each DAG, but if the DAGs are of similar sizes it's
also likely that the scanning speed (DAGs/s) is similar for all schedulers.
The schedulers will then catch up with each other and might compete for the
same DAGs almost all the time.
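
A toy simulation of this effect, as a sketch only: it assumes the schedulers move in lockstep, one DAG per step, and it ignores how locking would slow a scheduler down. The names are invented for the example and do not come from Airflow.

```python
import random


def contended_fraction(orders):
    """Fraction of lockstep steps at which at least two schedulers
    are looking at the same DAG."""
    steps = len(orders[0])
    clashes = sum(
        1 for t in range(steps)
        if len({order[t] for order in orders}) < len(orders)
    )
    return clashes / steps


dags = list(range(200))
rng = random.Random(0)

same_order = [dags, dags]  # both schedulers walk the folder identically
random_order = [rng.sample(dags, len(dags)) for _ in range(2)]

print("identical order:", contended_fraction(same_order))
print("shuffled order: ", contended_fraction(random_order))
```

With identical ordering every step is contended; with independently shuffled orderings the contended fraction drops to roughly the level the birthday-problem estimate predicts.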

It can be mitigated very easily by randomly shuffling the DAGs folder list
after it is prepared (the order is filesystem-dependent now, so we do not
rely on any particular order). Then the probability numbers will hold, I
think :)
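
A rough Python sketch of that mitigation, assuming each scheduler already holds the list of DAG file paths it collected from the DAGs folder. The helper name and the example paths are hypothetical, not Airflow's actual API.

```python
import random
from typing import List, Optional


def shuffled_dag_files(file_paths: List[str], seed: Optional[int] = None) -> List[str]:
    """Return a copy of the collected DAG file paths in random order."""
    # Leave seed=None in real use so that each scheduler gets its own order;
    # a fixed seed is only useful for reproducible tests.
    rng = random.Random(seed)
    shuffled = list(file_paths)
    rng.shuffle(shuffled)
    return shuffled


collected = [f"dags/dag_{i}.py" for i in range(10)]  # stand-in for the os.walk result
print(shuffled_dag_files(collected))
```

Shuffling a copy keeps the collected list intact for any other caller that might depend on it.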

J.


On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd...@gmail.com> wrote:

> I’m thinking of which architecture would be ideal.
>
>
> # Option-1:
> A master-slave architecture would be one option. But leader election
> would be essential to consider; otherwise we have an HA issue again.
>
>
> # Option-2:
> Another option we may consider is to simply start multiple scheduler
> instances (just using the current implementation, after modifying and
> validating the scheduler_lock on DagModel).
>
> - In this case, given we handle everything properly using locking, we
> don’t need to worry too much about double-scheduling/triggering.
>
> - Another potential concern I had earlier is that different schedulers may
> compete with each other and “waste” scheduler resources.
> After further thought, I realise this is a typical Birthday Problem.
> Given we have m DAGs, and n schedulers, at any moment, the probability
> that all schedulers are working on different DAGs is m!/((m-n)! * (m^n)),
> and the probability that there are schedulers competing on the same DAG
> will be 1-m!/((m-n)! * (m^n)).
>
> Let’s say we have 200 DAGs and we start 2 schedulers. At any moment, the
> probability that schedulers are competing on the same DAG is only
> 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only
> 0.33%.
> (This probability will be higher if m/n is low. But users should not start
> too many schedulers if they don’t have that many DAGs.)
>
> Given the probability of schedulers competing is so low, my concern on
> scheduler resource waste is not really valid.
>
>
>
> Based on these calculations and this assessment, I think we can go for
> option-2, i.e. we don’t make a big change to the current implementation.
> Instead, we ensure the scheduler_lock works well and intensively test
> running multiple schedulers. Then we should be good to let users know that
> it’s safe to run multiple schedulers.
>
> Please share your thoughts on this and correct me if I’m wrong in any
> point above. Thanks.
>
>
> XD
>
>
> Reference: https://en.wikipedia.org/wiki/Birthday_problem
>
>
> > On 2 Mar 2019, at 3:39 PM, Tao Feng <fe...@gmail.com> wrote:
> >
> > Does the proposal use master-slave architecture(leader scheduler vs slave
> > scheduler)?
> >
> > On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yr...@gmail.com> wrote:
> >
> >> Preventing double-triggering by separating DAG files different
> schedulers
> >> parse sounds easier and more intuitive. I actually removed one of the
> >> double-triggering prevention logic here
> >> <
> >>
> https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127
> >>> (expensive)
> >> and
> >> was relying on this lock
> >> <
> >>
> https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233
> >>>
> >> to
> >> prevent double-firing and safe-guard our non-idempotent tasks( btw the
> >> insert can be insert overwrite to be idempotent).
> >>
> >> Also tho in Airbnb we requeue tasks a lot, we haven't see double-firing
> >> recently.
> >>
> >> Cheers,
> >> Kevin Y
> >>
> >> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <
> >> maximebeauchemin@gmail.com>
> >> wrote:
> >>
> >>> Forgot to mention: the intention was to use the lock, but I never
> >>> personally got to do the second phase which would consist of skipping
> the
> >>> DAG if the lock is on, and expire the lock eventually based on a config
> >>> setting.
> >>>
> >>> Max
> >>>
> >>> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <
> >>> maximebeauchemin@gmail.com>
> >>> wrote:
> >>>
> >>>> My original intention with the lock was preventing "double-triggering"
> >> of
> >>>> task (triggering refers to the scheduler putting the message in the
> >>> queue).
> >>>> Airflow now has good "double-firing-prevention" of tasks (firing
> >> happens
> >>>> when the worker receives the message and starts the task), even if the
> >>>> scheduler was to go rogue or restart and send multiple triggers for a
> >>> task
> >>>> instance, the worker(s) should only start one task instance. That's
> >> done
> >>> by
> >>>> running the database assertions behind the conditions being met as
> read
> >>>> database transaction (no task can alter the rows that validate the
> >>>> assertion while it's getting asserted). In practice it's a little
> >> tricky
> >>>> and we've seen rogue double-firing in the past (I have no idea how
> >> often
> >>>> that happens).
> >>>>
> >>>> If we do want to prevent double-triggerring, we should make sure that
> 2
> >>>> schedulers aren't processing the same DAG or DagRun at the same time.
> >>> That
> >>>> would mean for the scheduler to not start the process of locked DAGs,
> >> and
> >>>> by providing a mechanism to expire the locks after some time.
> >>>>
> >>>> Has anyone experienced double firing lately? If that exist we should
> >> fix
> >>>> it, but also be careful around multiple scheduler double-triggering as
> >> it
> >>>> would make that problem potentially much worse.
> >>>>
> >>>> Max
> >>>>
> >>>> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> It’s exactly what my team is doing & what I shared here earlier last
> >>> year
> >>>>> (
> >>>>>
> >>>
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >>>>> <
> >>>>>
> >>>
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >>>>
> >>>>> )
> >>>>>
> >>>>> It’s somehow a “hacky” solution (and HA is not addressed), and now
> I’m
> >>>>> thinking how we can have it more proper & robust.
> >>>>>
> >>>>>
> >>>>> XD
> >>>>>
> >>>>>> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <ma...@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>> We have been running multiple schedulers for about 3 months.  We
> >>> created
> >>>>>> multiple services to run airflow schedulers.  The only difference is
> >>>>> that
> >>>>>> we have each of the schedulers pointed to a directory one level
> >> deeper
> >>>>> than
> >>>>>> the DAG home directory that the workers and webapp use. We have seen
> >>>>> much
> >>>>>> better scheduling performance but this does not yet help with HA.
> >>>>>>
> >>>>>> DAGS_HOME:
> >>>>>> {airflow_home}/dags  (webapp & workers)
> >>>>>> {airflow_home}/dags/group-a/ (scheduler1)
> >>>>>> {airflow_home}/dags/group-b/ (scheduler2)
> >>>>>> {airflow_home}/dags/group-etc/ (scheduler3)
> >>>>>>
> >>>>>> Not sure if this helps, just sharing in case it does.
> >>>>>>
> >>>>>> Thank you,
> >>>>>> Mario
> >>>>>>
> >>>>>>
> >>>>>> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bd...@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>>> I have done quite some work on making it possible to run multiple
> >>>>>>> schedulers at the same time.  At the moment I don’t think there are
> >>>>> real
> >>>>>>> blockers actually to do so. We just don’t actively test it.
> >>>>>>>
> >>>>>>> Database locking is mostly in place (DagRuns and TaskInstances).
> >> And
> >>> I
> >>>>>>> think the worst that can happen is that a task is scheduled twice.
> >>> The
> >>>>> task
> >>>>>>> will detect this most of the time and kill one off if concurrent if
> >>> not
> >>>>>>> sequential then I will run again in some occasions. Everyone is
> >>> having
> >>>>>>> idempotent tasks right so no harm done? ;-)
> >>>>>>>
> >>>>>>> Have you encountered issues? Maybe work those out?
> >>>>>>>
> >>>>>>> Cheers
> >>>>>>> Bolke.
> >>>>>>>
> >>>>>>> Verstuurd vanaf mijn iPad
> >>>>>>>
> >>>>>>>> Op 1 mrt. 2019 om 16:25 heeft Deng Xiaodong <xd...@gmail.com>
> >>> het
> >>>>>>> volgende geschreven:
> >>>>>>>>
> >>>>>>>> Hi Max,
> >>>>>>>>
> >>>>>>>> Following
> >>>>>>>
> >>>>>
> >>>
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >>>>>>> <
> >>>>>>>
> >>>>>
> >>>
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >>>>>> ,
> >>>>>>> I’m trying to prepare an AIP for supporting multiple-scheduler in
> >>>>> Airflow
> >>>>>>> (mainly for HA and Higher scheduling performance).
> >>>>>>>>
> >>>>>>>> Along the process of code checking, I found that there is one
> >>>>> attribute
> >>>>>>> of DagModel, “scheduler_lock”. It’s not used at all in current
> >>>>>>> implementation, but it was introduced long time back (2015) to
> >> allow
> >>>>>>> multiple schedulers to work together (
> >>>>>>>
> >>>>>
> >>>
> >>
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
> >>>>>>> <
> >>>>>>>
> >>>>>
> >>>
> >>
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
> >>>>>>
> >>>>>>> ).
> >>>>>>>>
> >>>>>>>> Since you were the original author of it, it would be very helpful
> >>> if
> >>>>>>> you can kindly share why the multiple-schedulers implementation was
> >>>>> removed
> >>>>>>> eventually, and what challenges/complexity there were.
> >>>>>>>> (You already shared a few valuable inputs in the earlier
> >> discussion
> >>>>>>>
> >>>>>
> >>>
> >>
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
> >>>>>>> <
> >>>>>>>
> >>>>>
> >>>
> >>
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
> >>>>>>
> >>>>>>> , mainly relating to hiccups around concurrency, cross DAG
> >>>>> prioritisation &
> >>>>>>> load on DB. Other than these, anything else you would like to
> >>> advise?)
> >>>>>>>>
> >>>>>>>> I will also dive into the git history further to understand it
> >>> better.
> >>>>>>>>
> >>>>>>>> Thanks.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> XD
> >>>>>>>
> >>>>>
> >>>>>
> >>>
> >>
>
>

-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129 <+48660796129>
E: jarek.potiuk@polidea.com

Re: Multiple Schedulers - "scheduler_lock"

Posted by Deng Xiaodong <xd...@gmail.com>.
I’m thinking of which architecture would be ideal.


# Option-1:
A master-slave architecture would be one option. But leader election would be essential to consider; otherwise we have an issue in terms of HA again.


# Option-2:
Another option we may consider is to simply start multiple scheduler instances (just using the current implementation, after modifying & validating the scheduler_lock on DagModel).

- In this case, given we handle everything properly using locking, we don’t need to worry too much about double-scheduling/triggering.

- Another potential concern I had earlier is that different schedulers may compete with each other and cause “waste” of scheduler resources.
After further thinking, I realise this is a typical Birthday Problem.
Given m DAGs and n schedulers (n <= m), and assuming each scheduler picks a DAG uniformly at random and independently of the others, at any moment the probability that all schedulers are working on different DAGs is m!/((m-n)! * (m^n)), and the probability that at least two schedulers are competing on the same DAG is 1-m!/((m-n)! * (m^n)).

Let’s say we have 200 DAGs and we start 2 schedulers. At any moment, the probability that the schedulers are competing on the same DAG is only 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only 0.33%.
(This probability will be higher if m/n is low. But users should not start too many schedulers if they don’t have that many DAGs.)

Given the probability of schedulers competing is so low, my concern on scheduler resource waste is not really valid.
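
The figures above can be checked with a few lines of Python. This is only a sketch of the formula as stated, under the same simplifying assumption that each scheduler picks one of the m DAGs uniformly at random and independently; the function name is made up for illustration and nothing here is part of Airflow.

```python
from math import perm  # Python 3.8+


def collision_probability(m: int, n: int) -> float:
    """Probability that at least two of n schedulers pick the same one of m DAGs."""
    if n > m:
        return 1.0  # more schedulers than DAGs: a clash is certain
    # perm(m, n) == m! / (m - n)!, so this is 1 - m!/((m-n)! * m^n)
    return 1.0 - perm(m, n) / m ** n


for m, n in [(200, 2), (300, 2), (200, 5), (20, 5)]:
    print(f"m={m}, n={n}: {collision_probability(m, n):.2%}")
```

The first two rows reproduce the 0.5% and 0.33% quoted above; the last two show how the probability grows as m/n gets smaller.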



Based on these calculations/assessment, I think we can go for option-2, i.e. we don’t make big changes to the current implementation. Instead, we ensure the scheduler_lock is working well and test intensively with multiple schedulers running. Then we should be good to let users know that it’s safe to run multiple schedulers.

Please share your thoughts on this and correct me if I’m wrong in any point above. Thanks.


XD


Reference: https://en.wikipedia.org/wiki/Birthday_problem


> On 2 Mar 2019, at 3:39 PM, Tao Feng <fe...@gmail.com> wrote:
> 
> Does the proposal use master-slave architecture(leader scheduler vs slave
> scheduler)?
> 
> On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yr...@gmail.com> wrote:
> 
>> Preventing double-triggering by separating DAG files different schedulers
>> parse sounds easier and more intuitive. I actually removed one of the
>> double-triggering prevention logic here
>> <
>> https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127
>>> (expensive)
>> and
>> was relying on this lock
>> <
>> https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233
>>> 
>> to
>> prevent double-firing and safe-guard our non-idempotent tasks( btw the
>> insert can be insert overwrite to be idempotent).
>> 
>> Also tho in Airbnb we requeue tasks a lot, we haven't see double-firing
>> recently.
>> 
>> Cheers,
>> Kevin Y
>> 
>> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <
>> maximebeauchemin@gmail.com>
>> wrote:
>> 
>>> Forgot to mention: the intention was to use the lock, but I never
>>> personally got to do the second phase which would consist of skipping the
>>> DAG if the lock is on, and expire the lock eventually based on a config
>>> setting.
>>> 
>>> Max
>>> 
>>> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <
>>> maximebeauchemin@gmail.com>
>>> wrote:
>>> 
>>>> My original intention with the lock was preventing "double-triggering"
>> of
>>>> task (triggering refers to the scheduler putting the message in the
>>> queue).
>>>> Airflow now has good "double-firing-prevention" of tasks (firing
>> happens
>>>> when the worker receives the message and starts the task), even if the
>>>> scheduler was to go rogue or restart and send multiple triggers for a
>>> task
>>>> instance, the worker(s) should only start one task instance. That's
>> done
>>> by
>>>> running the database assertions behind the conditions being met as read
>>>> database transaction (no task can alter the rows that validate the
>>>> assertion while it's getting asserted). In practice it's a little
>> tricky
>>>> and we've seen rogue double-firing in the past (I have no idea how
>> often
>>>> that happens).
>>>> 
>>>> If we do want to prevent double-triggerring, we should make sure that 2
>>>> schedulers aren't processing the same DAG or DagRun at the same time.
>>> That
>>>> would mean for the scheduler to not start the process of locked DAGs,
>> and
>>>> by providing a mechanism to expire the locks after some time.
>>>> 
>>>> Has anyone experienced double firing lately? If that exist we should
>> fix
>>>> it, but also be careful around multiple scheduler double-triggering as
>> it
>>>> would make that problem potentially much worse.
>>>> 
>>>> Max
>>>> 
>>>> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd...@gmail.com>
>>> wrote:
>>>> 
>>>>> It’s exactly what my team is doing & what I shared here earlier last
>>> year
>>>>> (
>>>>> 
>>> 
>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>>>>> <
>>>>> 
>>> 
>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>>>> 
>>>>> )
>>>>> 
>>>>> It’s somehow a “hacky” solution (and HA is not addressed), and now I’m
>>>>> thinking how we can have it more proper & robust.
>>>>> 
>>>>> 
>>>>> XD
>>>>> 
>>>>>> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <ma...@gmail.com>
>>>>> wrote:
>>>>>> 
>>>>>> We have been running multiple schedulers for about 3 months.  We
>>> created
>>>>>> multiple services to run airflow schedulers.  The only difference is
>>>>> that
>>>>>> we have each of the schedulers pointed to a directory one level
>> deeper
>>>>> than
>>>>>> the DAG home directory that the workers and webapp use. We have seen
>>>>> much
>>>>>> better scheduling performance but this does not yet help with HA.
>>>>>> 
>>>>>> DAGS_HOME:
>>>>>> {airflow_home}/dags  (webapp & workers)
>>>>>> {airflow_home}/dags/group-a/ (scheduler1)
>>>>>> {airflow_home}/dags/group-b/ (scheduler2)
>>>>>> {airflow_home}/dags/group-etc/ (scheduler3)
>>>>>> 
>>>>>> Not sure if this helps, just sharing in case it does.
>>>>>> 
>>>>>> Thank you,
>>>>>> Mario
>>>>>> 
>>>>>> 
>>>>>> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bd...@gmail.com>
>>>>> wrote:
>>>>>> 
>>>>>>> I have done quite some work on making it possible to run multiple
>>>>>>> schedulers at the same time.  At the moment I don’t think there are
>>>>> real
>>>>>>> blockers actually to do so. We just don’t actively test it.
>>>>>>> 
>>>>>>> Database locking is mostly in place (DagRuns and TaskInstances).
>> And
>>> I
>>>>>>> think the worst that can happen is that a task is scheduled twice.
>>> The
>>>>> task
>>>>>>> will detect this most of the time and kill one off if concurrent if
>>> not
>>>>>>> sequential then I will run again in some occasions. Everyone is
>>> having
>>>>>>> idempotent tasks right so no harm done? ;-)
>>>>>>> 
>>>>>>> Have you encountered issues? Maybe work those out?
>>>>>>> 
>>>>>>> Cheers
>>>>>>> Bolke.
>>>>>>> 
>>>>>>> Verstuurd vanaf mijn iPad
>>>>>>> 
>>>>>>>> Op 1 mrt. 2019 om 16:25 heeft Deng Xiaodong <xd...@gmail.com>
>>> het
>>>>>>> volgende geschreven:
>>>>>>>> 
>>>>>>>> Hi Max,
>>>>>>>> 
>>>>>>>> Following
>>>>>>> 
>>>>> 
>>> 
>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>>>>>>> <
>>>>>>> 
>>>>> 
>>> 
>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>>>>>> ,
>>>>>>> I’m trying to prepare an AIP for supporting multiple-scheduler in
>>>>> Airflow
>>>>>>> (mainly for HA and Higher scheduling performance).
>>>>>>>> 
>>>>>>>> Along the process of code checking, I found that there is one
>>>>> attribute
>>>>>>> of DagModel, “scheduler_lock”. It’s not used at all in current
>>>>>>> implementation, but it was introduced long time back (2015) to
>> allow
>>>>>>> multiple schedulers to work together (
>>>>>>> 
>>>>> 
>>> 
>> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
>>>>>>> <
>>>>>>> 
>>>>> 
>>> 
>> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
>>>>>> 
>>>>>>> ).
>>>>>>>> 
>>>>>>>> Since you were the original author of it, it would be very helpful
>>> if
>>>>>>> you can kindly share why the multiple-schedulers implementation was
>>>>> removed
>>>>>>> eventually, and what challenges/complexity there were.
>>>>>>>> (You already shared a few valuable inputs in the earlier
>> discussion
>>>>>>> 
>>>>> 
>>> 
>> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
>>>>>>> <
>>>>>>> 
>>>>> 
>>> 
>> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
>>>>>> 
>>>>>>> , mainly relating to hiccups around concurrency, cross DAG
>>>>> prioritisation &
>>>>>>> load on DB. Other than these, anything else you would like to
>>> advise?)
>>>>>>>> 
>>>>>>>> I will also dive into the git history further to understand it
>>> better.
>>>>>>>> 
>>>>>>>> Thanks.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> XD
>>>>>>> 
>>>>> 
>>>>> 
>>> 
>> 


Re: Multiple Schedulers - "scheduler_lock"

Posted by Tao Feng <fe...@gmail.com>.
Does the proposal use a master-slave architecture (leader scheduler vs slave
scheduler)?

On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yr...@gmail.com> wrote:

> Preventing double-triggering by separating DAG files different schedulers
> parse sounds easier and more intuitive. I actually removed one of the
> double-triggering prevention logic here
> <
> https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127
> >(expensive)
> and
> was relying on this lock
> <
> https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233
> >
> to
> prevent double-firing and safe-guard our non-idempotent tasks( btw the
> insert can be insert overwrite to be idempotent).
>
> Also tho in Airbnb we requeue tasks a lot, we haven't see double-firing
> recently.
>
> Cheers,
> Kevin Y
>
> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <
> maximebeauchemin@gmail.com>
> wrote:
>
> > Forgot to mention: the intention was to use the lock, but I never
> > personally got to do the second phase which would consist of skipping the
> > DAG if the lock is on, and expire the lock eventually based on a config
> > setting.
> >
> > Max
> >
> > On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <
> > maximebeauchemin@gmail.com>
> > wrote:
> >
> > > My original intention with the lock was preventing "double-triggering"
> of
> > > task (triggering refers to the scheduler putting the message in the
> > queue).
> > > Airflow now has good "double-firing-prevention" of tasks (firing
> happens
> > > when the worker receives the message and starts the task), even if the
> > > scheduler was to go rogue or restart and send multiple triggers for a
> > task
> > > instance, the worker(s) should only start one task instance. That's
> done
> > by
> > > running the database assertions behind the conditions being met as read
> > > database transaction (no task can alter the rows that validate the
> > > assertion while it's getting asserted). In practice it's a little
> tricky
> > > and we've seen rogue double-firing in the past (I have no idea how
> often
> > > that happens).
> > >
> > > If we do want to prevent double-triggerring, we should make sure that 2
> > > schedulers aren't processing the same DAG or DagRun at the same time.
> > That
> > > would mean for the scheduler to not start the process of locked DAGs,
> and
> > > by providing a mechanism to expire the locks after some time.
> > >
> > > Has anyone experienced double firing lately? If that exist we should
> fix
> > > it, but also be careful around multiple scheduler double-triggering as
> it
> > > would make that problem potentially much worse.
> > >
> > > Max
> > >
> > > On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd...@gmail.com>
> > wrote:
> > >
> > >> It’s exactly what my team is doing & what I shared here earlier last
> > year
> > >> (
> > >>
> >
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> > >> <
> > >>
> >
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> > >
> > >> )
> > >>
> > >> It’s somehow a “hacky” solution (and HA is not addressed), and now I’m
> > >> thinking how we can have it more proper & robust.
> > >>
> > >>
> > >> XD
> > >>
> > >> > On 2 Mar 2019, at 12:04 AM, Mario Urquizo <ma...@gmail.com>
> > >> wrote:
> > >> >
> > >> > We have been running multiple schedulers for about 3 months.  We
> > created
> > >> > multiple services to run airflow schedulers.  The only difference is
> > >> that
> > >> > we have each of the schedulers pointed to a directory one level
> deeper
> > >> than
> > >> > the DAG home directory that the workers and webapp use. We have seen
> > >> much
> > >> > better scheduling performance but this does not yet help with HA.
> > >> >
> > >> > DAGS_HOME:
> > >> > {airflow_home}/dags  (webapp & workers)
> > >> > {airflow_home}/dags/group-a/ (scheduler1)
> > >> > {airflow_home}/dags/group-b/ (scheduler2)
> > >> > {airflow_home}/dags/group-etc/ (scheduler3)
> > >> >
> > >> > Not sure if this helps, just sharing in case it does.
> > >> >
> > >> > Thank you,
> > >> > Mario
> > >> >
> > >> >
> > >> > On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bd...@gmail.com>
> > >> wrote:
> > >> >
> > >> >> I have done quite some work on making it possible to run multiple
> > >> >> schedulers at the same time.  At the moment I don’t think there are
> > >> real
> > >> >> blockers actually to do so. We just don’t actively test it.
> > >> >>
> > >> >> Database locking is mostly in place (DagRuns and TaskInstances).
> And
> > I
> > >> >> think the worst that can happen is that a task is scheduled twice.
> > The
> > >> task
> > >> >> will detect this most of the time and kill one off if concurrent if
> > not
> > >> >> sequential then I will run again in some occasions. Everyone is
> > having
> > >> >> idempotent tasks right so no harm done? ;-)
> > >> >>
> > >> >> Have you encountered issues? Maybe work those out?
> > >> >>
> > >> >> Cheers
> > >> >> Bolke.
> > >> >>
> > >> >> Verstuurd vanaf mijn iPad
> > >> >>
> > >> >>> Op 1 mrt. 2019 om 16:25 heeft Deng Xiaodong <xd...@gmail.com>
> > het
> > >> >> volgende geschreven:
> > >> >>>
> > >> >>> Hi Max,
> > >> >>>
> > >> >>> Following
> > >> >>
> > >>
> >
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> > >> >> <
> > >> >>
> > >>
> >
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> > >> >,
> > >> >> I’m trying to prepare an AIP for supporting multiple-scheduler in
> > >> Airflow
> > >> >> (mainly for HA and Higher scheduling performance).
> > >> >>>
> > >> >>> Along the process of code checking, I found that there is one
> > >> attribute
> > >> >> of DagModel, “scheduler_lock”. It’s not used at all in current
> > >> >> implementation, but it was introduced long time back (2015) to
> allow
> > >> >> multiple schedulers to work together (
> > >> >>
> > >>
> >
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
> > >> >> <
> > >> >>
> > >>
> >
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
> > >> >
> > >> >> ).
> > >> >>>
> > >> >>> Since you were the original author of it, it would be very helpful
> > if
> > >> >> you can kindly share why the multiple-schedulers implementation was
> > >> removed
> > >> >> eventually, and what challenges/complexity there were.
> > >> >>> (You already shared a few valuable inputs in the earlier
> discussion
> > >> >>
> > >>
> >
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
> > >> >> <
> > >> >>
> > >>
> >
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
> > >> >
> > >> >> , mainly relating to hiccups around concurrency, cross DAG
> > >> prioritisation &
> > >> >> load on DB. Other than these, anything else you would like to
> > advise?)
> > >> >>>
> > >> >>> I will also dive into the git history further to understand it
> > better.
> > >> >>>
> > >> >>> Thanks.
> > >> >>>
> > >> >>>
> > >> >>> XD
> > >> >>
> > >>
> > >>
> >
>

Re: Multiple Schedulers - "scheduler_lock"

Posted by Kevin Yang <yr...@gmail.com>.
Preventing double-triggering by separating the DAG files that different
schedulers parse sounds easier and more intuitive. I actually removed one
piece of the double-triggering prevention logic here
<https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127>
(expensive) and was relying on this lock
<https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233>
to prevent double-firing and safeguard our non-idempotent tasks (btw, the
insert can be an insert overwrite to be idempotent).
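
To illustrate the "insert overwrite" remark, here is a minimal sketch using Python's built-in sqlite3. The table, column, and function names are hypothetical and not taken from any real pipeline. The task replaces the whole partition it owns instead of appending to it, so a second (sequential) firing leaves the same rows behind.

```python
import sqlite3


def load_partition(conn, ds, rows):
    """Replace every row of partition `ds` in one transaction (an 'insert overwrite')."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute("DELETE FROM daily_totals WHERE ds = ?", (ds,))
        conn.executemany(
            "INSERT INTO daily_totals (ds, name, total) VALUES (?, ?, ?)",
            [(ds, name, total) for name, total in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_totals (ds TEXT, name TEXT, total INTEGER)")

rows = [("a", 1), ("b", 2)]
load_partition(conn, "2019-03-01", rows)
load_partition(conn, "2019-03-01", rows)  # simulated second firing of the same task

row_count = conn.execute("SELECT COUNT(*) FROM daily_totals").fetchone()[0]
print(row_count)  # 2 rows, not 4
```

This only covers a task that is run twice one after the other; two truly concurrent runs would still need the locking discussed in this thread.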

Also, though at Airbnb we requeue tasks a lot, we haven't seen double-firing
recently.

Cheers,
Kevin Y

On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <ma...@gmail.com>
wrote:

> Forgot to mention: the intention was to use the lock, but I never
> personally got to do the second phase which would consist of skipping the
> DAG if the lock is on, and expire the lock eventually based on a config
> setting.
>
> Max
>
> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <
> maximebeauchemin@gmail.com>
> wrote:
>
> > My original intention with the lock was preventing "double-triggering" of
> > task (triggering refers to the scheduler putting the message in the
> queue).
> > Airflow now has good "double-firing-prevention" of tasks (firing happens
> > when the worker receives the message and starts the task), even if the
> > scheduler was to go rogue or restart and send multiple triggers for a
> task
> > instance, the worker(s) should only start one task instance. That's done
> by
> > running the database assertions behind the conditions being met as read
> > database transaction (no task can alter the rows that validate the
> > assertion while it's getting asserted). In practice it's a little tricky
> > and we've seen rogue double-firing in the past (I have no idea how often
> > that happens).
> >
> > If we do want to prevent double-triggerring, we should make sure that 2
> > schedulers aren't processing the same DAG or DagRun at the same time.
> That
> > would mean for the scheduler to not start the process of locked DAGs, and
> > by providing a mechanism to expire the locks after some time.
> >
> > Has anyone experienced double firing lately? If that exist we should fix
> > it, but also be careful around multiple scheduler double-triggering as it
> > would make that problem potentially much worse.
> >
> > Max
> >
> > On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd...@gmail.com>
> wrote:
> >
> >> It’s exactly what my team is doing & what I shared here earlier last
> year
> >> (
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >> <
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >
> >> )
> >>
> >> It’s somehow a “hacky” solution (and HA is not addressed), and now I’m
> >> thinking how we can have it more proper & robust.
> >>
> >>
> >> XD
> >>
> >> > On 2 Mar 2019, at 12:04 AM, Mario Urquizo <ma...@gmail.com>
> >> wrote:
> >> >
> >> > We have been running multiple schedulers for about 3 months.  We
> created
> >> > multiple services to run airflow schedulers.  The only difference is
> >> that
> >> > we have each of the schedulers pointed to a directory one level deeper
> >> than
> >> > the DAG home directory that the workers and webapp use. We have seen
> >> much
> >> > better scheduling performance but this does not yet help with HA.
> >> >
> >> > DAGS_HOME:
> >> > {airflow_home}/dags  (webapp & workers)
> >> > {airflow_home}/dags/group-a/ (scheduler1)
> >> > {airflow_home}/dags/group-b/ (scheduler2)
> >> > {airflow_home}/dags/group-etc/ (scheduler3)
> >> >
> >> > Not sure if this helps, just sharing in case it does.
> >> >
> >> > Thank you,
> >> > Mario
> >> >
> >> >
> >> > On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bd...@gmail.com>
> >> wrote:
> >> >
> >> >> I have done quite some work on making it possible to run multiple
> >> >> schedulers at the same time.  At the moment I don’t think there are
> >> real
> >> >> blockers actually to do so. We just don’t actively test it.
> >> >>
> >> >> Database locking is mostly in place (DagRuns and TaskInstances). And
> I
> >> >> think the worst that can happen is that a task is scheduled twice.
> The
> >> task
> >> >> will detect this most of the time and kill one off if concurrent if
> not
> >> >> sequential then I will run again in some occasions. Everyone is
> having
> >> >> idempotent tasks right so no harm done? ;-)
> >> >>
> >> >> Have you encountered issues? Maybe work those out?
> >> >>
> >> >> Cheers
> >> >> Bolke.
> >> >>
> >> >> Verstuurd vanaf mijn iPad
> >> >>
> >> >>> Op 1 mrt. 2019 om 16:25 heeft Deng Xiaodong <xd...@gmail.com>
> het
> >> >> volgende geschreven:
> >> >>>
> >> >>> Hi Max,
> >> >>>
> >> >>> Following
> >> >>
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >> >> <
> >> >>
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >> >,
> >> >> I’m trying to prepare an AIP for supporting multiple-scheduler in
> >> Airflow
> >> >> (mainly for HA and Higher scheduling performance).
> >> >>>
> >> >>> Along the process of code checking, I found that there is one
> >> attribute
> >> >> of DagModel, “scheduler_lock”. It’s not used at all in current
> >> >> implementation, but it was introduced long time back (2015) to allow
> >> >> multiple schedulers to work together (
> >> >>
> >>
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
> >> >> <
> >> >>
> >>
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
> >> >
> >> >> ).
> >> >>>
> >> >>> Since you were the original author of it, it would be very helpful
> if
> >> >> you can kindly share why the multiple-schedulers implementation was
> >> removed
> >> >> eventually, and what challenges/complexity there were.
> >> >>> (You already shared a few valuable inputs in the earlier discussion
> >> >>
> >>
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
> >> >> <
> >> >>
> >>
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
> >> >
> >> >> , mainly relating to hiccups around concurrency, cross DAG
> >> prioritisation &
> >> >> load on DB. Other than these, anything else you would like to
> advise?)
> >> >>>
> >> >>> I will also dive into the git history further to understand it
> better.
> >> >>>
> >> >>> Thanks.
> >> >>>
> >> >>>
> >> >>> XD
> >> >>
> >>
> >>
>

Re: Multiple Schedulers - "scheduler_lock"

Posted by Maxime Beauchemin <ma...@gmail.com>.
Forgot to mention: the intention was to use the lock, but I never
personally got to do the second phase, which would consist of skipping the
DAG if the lock is on, and expiring the lock eventually based on a config
setting.

Max

On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <ma...@gmail.com>
wrote:

> My original intention with the lock was preventing "double-triggering" of
> task (triggering refers to the scheduler putting the message in the queue).
> Airflow now has good "double-firing-prevention" of tasks (firing happens
> when the worker receives the message and starts the task), even if the
> scheduler was to go rogue or restart and send multiple triggers for a task
> instance, the worker(s) should only start one task instance. That's done by
> running the database assertions behind the conditions being met as read
> database transaction (no task can alter the rows that validate the
> assertion while it's getting asserted). In practice it's a little tricky
> and we've seen rogue double-firing in the past (I have no idea how often
> that happens).
>
> If we do want to prevent double-triggerring, we should make sure that 2
> schedulers aren't processing the same DAG or DagRun at the same time. That
> would mean for the scheduler to not start the process of locked DAGs, and
> by providing a mechanism to expire the locks after some time.
>
> Has anyone experienced double firing lately? If that exist we should fix
> it, but also be careful around multiple scheduler double-triggering as it
> would make that problem potentially much worse.
>
> Max
>
> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd...@gmail.com> wrote:
>
>> It’s exactly what my team is doing & what I shared here earlier last year
>> (
>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>> <
>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E>
>> )
>>
>> It’s somehow a “hacky” solution (and HA is not addressed), and now I’m
>> thinking how we can have it more proper & robust.
>>
>>
>> XD
>>
>> > On 2 Mar 2019, at 12:04 AM, Mario Urquizo <ma...@gmail.com>
>> wrote:
>> >
>> > We have been running multiple schedulers for about 3 months.  We created
>> > multiple services to run airflow schedulers.  The only difference is
>> that
>> > we have each of the schedulers pointed to a directory one level deeper
>> than
>> > the DAG home directory that the workers and webapp use. We have seen
>> much
>> > better scheduling performance but this does not yet help with HA.
>> >
>> > DAGS_HOME:
>> > {airflow_home}/dags  (webapp & workers)
>> > {airflow_home}/dags/group-a/ (scheduler1)
>> > {airflow_home}/dags/group-b/ (scheduler2)
>> > {airflow_home}/dags/group-etc/ (scheduler3)
>> >
>> > Not sure if this helps, just sharing in case it does.
>> >
>> > Thank you,
>> > Mario
>> >
>> >
>> > On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bd...@gmail.com>
>> wrote:
>> >
>> >> I have done quite some work on making it possible to run multiple
>> >> schedulers at the same time.  At the moment I don’t think there are
>> real
>> >> blockers actually to do so. We just don’t actively test it.
>> >>
>> >> Database locking is mostly in place (DagRuns and TaskInstances). And I
>> >> think the worst that can happen is that a task is scheduled twice. The
>> task
>> >> will detect this most of the time and kill one off if concurrent if not
>> >> sequential then I will run again in some occasions. Everyone is having
>> >> idempotent tasks right so no harm done? ;-)
>> >>
>> >> Have you encountered issues? Maybe work those out?
>> >>
>> >> Cheers
>> >> Bolke.
>> >>
>> >> Verstuurd vanaf mijn iPad
>> >>
>> >>> Op 1 mrt. 2019 om 16:25 heeft Deng Xiaodong <xd...@gmail.com> het
>> >> volgende geschreven:
>> >>>
>> >>> Hi Max,
>> >>>
>> >>> Following
>> >>
>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>> >> <
>> >>
>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>> >,
>> >> I’m trying to prepare an AIP for supporting multiple-scheduler in
>> Airflow
>> >> (mainly for HA and Higher scheduling performance).
>> >>>
>> >>> Along the process of code checking, I found that there is one
>> attribute
>> >> of DagModel, “scheduler_lock”. It’s not used at all in current
>> >> implementation, but it was introduced long time back (2015) to allow
>> >> multiple schedulers to work together (
>> >>
>> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
>> >> <
>> >>
>> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
>> >
>> >> ).
>> >>>
>> >>> Since you were the original author of it, it would be very helpful if
>> >> you can kindly share why the multiple-schedulers implementation was
>> removed
>> >> eventually, and what challenges/complexity there were.
>> >>> (You already shared a few valuable inputs in the earlier discussion
>> >>
>> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
>> >> <
>> >>
>> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
>> >
>> >> , mainly relating to hiccups around concurrency, cross DAG
>> prioritisation &
>> >> load on DB. Other than these, anything else you would like to advise?)
>> >>>
>> >>> I will also dive into the git history further to understand it better.
>> >>>
>> >>> Thanks.
>> >>>
>> >>>
>> >>> XD
>> >>
>>
>>

Re: Multiple Schedulers - "scheduler_lock"

Posted by Maxime Beauchemin <ma...@gmail.com>.
My original intention with the lock was to prevent "double-triggering" of
tasks (triggering refers to the scheduler putting the message in the queue).
Airflow now has good "double-firing prevention" for tasks (firing happens
when the worker receives the message and starts the task): even if the
scheduler were to go rogue or restart and send multiple triggers for a task
instance, the worker(s) should only start one task instance. That's done by
running the database assertions behind the conditions being met inside a
database transaction (no task can alter the rows that validate the
assertion while it's being asserted). In practice it's a little tricky,
and we've seen rogue double-firing in the past (I have no idea how often
that happens).
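
To make the double-firing check concrete, here is a minimal sketch (not
Airflow's actual code) of a worker claiming a task instance with a
conditional update inside one transaction. The table task_instance, its
columns, and the function try_start are assumptions for illustration, and
sqlite3 stands in for the real metadata database.

```python
import sqlite3


def try_start(conn, task_id):
    """Return True only for the caller that moves the row from queued to running."""
    with conn:  # one transaction: commits on success, rolls back on error
        cur = conn.execute(
            "UPDATE task_instance SET state = 'running' "
            "WHERE task_id = ? AND state = 'queued'",
            (task_id,),
        )
        # rowcount is 1 if this call changed the row, 0 if someone else got there first
        return cur.rowcount == 1


# Hypothetical schema, just enough for the example.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT PRIMARY KEY, state TEXT)")
conn.execute("INSERT INTO task_instance VALUES ('t1', 'queued')")
conn.commit()

first = try_start(conn, "t1")   # first trigger claims the task
second = try_start(conn, "t1")  # duplicate trigger finds it already running
```

The duplicate trigger is harmless here because the state check and the
state change happen in the same statement.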

If we do want to prevent double-triggering, we should make sure that two
schedulers aren't processing the same DAG or DagRun at the same time. That
would mean having the scheduler skip locked DAGs, and providing a
mechanism to expire the locks after some time.
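
One possible shape for such an expiring lock, sketched under assumptions:
the dag table, the locked_by and locked_until columns, the LOCK_TTL value,
and the acquire function are all hypothetical, and this is not how the
original scheduler_lock attribute was implemented.

```python
import sqlite3

LOCK_TTL = 60.0  # seconds a lock stays valid; hypothetical value


def acquire(conn, dag_id, scheduler_id, now):
    """Take the lock on dag_id if it is free or expired; return True on success."""
    with conn:  # single transaction around the conditional update
        cur = conn.execute(
            "UPDATE dag SET locked_by = ?, locked_until = ? "
            "WHERE dag_id = ? AND (locked_by IS NULL OR locked_until <= ?)",
            (scheduler_id, now + LOCK_TTL, dag_id, now),
        )
        return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag (dag_id TEXT PRIMARY KEY, locked_by TEXT, locked_until REAL)"
)
conn.execute("INSERT INTO dag VALUES ('example_dag', NULL, NULL)")
conn.commit()

a = acquire(conn, "example_dag", "scheduler1", now=0.0)   # free, so it is taken
b = acquire(conn, "example_dag", "scheduler2", now=10.0)  # still held, so skipped
c = acquire(conn, "example_dag", "scheduler2", now=61.0)  # expired, so taken over
```

The expiry means a crashed scheduler cannot hold a DAG forever, at the cost
of having to pick a TTL longer than one processing pass.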

Has anyone experienced double firing lately? If that still exists we should
fix it, but we should also be careful about double-triggering with multiple
schedulers, as it would potentially make that problem much worse.

Max

On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd...@gmail.com> wrote:

> It’s exactly what my team is doing & what I shared here earlier last year (
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> <
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E>
> )
>
> It’s somehow a “hacky” solution (and HA is not addressed), and now I’m
> thinking how we can have it more proper & robust.
>
>
> XD
>
> > On 2 Mar 2019, at 12:04 AM, Mario Urquizo <ma...@gmail.com>
> wrote:
> >
> > We have been running multiple schedulers for about 3 months.  We created
> > multiple services to run airflow schedulers.  The only difference is that
> > we have each of the schedulers pointed to a directory one level deeper
> than
> > the DAG home directory that the workers and webapp use. We have seen much
> > better scheduling performance but this does not yet help with HA.
> >
> > DAGS_HOME:
> > {airflow_home}/dags  (webapp & workers)
> > {airflow_home}/dags/group-a/ (scheduler1)
> > {airflow_home}/dags/group-b/ (scheduler2)
> > {airflow_home}/dags/group-etc/ (scheduler3)
> >
> > Not sure if this helps, just sharing in case it does.
> >
> > Thank you,
> > Mario
> >
> >
> > On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bd...@gmail.com> wrote:
> >
> >> I have done quite some work on making it possible to run multiple
> >> schedulers at the same time.  At the moment I don’t think there are real
> >> blockers actually to do so. We just don’t actively test it.
> >>
> >> Database locking is mostly in place (DagRuns and TaskInstances). And I
> >> think the worst that can happen is that a task is scheduled twice. The
> task
> >> will detect this most of the time and kill one off if concurrent if not
> >> sequential then I will run again in some occasions. Everyone is having
> >> idempotent tasks right so no harm done? ;-)
> >>
> >> Have you encountered issues? Maybe work those out?
> >>
> >> Cheers
> >> Bolke.
> >>
> >> Sent from my iPad
> >>
> >>> On 1 Mar 2019, at 16:25, Deng Xiaodong <xd...@gmail.com> wrote:
> >>>
> >>> Hi Max,
> >>>
> >>> Following
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >> <
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >,
> >> I’m trying to prepare an AIP for supporting multiple-scheduler in
> Airflow
> >> (mainly for HA and Higher scheduling performance).
> >>>
> >>> Along the process of code checking, I found that there is one attribute
> >> of DagModel, “scheduler_lock”. It’s not used at all in current
> >> implementation, but it was introduced long time back (2015) to allow
> >> multiple schedulers to work together (
> >>
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
> >> <
> >>
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
> >
> >> ).
> >>>
> >>> Since you were the original author of it, it would be very helpful if
> >> you can kindly share why the multiple-schedulers implementation was
> removed
> >> eventually, and what challenges/complexity there were.
> >>> (You already shared a few valuable inputs in the earlier discussion
> >>
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
> >> <
> >>
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
> >
> >> , mainly relating to hiccups around concurrency, cross DAG
> prioritisation &
> >> load on DB. Other than these, anything else you would like to advise?)
> >>>
> >>> I will also dive into the git history further to understand it better.
> >>>
> >>> Thanks.
> >>>
> >>>
> >>> XD
> >>
>
>

Re: Multiple Schedulers - "scheduler_lock"

Posted by Deng Xiaodong <xd...@gmail.com>.
It’s exactly what my team is doing & what I shared here earlier last year (https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E <https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E> )

It’s a somewhat “hacky” solution (and HA is not addressed), and now I’m thinking about how we can make it more proper & robust.


XD

> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <ma...@gmail.com> wrote:
> 
> We have been running multiple schedulers for about 3 months.  We created
> multiple services to run airflow schedulers.  The only difference is that
> we have each of the schedulers pointed to a directory one level deeper than
> the DAG home directory that the workers and webapp use. We have seen much
> better scheduling performance but this does not yet help with HA.
> 
> DAGS_HOME:
> {airflow_home}/dags  (webapp & workers)
> {airflow_home}/dags/group-a/ (scheduler1)
> {airflow_home}/dags/group-b/ (scheduler2)
> {airflow_home}/dags/group-etc/ (scheduler3)
> 
> Not sure if this helps, just sharing in case it does.
> 
> Thank you,
> Mario
> 
> 
> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bd...@gmail.com> wrote:
> 
>> I have done quite some work on making it possible to run multiple
>> schedulers at the same time.  At the moment I don’t think there are real
>> blockers actually to do so. We just don’t actively test it.
>> 
>> Database locking is mostly in place (DagRuns and TaskInstances). And I
>> think the worst that can happen is that a task is scheduled twice. The task
>> will detect this most of the time and kill one off if concurrent if not
>> sequential then I will run again in some occasions. Everyone is having
>> idempotent tasks right so no harm done? ;-)
>> 
>> Have you encountered issues? Maybe work those out?
>> 
>> Cheers
>> Bolke.
>> 
>> Sent from my iPad
>> 
>>> On 1 Mar 2019, at 16:25, Deng Xiaodong <xd...@gmail.com> wrote:
>>> 
>>> Hi Max,
>>> 
>>> Following
>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>> <
>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E>,
>> I’m trying to prepare an AIP for supporting multiple-scheduler in Airflow
>> (mainly for HA and Higher scheduling performance).
>>> 
>>> Along the process of code checking, I found that there is one attribute
>> of DagModel, “scheduler_lock”. It’s not used at all in current
>> implementation, but it was introduced long time back (2015) to allow
>> multiple schedulers to work together (
>> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
>> <
>> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620>
>> ).
>>> 
>>> Since you were the original author of it, it would be very helpful if
>> you can kindly share why the multiple-schedulers implementation was removed
>> eventually, and what challenges/complexity there were.
>>> (You already shared a few valuable inputs in the earlier discussion
>> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
>> <
>> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E>
>> , mainly relating to hiccups around concurrency, cross DAG prioritisation &
>> load on DB. Other than these, anything else you would like to advise?)
>>> 
>>> I will also dive into the git history further to understand it better.
>>> 
>>> Thanks.
>>> 
>>> 
>>> XD
>> 


Re: Multiple Schedulers - "scheduler_lock"

Posted by Mario Urquizo <ma...@gmail.com>.
We have been running multiple schedulers for about 3 months.  We created
multiple services to run airflow schedulers.  The only difference is that
we have each of the schedulers pointed to a directory one level deeper than
the DAG home directory that the workers and webapp use. We have seen much
better scheduling performance but this does not yet help with HA.

DAGS_HOME:
{airflow_home}/dags  (webapp & workers)
{airflow_home}/dags/group-a/ (scheduler1)
{airflow_home}/dags/group-b/ (scheduler2)
{airflow_home}/dags/group-etc/ (scheduler3)
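
A small sketch of what this layout implies, assuming each scheduler only
parses files under its own group directory: a DAG file placed directly
under the DAG home belongs to no group, so no scheduler would pick it up.
The path /opt/airflow/dags and the function owning_group are made up for
the example.

```python
from pathlib import PurePosixPath


def owning_group(dags_home, dag_file):
    """Return the first-level subdirectory of dags_home that contains dag_file, or None."""
    rel = PurePosixPath(dag_file).relative_to(PurePosixPath(dags_home))
    # A file directly under dags_home has a single path part and no group.
    return rel.parts[0] if len(rel.parts) > 1 else None


home = "/opt/airflow/dags"  # hypothetical DAG home
a = owning_group(home, "/opt/airflow/dags/group-a/etl.py")  # handled by scheduler1
b = owning_group(home, "/opt/airflow/dags/orphan.py")       # handled by nobody
```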

Not sure if this helps, just sharing in case it does.

Thank you,
Mario


On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bd...@gmail.com> wrote:

> I have done quite some work on making it possible to run multiple
> schedulers at the same time.  At the moment I don’t think there are real
> blockers actually to do so. We just don’t actively test it.
>
> Database locking is mostly in place (DagRuns and TaskInstances). And I
> think the worst that can happen is that a task is scheduled twice. The task
> will detect this most of the time and kill one off if concurrent if not
> sequential then I will run again in some occasions. Everyone is having
> idempotent tasks right so no harm done? ;-)
>
> Have you encountered issues? Maybe work those out?
>
> Cheers
> Bolke.
>
> Sent from my iPad
>
> > On 1 Mar 2019, at 16:25, Deng Xiaodong <xd...@gmail.com> wrote:
> >
> > Hi Max,
> >
> > Following
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> <
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E>,
> I’m trying to prepare an AIP for supporting multiple-scheduler in Airflow
> (mainly for HA and Higher scheduling performance).
> >
> > Along the process of code checking, I found that there is one attribute
> of DagModel, “scheduler_lock”. It’s not used at all in current
> implementation, but it was introduced long time back (2015) to allow
> multiple schedulers to work together (
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
> <
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620>
> ).
> >
> > Since you were the original author of it, it would be very helpful if
> you can kindly share why the multiple-schedulers implementation was removed
> eventually, and what challenges/complexity there were.
> > (You already shared a few valuable inputs in the earlier discussion
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
> <
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E>
> , mainly relating to hiccups around concurrency, cross DAG prioritisation &
> load on DB. Other than these, anything else you would like to advise?)
> >
> > I will also dive into the git history further to understand it better.
> >
> > Thanks.
> >
> >
> > XD
>

Re: Multiple Schedulers - "scheduler_lock"

Posted by Deng Xiaodong <xd...@gmail.com>.
Thanks @Bolke for the inputs!

In that case, possibly we can change part of the AIP scope to “thoroughly test whether running multiple schedulers causes issues”.

And a few thoughts about your inputs:
- “Database locking is mostly in place (DagRuns and TaskInstances)”:
let’s say DagRun xxx has already been created, and another scheduler tries to create it again. The creation will certainly fail because of the DB locking, but does it fail gracefully?

- “the worst that can happen is that a task is scheduled twice…..Everyone is having idempotent tasks right so no harm done?”:
Firstly, it may mean that we’re “wasting” some scheduler resources (it’s like driving around to find a parking space while competing with other drivers; it would be much more efficient if someone told me the exact location of an available space). Secondly, some tasks are not idempotent, such as inserting a few records into a database.
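
As an illustration of what “failing gracefully” could look like for the first point, here is a minimal sketch assuming a unique constraint on (dag_id, execution_date). The table layout and create_dag_run are hypothetical, and sqlite3 stands in for the real database. The losing scheduler treats the constraint violation as “someone else already created this run” and moves on.

```python
import sqlite3


def create_dag_run(conn, dag_id, execution_date):
    """Insert a DagRun row; return False instead of raising if it already exists."""
    try:
        with conn:  # rolls back automatically if the insert violates the constraint
            conn.execute(
                "INSERT INTO dag_run (dag_id, execution_date) VALUES (?, ?)",
                (dag_id, execution_date),
            )
        return True
    except sqlite3.IntegrityError:
        # Another scheduler created the same run first; nothing to do.
        return False


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run ("
    "dag_id TEXT, execution_date TEXT, UNIQUE (dag_id, execution_date))"
)
conn.commit()

created_first = create_dag_run(conn, "example_dag", "2019-03-01T00:00:00")
created_second = create_dag_run(conn, "example_dag", "2019-03-01T00:00:00")
run_count = conn.execute("SELECT COUNT(*) FROM dag_run").fetchone()[0]
```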


XD


> On 1 Mar 2019, at 11:43 PM, Bolke de Bruin <bd...@gmail.com> wrote:
> 
> I have done quite some work on making it possible to run multiple schedulers at the same time.  At the moment I don’t think there are real blockers actually to do so. We just don’t actively test it.
> 
> Database locking is mostly in place (DagRuns and TaskInstances). And I think the worst that can happen is that a task is scheduled twice. The task will detect this most of the time and kill one off if concurrent if not sequential then I will run again in some occasions. Everyone is having idempotent tasks right so no harm done? ;-)
> 
> Have you encountered issues? Maybe work those out?
> 
> Cheers
> Bolke.
> 
> Sent from my iPad
> 
>> On 1 Mar 2019, at 16:25, Deng Xiaodong <xd...@gmail.com> wrote:
>> 
>> Hi Max,
>> 
>> Following https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E <https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E>, I’m trying to prepare an AIP for supporting multiple-scheduler in Airflow (mainly for HA and Higher scheduling performance).
>> 
>> Along the process of code checking, I found that there is one attribute of DagModel, “scheduler_lock”. It’s not used at all in current implementation, but it was introduced long time back (2015) to allow multiple schedulers to work together (https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620 <https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620> ).
>> 
>> Since you were the original author of it, it would be very helpful if you can kindly share why the multiple-schedulers implementation was removed eventually, and what challenges/complexity there were.
>> (You already shared a few valuable inputs in the earlier discussion https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E <https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E> , mainly relating to hiccups around concurrency, cross DAG prioritisation & load on DB. Other than these, anything else you would like to advise?)
>> 
>> I will also dive into the git history further to understand it better.
>> 
>> Thanks.
>> 
>> 
>> XD


Re: Multiple Schedulers - "scheduler_lock"

Posted by Bolke de Bruin <bd...@gmail.com>.
I have done quite a bit of work on making it possible to run multiple schedulers at the same time. At the moment I don’t think there are any real blockers to doing so. We just don’t actively test it.

Database locking is mostly in place (DagRuns and TaskInstances), and I think the worst that can happen is that a task is scheduled twice. The task will detect this most of the time and kill one copy off if the two run concurrently; if they run sequentially instead, it will run again on some occasions. Everyone has idempotent tasks, right, so no harm done? ;-)

Have you encountered issues? Maybe work those out?

Cheers
Bolke.

Sent from my iPad

> On 1 Mar 2019, at 16:25, Deng Xiaodong <xd...@gmail.com> wrote:
> 
> Hi Max,
> 
> Following https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E <https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E>, I’m trying to prepare an AIP for supporting multiple-scheduler in Airflow (mainly for HA and Higher scheduling performance).
> 
> Along the process of code checking, I found that there is one attribute of DagModel, “scheduler_lock”. It’s not used at all in current implementation, but it was introduced long time back (2015) to allow multiple schedulers to work together (https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620 <https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620> ).
> 
> Since you were the original author of it, it would be very helpful if you can kindly share why the multiple-schedulers implementation was removed eventually, and what challenges/complexity there were.
> (You already shared a few valuable inputs in the earlier discussion https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E <https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E> , mainly relating to hiccups around concurrency, cross DAG prioritisation & load on DB. Other than these, anything else you would like to advise?)
> 
> I will also dive into the git history further to understand it better.
> 
> Thanks.
> 
> 
> XD