You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@airflow.apache.org by Joy Gao <jo...@wepay.com> on 2018/03/01 00:39:08 UTC

Re: How to add hooks for strong deployment consistency?

+1 on DagFetcher abstraction, very airflow-esque :)

On Wed, Feb 28, 2018 at 11:25 AM, Maxime Beauchemin
<ma...@gmail.com> wrote:
> Addressing a few of your questions / concerns:
>
> * The scheduler uses a multiprocess queue to queue up tasks, each
> subprocess is in charge of a single DAG "scheduler cycle" which triggers
> what it can for active DagRuns. Currently it fills the DagBag from the
> local file system, looking for a specific module where the master process
> last saw that DAG. Fetching the DAG is a metadata operation, DAG artifacts
> shouldn't be too large, we can assume that it takes seconds at most to
> fetch a DAG, which is ok. We generally assume that the scheduler should
> fully cycle every minute or so. Version-aware DagFetcher could also
> implement some sort of caching if that was a concern (shouldn't be though).
> * For consistency within the whole DagRun, the scheduler absolutely has to
> read the right version. If tasks got removed they would never get scheduled
> and consistency cannot be achieved.
> * TaskInstances get created the first time they are identified as runnable
> by the scheduler and are born with a queued status I believe (from memory,
> haven't read the latest code to confirm). The worker double checks and sets
> it as running as part of a database transaction to avoid double-firing.
>
> Max
>
> On Wed, Feb 28, 2018 at 7:29 AM, Chris Palmer <ch...@crpalmer.com> wrote:
>
>> I'll preface this with the fact that I'm relatively new to Airflow, and
>> haven't played around with a lot of the internals.
>>
>> I find the idea of a DagFetcher interesting but would we worry about
>> slowing down the scheduler significantly? If the scheduler is having to
>> "fetch" multiple different DAG versions, be it git refs or artifacts from
>> Artifactory, we are talking about adding significant time to each scheduler
>> run. Also how would the scheduler know which DAGs to fetch from where if
>> there aren't local files on disk listing those DAGs? Maybe I'm missing
>> something in the implementation.
>>
>> It seems to me that the fetching of the different versions should be
>> delegated to the Task (or TaskInstance) itself. That ensures we only spend
>> the time to "fetch" the version that is needed when it is needed. One down
>> side might be that each TaskInstance running for the same version of the
>> DAG might end up doing the "fetch" independently (duplicating that work).
>>
>> I think this could be done by adding some version attribute to the DagRun
>> that gets set at creation, and have the scheduler pass that version to the
>> TaskInstances when they are created. You could even extend this so that you
>> could have an arbitrary set of "executor_parameters" that get set on a
>> DagRun and are passed to TaskInstances. Then the specific Executor class
>> that is running that TaskInstance could handle the "executor_parameters" as
>> it sees fit.
>>
>> One thing I'm not clear on is how and when TaskInstances are created. When
>> the scheduler first sees a specific DagRun do all the TaskInstances get
>> created immediately, but only some of them get queued? Or does the
>> scheduler only create those TaskInstances which can be queued right now?
>>
>> In particular if a DagRun gets created and while it is running the DAG is
>> updated and a new Task is added, will the scheduler pick up that new Task
>> for the running DagRun? If the answer is yes, then my suggestion above
>> would run the risk of scheduling a Task for a DAG version where that Task
>> didn't exist. I'm sure you could handle that somewhat gracefully but it's a
>> bit ugly.
>>
>> Chris
>>
>> On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <
>> maximebeauchemin@gmail.com> wrote:
>>
>> > At a higher level I want to say a few things about the idea of enforcing
>> > version consistency within a DagRun.
>> >
>> > One thing we've been talking about is the need for a "DagFetcher"
>> > abstraction, where it's first implementation that would replace and mimic
>> > the current one would be "FileSystemDagFetcher". One specific DagFetcher
>> > implementation may or may not support version semantics, but if it does
>> > should be able to receive a version id and return the proper version of
>> the
>> > DAG object. For instance that first "FileSystemDagFetcher" would not
>> > support version semantic, but perhaps a "GitRepoDagFetcher" would, or an
>> > "ArtifactoryDagFetcher", or "TarballInS3DagFetcher" may as well.
>> >
>> > Of course that assumes that the scheduler knows and stores the active
>> > version number when generating a new DagRun, and for that information to
>> be
>> > leveraged on subsequent scheduler cycles and on workers when task are
>> > executed.
>> >
>> > This could also enable things like "remote" backfills (non local,
>> > parallelized) of a DAG definition that's on an arbitrary git ref
>> (assuming
>> > a "GitRepoDagFetcher").
>> >
>> > There are [perhaps] unintuitive implications where clearing a single task
>> > would then re-run the old DAG definition on that task (since the version
>> > was stamped in the DagRun and hasn't changed), but deleting/recreating a
>> > DagRun would run the latest version (or any other version that may be
>> > specified for that matter).
>> >
>> > I'm unclear on how much work that represents exactly, but it's certainly
>> > doable and may only require to change part of the DagBag class and a few
>> > other places.
>> >
>> > Max
>> >
>> > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <dc...@gmail.com>
>> wrote:
>> >
>> > > Thanks for your feedback!
>> > >
>> > > Option 1 is a non-starter for us. The reason is we have DAGs that take
>> 9+
>> > > hours to run.
>> > >
>> > > Option 2 is more where my mind was going, but it's rather large.  How I
>> > see
>> > > it you need a MVCC DagBag that's aware of multiple versions (what
>> > provides
>> > > version?).  Assuming you can track active dag runs pointing to which
>> > > versions you know how to cleanup (fine with external).  The pro here is
>> > you
>> > > have snapshot isolation for dag_run, con is more bookkeeping and
>> require
>> > > deploy to work with this (last part may be a good thing though).
>> > >
>> > > The only other option I can think of is to lock deploy so the system
>> only
>> > > picks up new versions when no dag_run holds the lock.  This is flawed
>> for
>> > > many reasons, but breaks horrible for dag_runs that takes minutes (I
>> > assume
>> > > 99% do).
>> > >
>> > >
>> > >
>> > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <jo...@wepay.com> wrote:
>> > >
>> > > > Hi David!
>> > > >
>> > > > Thank you for clarifying, I think I understand your concern now. We
>> > > > currently also work around this by making sure a dag is turned off
>> > > > when we deploy a new version. We also make sure our jobs are
>> > > > idempotent and retry-enabled in the case when we forget to turn off
>> > > > the job, so the issue hasn't caused us too much headache.
>> > > >
>> > > > I do agree that it would be nice for Airflow to have the option to
>> > > > guarantee a single version of dag per dag run. I see two approaches:
>> > > >
>> > > > (1) If a dag is updated, the current dagrun fails and/or retries.
>> > > > (2) If a dag is updated, the current dagrun continues but uses
>> version
>> > > > before the update.
>> > > >
>> > > > (1) requires some mechanism to compare dag generations. One option is
>> > > > to hash the dagfile and storing that value to the dagrun table, and
>> > > > compare against it each time a task is running. And in the case if
>> the
>> > > > hash value is different, update the hash value, then fail/retry the
>> > > > dag. I think this is a fairly safe approach.
>> > > >
>> > > > (2) is trickier. A dag only has a property "fileloc" which tracks the
>> > > > location of the dag file, but the actual content of the dag file is
>> > > > never versioned. When a task instance starts running, it dynamically
>> > > > re-processes the dag file specified by the fileloc, generate all the
>> > > > task objects from the dag file, and fetch the task object by task_id
>> > > > in order to execute it. So in order to guarantee each dagrun to run a
>> > > > specific version, previous versions must be maintained on disk
>> somehow
>> > > > (maintaining this information in memory is difficult, since if the
>> > > > scheduler/worker shuts down, that information is lost). This makes it
>> > > > a pretty big change, and I haven't thought much on how to implement
>> > > > it.
>> > > >
>> > > > I'm personally leaning towards (1) for sake of simplicity. Note that
>> > > > some users may not want dag to fail/retry even when dag is updated,
>> so
>> > > > this should be an optional feature, not required.
>> > > >
>> > > > My scheduler-foo isn't that great, so curious what others have to say
>> > > > about this.
>> > > >
>> > > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <dc...@gmail.com>
>> > > wrote:
>> > > > > Thanks for the reply Joy, let me walk you though things as they are
>> > > today
>> > > > >
>> > > > > 1) we don't stop airflow or disable DAGs while deploying updates to
>> > > > logic,
>> > > > > this is done live once its released
>> > > > > 2) the python script in the DAG folder doesn't actually have DAGs
>> in
>> > it
>> > > > but
>> > > > > is a shim layer to allow us to deploy in a atomic way for a single
>> > host
>> > > > >   2.1) this script reads a file on local disk (less than disk page
>> > > size)
>> > > > to
>> > > > > find latest git commit deployed
>> > > > >   2.2) re-does the airflow DAG load process but pointing to the git
>> > > > commit
>> > > > > path
>> > > > >
>> > > > > Example directory structure
>> > > > >
>> > > > > /airflow/dags/shim.py
>> > > > > /airflow/real_dags/
>> > > > >                             /latest # pointer to latest commit
>> > > > >                             /[git commit]/
>> > > > >
>> > > > > This is how we make sure deploys are consistent within a single
>> task.
>> > > > >
>> > > > >
>> > > > > Now, lets assume we have a fully atomic commit process and are able
>> > to
>> > > > > upgrade DAGs at the exact same moment.
>> > > > >
>> > > > > At time T0 the scheduler knows of DAG V1 and schedules two tasks,
>> > > Task1,
>> > > > > and Task2
>> > > > > At time T1 Task1 is picked up by Worker1, so starts executing the
>> > task
>> > > > (V1
>> > > > > logic)
>> > > > > At time T2 deploy commit happens, current DAG version: V2
>> > > > > At time T3, Task2 is picked up by Worker2, so starts executing the
>> > task
>> > > > (V2
>> > > > > logic)
>> > > > >
>> > > > > In many cases this isn't really a problem (tuning config change to
>> > > hadoop
>> > > > > job), but as we have more people using Airflow this is causing a
>> lot
>> > of
>> > > > > time spent debugging why production acted differently than expected
>> > > (the
>> > > > > problem was already fixed... why is it still here?).  We also see
>> > that
>> > > > some
>> > > > > tasks expect a given behavior from other tasks, and since they live
>> > in
>> > > > the
>> > > > > same git repo they can modify both tasks at the same time if a
>> > breaking
>> > > > > change is needed, but when this rolls out to prod there isn't a way
>> > to
>> > > do
>> > > > > this other than turn off the DAG, and login to all hosts to verify
>> > > fully
>> > > > > deployed.
>> > > > >
>> > > > > We would like to remove this confusion and make
>> generations/versions
>> > > > (same
>> > > > > thing really) exposed to users and make sure for a single dag_run
>> > only
>> > > > one
>> > > > > version is used.
>> > > > >
>> > > > > I hope this is more clear.
>> > > > >
>> > > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <jo...@wepay.com> wrote:
>> > > > >
>> > > > >> Hi David,
>> > > > >>
>> > > > >> Do you mind providing a concrete example of the scenario in which
>> > > > >> scheduler/workers see different states (I'm not 100% sure if I
>> > > > understood
>> > > > >> the issue at hand).
>> > > > >>
>> > > > >> And by same dag generation, are you referring to the dag version?
>> > (DAG
>> > > > >> version is currently not supported at all, but I can see it being
>> a
>> > > > >> building block for future use cases).
>> > > > >>
>> > > > >> Joy
>> > > > >>
>> > > > >> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <
>> dcapwell@gmail.com>
>> > > > wrote:
>> > > > >>
>> > > > >> > My current thinking is to add a field to the dag table that is
>> > > > optional
>> > > > >> and
>> > > > >> > provided by the dag. We currently intercept the load path do
>> could
>> > > use
>> > > > >> this
>> > > > >> > field to make sure we load the same generation.  My concern here
>> > is
>> > > > the
>> > > > >> > interaction with the scheduler, not as familiar with that logic
>> to
>> > > > >> predict
>> > > > >> > corner cases were this would fail.
>> > > > >> >
>> > > > >> > Any other recommendations for how this could be done?
>> > > > >> >
>> > > > >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <
>> dcapwell@gmail.com>
>> > > > wrote:
>> > > > >> >
>> > > > >> > > We have been using airflow for logic that delegates to other
>> > > > systems so
>> > > > >> > > inject a task all tasks depends to make sure all resources
>> used
>> > > are
>> > > > the
>> > > > >> > > same for all tasks in the dag. This works well for tasks that
>> > > > delegates
>> > > > >> > to
>> > > > >> > > external systems but people are starting to need to run logic
>> in
>> > > > >> airflow
>> > > > >> > > and the fact that scheduler and all workers can see different
>> > > > states is
>> > > > >> > > causing issues
>> > > > >> > >
>> > > > >> > > We can make sure that all the code is deployed in a consistent
>> > way
>> > > > but
>> > > > >> > > need help from the scheduler to tell the workers the current
>> > > > generation
>> > > > >> > for
>> > > > >> > > a DAG.
>> > > > >> > >
>> > > > >> > > My question is, what would be the best way to modify airflow
>> to
>> > > > allow
>> > > > >> > DAGs
>> > > > >> > > to define a generation value that the scheduler could send to
>> > > > workers?
>> > > > >> > >
>> > > > >> > > Thanks
>> > > > >> > >
>> > > > >> >
>> > > > >>
>> > > >
>> > > >
>> > >
>> >
>>


Re: How to add hooks for strong deployment consistency?

Posted by James Meickle <jm...@quantopian.com>.
I'm currently looking into building a dynamic DAG that will load
user-provided data daily, into user-generated DB tables, using provided
schema definitions. There will be some ordering/dependencies such that
certain datasets depend on others. Since users can add new datasets at any
time, both the number of nodes and their growth from day to day is
unbounded. The way I am thinking of approaching this is to build an
"execution plan" in advance and distribute that as an artifact, instead of
having each Airflow worker re-evaluate the DAG and re-query the database of
user datasets.

That's doable right now by shipping a static artifact, but there are many
warts. I could definitely see this as something that fits well with the
described Airflow "DAGFetcher" abstraction. My wish list for an overall
system based on that concept would look like this:

- The DAGFetcher API is aware of execution dates, so it can query the DAG
generator service with reference to a time period (not just "right now").
- The DAGFetcher API is aware of previous outputs, whether or not the DAG
generator service is. This isn't a cache (performance optimization, records
are ephemeral), but a ledger (immutable point in time record). For some
services, like git, the service is also the ledger. In my case, the DAGRun
is the product of the current git commit (versioned) and the point in time
view of a database (unversioned).
- I can view a "pending" DAGRun in the UI, representing a DAGRun that is
expected but not yet evaluated by the DAGFetcher.
- I can view a "exception" DAGRun in the UI, representing a DAGRun where
the DAGFetcher raised an exception, and retry fetching through the UI.
- I can alert if a DAGRun is in "pending" state for too long, or enters
"exception".
- Multiple DAGRuns can reference the same DAGFetcher result, if there's
been no changes from day to day.
- The UI can represent multiple DAGFetcher results for the same DAG, such
as showing a blank entry for execution dates where a task didn't exist (it
was there historically but was removed, or it's new)

On Mon, Mar 5, 2018 at 4:54 PM, Maxime Beauchemin <
maximebeauchemin@gmail.com> wrote:

> A few notes about the pickling topic to answer William:
> * first reason why we cannot use pickles: jinja template objects are not
> picklable, there's a hack to pickle the content of the template instead of
> the object, but that breaks jinja inheritance and imports
> * pickles are messy, and Airflow allows users to attach objects to DAG
> objects (on_error_callable, on_retry_callable, params, ...) and pickles
> will go down the recursive rabbit hole and import everything large chunks
> of what's in `sys.modules` sometimes. (probably could be mitigated)
> * pickles are a messy serialization format, lots of drawbacks, security
> issues, incompatibility between py2 and py3, ...
> * tpickles have a bad reputation, many people advised avoiding it like
> plague since the feature was first built
> * original approach of pickling to the db is kind of a hack
>
> I also agree that caching is probably required especially around large DAGs
> and for "semi-stateless" web servers to operate properly.
>
> Max
>
>
> On Thu, Mar 1, 2018 at 1:15 PM, David Capwell <dc...@gmail.com> wrote:
>
> > We need two versions but most likely would not use either... That being
> > artifactory and git (would really love for this to be pluggable!!!!!)
> >
> > We have our own dag fetch logic which right now pulls from git, caches,
> > then redirect airflow to that directory.  For us we have airflow
> automated
> > so you push a button to get a cluster, for this reason there are enough
> > instances that we have DDOS attacked git (opps).
> >
> > We are planning to change this to fetch from artifactory, and have a
> > stateful proxy for each cluster so we stop DDOS attacking core
> > infrastructure.
> >
> > On Mar 1, 2018 11:45 AM, "William Wong" <wo...@gmail.com> wrote:
> >
> > Also relatively new to Airflow here. Same as David above, Option 1 is not
> > an option for us either for the same reasons.
> >
> > What I would like to see is that it can be user selectable / modifiable.
> >
> > Use Case:
> > We have a DAG with thousands of task dependencies/tasks. After 24hrs of
> > progressing, we need to take a subset of those tasks and rerun them with
> a
> > different configuration (reasons range from incorrect parameters to
> > infrastructure issues, doesn't really matter here).
> >
> > What I hope can happen:
> > 1. Pause DAG
> > 2. Upload and tag newest dag version
> > 3. Set dag_run to use latest tag,
> > 4. Resolve DAG sync using <insert smart diff behavior here that is
> clearly
> > defined/documented>
> > 5. Unpause DAG
> >
> > I do like the DagFetcher idea. This logic should shim in nicely in the
> > DagBag code. Maxime, I also vote for the GitDagFetcher. Two thoughts
> about
> > the GitDagFetcher:
> > - I probably won't use fuse across 100's of nodes in my k8s/swarm. Not
> sure
> > how this would work without too much trouble.
> > - It might be confusing if some git sha's have no changes to a Dag. all
> > existing runs will be marked as outdated? probably better than nothing
> > anyway.
> >
> > I also vote to have some form of sort of caching behavior. I prefer not
> to
> > read in DAGs all the time. i.e. from the webserver, scheduler, *and* all
> > workers before starting any task over and over again. This is because,
> > unfortunately, the assumption that a DAG only takes seconds to load does
> > not hold true for large dags. With only 10k tasks within a DAG it's
> already
> > on the order of minutes. This would be untenable as we scale up to even
> > larger tags. (Though, I'm testing a fix for this so maybe this might not
> > actually be an issue anymore)
> >
> > FWIW, it seems to me that the DagPickle feature (which, for the love me I
> > can't seem to get it to work, no wonder it's being deprecated) would have
> > solved a lot of these issues fairly easily. Something along the lines of
> > adding pickle_id to dag_run should at  help the scheduler identify the
> DAG
> > version to load and queue. But I'm not sure if it can delete out of sync
> > task instances.
> >
> > Lastly, sorry for the brain dump and derailing the topic, for the
> workers,
> > it seems that importing/loading in the DAG just to execute a single task
> is
> > a bit overkill isn't it? If we kept a caching feature (i.e. pickling),
> > perhaps we can simply cache the task and not worry about the rest of the
> > DAG tasks?
> >
> > Will
> >
> > On Thu, Mar 1, 2018 at 11:30 AM, Maxime Beauchemin <
> >
> > maximebeauchemin@gmail.com> wrote:
> >
> > > I'm curious to hear which DagFetcher abstraction people would build or
> > want
> > > to use.
> > >
> > > So far it sounded like the most popular and flexible approach would be
> a
> > > `GitDagFetcher` where all SHAs and refs become a possibility, as
> opposed
> > to
> > > say a TarballOnS3DagFetcher which would require more manual artifact
> > > management and versioning, which represent additional [human] workflow
> on
> > > top of the already existing git-based workflow.
> > >
> > > One way I've seen this done before is by using this Git fuse (file
> system
> > > in user space) hack that creates a virtual filesystem where all SHAs
> and
> > > refs in the Git repo are exposed as a subfolder, and under each ref
> > > subfolder the whole repo sits as of that ref. Of course all the files
> are
> > > virtual and fetched at access time by the virtual filesystem using the
> > git
> > > api. So if you simply point the DagBag loader to the right [virtual]
> > > directory, it will import the right version of the DAG. In the git
> world,
> > > the alternative to that is managing temp folders and doing shallow
> clones
> > > which seems like much more of a headache. Note that one tradeoff is
> that
> > if
> > > git and whatever it depends has then a need to be highly available.
> > >
> > > Max
> > >
> > > On Wed, Feb 28, 2018 at 6:55 PM, David Capwell <dc...@gmail.com>
> > wrote:
> > >
> > > > Thanks for all the details! With a pluggable fetcher we would be able
> > to
> > > > add our own logic for how to fetch so sounds like a good place to
> start
> > > for
> > > > something like this!
> > > >
> > > > On Wed, Feb 28, 2018, 4:39 PM Joy Gao <jo...@wepay.com> wrote:
> > > >
> > > > > +1 on DagFetcher abstraction, very airflow-esque :)
> > > > >
> > > > > On Wed, Feb 28, 2018 at 11:25 AM, Maxime Beauchemin
> > > > > <ma...@gmail.com> wrote:
> > > > > > Addressing a few of your questions / concerns:
> > > > > >
> > > > > > * The scheduler uses a multiprocess queue to queue up tasks, each
> > > > > > subprocess is in charge of a single DAG "scheduler cycle" which
> > > > triggers
> > > > > > what it can for active DagRuns. Currently it fills the DagBag
> from
> > > the
> > > > > > local file system, looking for a specific module where the master
> > > > process
> > > > > > last saw that DAG. Fetching the DAG is a metadata operation, DAG
> > > > > artifacts
> > > > > > shouldn't be too large, we can assume that it takes seconds at
> most
> > > to
> > > > > > fetch a DAG, which is ok. We generally assume that the scheduler
> > > should
> > > > > > fully cycle every minute or so. Version-aware DagFetcher could
> also
> > > > > > implement some sort of caching if that was a concern (shouldn't
> be
> > > > > though).
> > > > > > * For consistency within the whole DagRun, the scheduler
> absolutely
> > > has
> > > > > to
> > > > > > read the right version. If tasks got removed they would never get
> > > > > scheduled
> > > > > > and consistency cannot be achieved.
> > > > > > * TaskInstances get created the first time they are identified as
> > > > > runnable
> > > > > > by the scheduler and are born with a queued status I believe
> (from
> > > > > memory,
> > > > > > haven't read the latest code to confirm). The worker double
> checks
> > > and
> > > > > sets
> > > > > > it as running as part of a database transaction to avoid
> > > double-firing.
> > > > > >
> > > > > > Max
> > > > > >
> > > > > > On Wed, Feb 28, 2018 at 7:29 AM, Chris Palmer <
> chris@crpalmer.com>
> > > > > wrote:
> > > > > >
> > > > > >> I'll preface this with the fact that I'm relatively new to
> > Airflow,
> > > > and
> > > > > >> haven't played around with a lot of the internals.
> > > > > >>
> > > > > >> I find the idea of a DagFetcher interesting but would we worry
> > about
> > > > > >> slowing down the scheduler significantly? If the scheduler is
> > having
> > > > to
> > > > > >> "fetch" multiple different DAG versions, be it git refs or
> > artifacts
> > > > > from
> > > > > >> Artifactory, we are talking about adding significant time to
> each
> > > > > scheduler
> > > > > >> run. Also how would the scheduler know which DAGs to fetch from
> > > where
> > > > if
> > > > > >> there aren't local files on disk listing those DAGs? Maybe I'm
> > > missing
> > > > > >> something in the implementation.
> > > > > >>
> > > > > >> It seems to me that the fetching of the different versions
> should
> > be
> > > > > >> delegated to the Task (or TaskInstance) itself. That ensures we
> > only
> > > > > spend
> > > > > >> the time to "fetch" the version that is needed when it is
> needed.
> > > One
> > > > > down
> > > > > >> side might be that each TaskInstance running for the same
> version
> > of
> > > > the
> > > > > >> DAG might end up doing the "fetch" independently (duplicating
> that
> > > > > work).
> > > > > >>
> > > > > >> I think this could be done by adding some version attribute to
> the
> > > > > DagRun
> > > > > >> that gets set at creation, and have the scheduler pass that
> > version
> > > to
> > > > > the
> > > > > >> TaskInstances when they are created. You could even extend this
> so
> > > > that
> > > > > you
> > > > > >> could have an arbitrary set of "executor_parameters" that get
> set
> > > on a
> > > > > >> DagRun and are passed to TaskInstances. Then the specific
> Executor
> > > > class
> > > > > >> that is running that TaskInstance could handle the
> > > > > "executor_parameters" as
> > > > > >> it sees fit.
> > > > > >>
> > > > > >> One thing I'm not clear on is how and when TaskInstances are
> > > created.
> > > > > When
> > > > > >> the scheduler first sees a specific DagRun do all the
> > TaskInstances
> > > > get
> > > > > >> created immediately, but only some of them get queued? Or does
> the
> > > > > >> scheduler only create those TaskInstances which can be queued
> > right
> > > > now?
> > > > > >>
> > > > > >> In particular if a DagRun gets created and while it is running
> the
> > > DAG
> > > > > is
> > > > > >> updated and a new Task is added, will the scheduler pick up that
> > new
> > > > > Task
> > > > > >> for the running DagRun? If the answer is yes, then my suggestion
> > > above
> > > > > >> would run the risk of scheduling a Task for a DAG version where
> > that
> > > > > Task
> > > > > >> didn't exist. I'm sure you could handle that somewhat gracefully
> > but
> > > > > it's a
> > > > > >> bit ugly.
> > > > > >>
> > > > > >> Chris
> > > > > >>
> > > > > >> On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <
> > > > > >> maximebeauchemin@gmail.com> wrote:
> > > > > >>
> > > > > >> > At a higher level I want to say a few things about the idea of
> > > > > enforcing
> > > > > >> > version consistency within a DagRun.
> > > > > >> >
> > > > > >> > One thing we've been talking about is the need for a
> > "DagFetcher"
> > > > > >> > abstraction, where it's first implementation that would
> replace
> > > and
> > > > > mimic
> > > > > >> > the current one would be "FileSystemDagFetcher". One specific
> > > > > DagFetcher
> > > > > >> > implementation may or may not support version semantics, but
> if
> > it
> > > > > does
> > > > > >> > should be able to receive a version id and return the proper
> > > version
> > > > > of
> > > > > >> the
> > > > > >> > DAG object. For instance that first "FileSystemDagFetcher"
> would
> > > not
> > > > > >> > support version semantic, but perhaps a "GitRepoDagFetcher"
> > would,
> > > > or
> > > > > an
> > > > > >> > "ArtifactoryDagFetcher", or "TarballInS3DagFetcher" may as
> well.
> > > > > >> >
> > > > > >> > Of course that assumes that the scheduler knows and stores the
> > > > active
> > > > > >> > version number when generating a new DagRun, and for that
> > > > information
> > > > > to
> > > > > >> be
> > > > > >> > leveraged on subsequent scheduler cycles and on workers when
> > task
> > > > are
> > > > > >> > executed.
> > > > > >> >
> > > > > >> > This could also enable things like "remote" backfills (non
> > local,
> > > > > >> > parallelized) of a DAG definition that's on an arbitrary git
> ref
> > > > > >> (assuming
> > > > > >> > a "GitRepoDagFetcher").
> > > > > >> >
> > > > > >> > There are [perhaps] unintuitive implications where clearing a
> > > single
> > > > > task
> > > > > >> > would then re-run the old DAG definition on that task (since
> the
> > > > > version
> > > > > >> > was stamped in the DagRun and hasn't changed), but
> > > > > deleting/recreating a
> > > > > >> > DagRun would run the latest version (or any other version that
> > may
> > > > be
> > > > > >> > specified for that matter).
> > > > > >> >
> > > > > >> > I'm unclear on how much work that represents exactly, but it's
> > > > > certainly
> > > > > >> > doable and may only require to change part of the DagBag class
> > > and a
> > > > > few
> > > > > >> > other places.
> > > > > >> >
> > > > > >> > Max
> > > > > >> >
> > > > > >> > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <
> > > dcapwell@gmail.com>
> > > > > >> wrote:
> > > > > >> >
> > > > > >> > > Thanks for your feedback!
> > > > > >> > >
> > > > > >> > > Option 1 is a non-starter for us. The reason is we have DAGs
> > > that
> > > > > take
> > > > > >> 9+
> > > > > >> > > hours to run.
> > > > > >> > >
> > > > > >> > > Option 2 is more where my mind was going, but it's rather
> > large.
> > > > > How I
> > > > > >> > see
> > > > > >> > > it you need a MVCC DagBag that's aware of multiple versions
> > > (what
> > > > > >> > provides
> > > > > >> > > version?).  Assuming you can track active dag runs pointing
> to
> > > > which
> > > > > >> > > versions you know how to cleanup (fine with external).  The
> > pro
> > > > > here is
> > > > > >> > you
> > > > > >> > > have snapshot isolation for dag_run, con is more bookkeeping
> > and
> > > > > >> require
> > > > > >> > > deploy to work with this (last part may be a good thing
> > though).
> > > > > >> > >
> > > > > >> > > The only other option I can think of is to lock deploy so
> the
> > > > system
> > > > > >> only
> > > > > >> > > picks up new versions when no dag_run holds the lock.  This
> is
> > > > > flawed
> > > > > >> for
> > > > > >> > > many reasons, but breaks horrible for dag_runs that takes
> > > minutes
> > > > (I
> > > > > >> > assume
> > > > > >> > > 99% do).
> > > > > >> > >
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <jo...@wepay.com>
> wrote:
> > > > > >> > >
> > > > > >> > > > Hi David!
> > > > > >> > > >
> > > > > >> > > > Thank you for clarifying, I think I understand your
> concern
> > > now.
> > > > > We
> > > > > >> > > > currently also work around this by making sure a dag is
> > turned
> > > > off
> > > > > >> > > > when we deploy a new version. We also make sure our jobs
> are
> > > > > >> > > > idempotent and retry-enabled in the case when we forget to
> > > turn
> > > > > off
> > > > > >> > > > the job, so the issue hasn't caused us too much headache.
> > > > > >> > > >
> > > > > >> > > > I do agree that it would be nice for Airflow to have the
> > > option
> > > > to
> > > > > >> > > > guarantee a single version of dag per dag run. I see two
> > > > > approaches:
> > > > > >> > > >
> > > > > >> > > > (1) If a dag is updated, the current dagrun fails and/or
> > > > retries.
> > > > > >> > > > (2) If a dag is updated, the current dagrun continues but
> > uses
> > > > > >> version
> > > > > >> > > > before the update.
> > > > > >> > > >
> > > > > >> > > > (1) requires some mechanism to compare dag generations.
> One
> > > > > option is
> > > > > >> > > > to hash the dagfile and storing that value to the dagrun
> > > table,
> > > > > and
> > > > > >> > > > compare against it each time a task is running. And in the
> > > case
> > > > if
> > > > > >> the
> > > > > >> > > > hash value is different, update the hash value, then
> > > fail/retry
> > > > > the
> > > > > >> > > > dag. I think this is a fairly safe approach.
> > > > > >> > > >
> > > > > >> > > > (2) is trickier. A dag only has a property "fileloc" which
> > > > tracks
> > > > > the
> > > > > >> > > > location of the dag file, but the actual content of the
> dag
> > > file
> > > > > is
> > > > > >> > > > never versioned. When a task instance starts running, it
> > > > > dynamically
> > > > > >> > > > re-processes the dag file specified by the fileloc,
> generate
> > > all
> > > > > the
> > > > > >> > > > task objects from the dag file, and fetch the task object
> by
> > > > > task_id
> > > > > >> > > > in order to execute it. So in order to guarantee each
> dagrun
> > > to
> > > > > run a
> > > > > >> > > > specific version, previous versions must be maintained on
> > disk
> > > > > >> somehow
> > > > > >> > > > (maintaining this information in memory is difficult,
> since
> > if
> > > > the
> > > > > >> > > > scheduler/worker shuts down, that information is lost).
> This
> > > > > makes it
> > > > > >> > > > a pretty big change, and I haven't thought much on how to
> > > > > implement
> > > > > >> > > > it.
> > > > > >> > > >
> > > > > >> > > > I'm personally leaning towards (1) for sake of simplicity.
> > > Note
> > > > > that
> > > > > >> > > > some users may not want dag to fail/retry even when dag is
> > > > > updated,
> > > > > >> so
> > > > > >> > > > this should be an optional feature, not required.
> > > > > >> > > >
> > > > > >> > > > My scheduler-foo isn't that great, so curious what others
> > have
> > > > to
> > > > > say
> > > > > >> > > > about this.
> > > > > >> > > >
> > > > > >> > > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <
> > > > > dcapwell@gmail.com>
> > > > > >> > > wrote:
> > > > > >> > > > > Thanks for the reply Joy, let me walk you though things
> as
> > > > they
> > > > > are
> > > > > >> > > today
> > > > > >> > > > >
> > > > > >> > > > > 1) we don't stop airflow or disable DAGs while deploying
> > > > > updates to
> > > > > >> > > > logic,
> > > > > >> > > > > this is done live once its released
> > > > > >> > > > > 2) the python script in the DAG folder doesn't actually
> > have
> > > > > DAGs
> > > > > >> in
> > > > > >> > it
> > > > > >> > > > but
> > > > > >> > > > > is a shim layer to allow us to deploy in a atomic way
> for
> > a
> > > > > single
> > > > > >> > host
> > > > > >> > > > >   2.1) this script reads a file on local disk (less than
> > > disk
> > > > > page
> > > > > >> > > size)
> > > > > >> > > > to
> > > > > >> > > > > find latest git commit deployed
> > > > > >> > > > >   2.2) re-does the airflow DAG load process but pointing
> > to
> > > > the
> > > > > git
> > > > > >> > > > commit
> > > > > >> > > > > path
> > > > > >> > > > >
> > > > > >> > > > > Example directory structure
> > > > > >> > > > >
> > > > > >> > > > > /airflow/dags/shim.py
> > > > > >> > > > > /airflow/real_dags/
> > > > > >> > > > >                             /latest # pointer to latest
> > > commit
> > > > > >> > > > >                             /[git commit]/
> > > > > >> > > > >
> > > > > >> > > > > This is how we make sure deploys are consistent within a
> > > > single
> > > > > >> task.
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > > Now, lets assume we have a fully atomic commit process
> and
> > > are
> > > > > able
> > > > > >> > to
> > > > > >> > > > > upgrade DAGs at the exact same moment.
> > > > > >> > > > >
> > > > > >> > > > > At time T0 the scheduler knows of DAG V1 and schedules
> two
> > > > > tasks,
> > > > > >> > > Task1,
> > > > > >> > > > > and Task2
> > > > > >> > > > > At time T1 Task1 is picked up by Worker1, so starts
> > > executing
> > > > > the
> > > > > >> > task
> > > > > >> > > > (V1
> > > > > >> > > > > logic)
> > > > > >> > > > > At time T2 deploy commit happens, current DAG version:
> V2
> > > > > >> > > > > At time T3, Task2 is picked up by Worker2, so starts
> > > executing
> > > > > the
> > > > > >> > task
> > > > > >> > > > (V2
> > > > > >> > > > > logic)
> > > > > >> > > > >
> > > > > >> > > > > In many cases this isn't really a problem (tuning config
> > > > change
> > > > > to
> > > > > >> > > hadoop
> > > > > >> > > > > job), but as we have more people using Airflow this is
> > > > causing a
> > > > > >> lot
> > > > > >> > of
> > > > > >> > > > > time spent debugging why production acted differently
> than
> > > > > expected
> > > > > >> > > (the
> > > > > >> > > > > problem was already fixed... why is it still here?).  We
> > > also
> > > > > see
> > > > > >> > that
> > > > > >> > > > some
> > > > > >> > > > > tasks expect a given behavior from other tasks, and
> since
> > > they
> > > > > live
> > > > > >> > in
> > > > > >> > > > the
> > > > > >> > > > > same git repo they can modify both tasks at the same
> time
> > > if a
> > > > > >> > breaking
> > > > > >> > > > > change is needed, but when this rolls out to prod there
> > > isn't
> > > > a
> > > > > way
> > > > > >> > to
> > > > > >> > > do
> > > > > >> > > > > this other than turn off the DAG, and login to all hosts
> > to
> > > > > verify
> > > > > >> > > fully
> > > > > >> > > > > deployed.
> > > > > >> > > > >
> > > > > >> > > > > We would like to remove this confusion and make
> > > > > >> generations/versions
> > > > > >> > > > (same
> > > > > >> > > > > thing really) exposed to users and make sure for a
> single
> > > > > dag_run
> > > > > >> > only
> > > > > >> > > > one
> > > > > >> > > > > version is used.
> > > > > >> > > > >
> > > > > >> > > > > I hope this is more clear.
> > > > > >> > > > >
> > > > > >> > > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <
> joyg@wepay.com>
> > > > > wrote:
> > > > > >> > > > >
> > > > > >> > > > >> Hi David,
> > > > > >> > > > >>
> > > > > >> > > > >> Do you mind providing a concrete example of the
> scenario
> > in
> > > > > which
> > > > > >> > > > >> scheduler/workers see different states (I'm not 100%
> sure
> > > if
> > > > I
> > > > > >> > > > understood
> > > > > >> > > > >> the issue at hand).
> > > > > >> > > > >>
> > > > > >> > > > >> And by same dag generation, are you referring to the
> dag
> > > > > version?
> > > > > >> > (DAG
> > > > > >> > > > >> version is currently not supported at all, but I can
> see
> > it
> > > > > being
> > > > > >> a
> > > > > >> > > > >> building block for future use cases).
> > > > > >> > > > >>
> > > > > >> > > > >> Joy
> > > > > >> > > > >>
> > > > > >> > > > >> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <
> > > > > >> dcapwell@gmail.com>
> > > > > >> > > > wrote:
> > > > > >> > > > >>
> > > > > >> > > > >> > My current thinking is to add a field to the dag
> table
> > > that
> > > > > is
> > > > > >> > > > optional
> > > > > >> > > > >> and
> > > > > >> > > > >> > provided by the dag. We currently intercept the load
> > path
> > > > do
> > > > > >> could
> > > > > >> > > use
> > > > > >> > > > >> this
> > > > > >> > > > >> > field to make sure we load the same generation.  My
> > > concern
> > > > > here
> > > > > >> > is
> > > > > >> > > > the
> > > > > >> > > > >> > interaction with the scheduler, not as familiar with
> > that
> > > > > logic
> > > > > >> to
> > > > > >> > > > >> predict
> > > > > >> > > > >> > corner cases were this would fail.
> > > > > >> > > > >> >
> > > > > >> > > > >> > Any other recommendations for how this could be done?
> > > > > >> > > > >> >
> > > > > >> > > > >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <
> > > > > >> dcapwell@gmail.com>
> > > > > >> > > > wrote:
> > > > > >> > > > >> >
> > > > > >> > > > >> > > We have been using airflow for logic that delegates
> > to
> > > > > other
> > > > > >> > > > systems so
> > > > > >> > > > >> > > inject a task all tasks depends to make sure all
> > > > resources
> > > > > >> used
> > > > > >> > > are
> > > > > >> > > > the
> > > > > >> > > > >> > > same for all tasks in the dag. This works well for
> > > tasks
> > > > > that
> > > > > >> > > > delegates
> > > > > >> > > > >> > to
> > > > > >> > > > >> > > external systems but people are starting to need to
> > run
> > > > > logic
> > > > > >> in
> > > > > >> > > > >> airflow
> > > > > >> > > > >> > > and the fact that scheduler and all workers can see
> > > > > different
> > > > > >> > > > states is
> > > > > >> > > > >> > > causing issues
> > > > > >> > > > >> > >
> > > > > >> > > > >> > > We can make sure that all the code is deployed in a
> > > > > consistent
> > > > > >> > way
> > > > > >> > > > but
> > > > > >> > > > >> > > need help from the scheduler to tell the workers
> the
> > > > > current
> > > > > >> > > > generation
> > > > > >> > > > >> > for
> > > > > >> > > > >> > > a DAG.
> > > > > >> > > > >> > >
> > > > > >> > > > >> > > My question is, what would be the best way to
> modify
> > > > > airflow
> > > > > >> to
> > > > > >> > > > allow
> > > > > >> > > > >> > DAGs
> > > > > >> > > > >> > > to define a generation value that the scheduler
> could
> > > > send
> > > > > to
> > > > > >> > > > workers?
> > > > > >> > > > >> > >
> > > > > >> > > > >> > > Thanks
> > > > > >> > > > >> > >
> > > > > >> > > > >> >
> > > > > >> > > > >>
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: How to add hooks for strong deployment consistency?

Posted by Maxime Beauchemin <ma...@gmail.com>.
A few notes about the pickling topic to answer William:
* first reason why we cannot use pickles: jinja template objects are not
picklable, there's a hack to pickle the content of the template instead of
the object, but that breaks jinja inheritance and imports
* pickles are messy, and Airflow allows users to attach objects to DAG
objects (on_error_callable, on_retry_callable, params, ...) and pickles
will go down the recursive rabbit hole and import everything large chunks
of what's in `sys.modules` sometimes. (probably could be mitigated)
* pickles are a messy serialization format, lots of drawbacks, security
issues, incompatibility between py2 and py3, ...
* tpickles have a bad reputation, many people advised avoiding it like
plague since the feature was first built
* original approach of pickling to the db is kind of a hack

I also agree that caching is probably required especially around large DAGs
and for "semi-stateless" web servers to operate properly.

Max


On Thu, Mar 1, 2018 at 1:15 PM, David Capwell <dc...@gmail.com> wrote:

> We need two versions but most likely would not use either... That being
> artifactory and git (would really love for this to be pluggable!!!!!)
>
> We have our own dag fetch logic which right now pulls from git, caches,
> then redirect airflow to that directory.  For us we have airflow automated
> so you push a button to get a cluster, for this reason there are enough
> instances that we have DDOS attacked git (opps).
>
> We are planning to change this to fetch from artifactory, and have a
> stateful proxy for each cluster so we stop DDOS attacking core
> infrastructure.
>
> On Mar 1, 2018 11:45 AM, "William Wong" <wo...@gmail.com> wrote:
>
> Also relatively new to Airflow here. Same as David above, Option 1 is not
> an option for us either for the same reasons.
>
> What I would like to see is that it can be user selectable / modifiable.
>
> Use Case:
> We have a DAG with thousands of task dependencies/tasks. After 24hrs of
> progressing, we need to take a subset of those tasks and rerun them with a
> different configuration (reasons range from incorrect parameters to
> infrastructure issues, doesn't really matter here).
>
> What I hope can happen:
> 1. Pause DAG
> 2. Upload and tag newest dag version
> 3. Set dag_run to use latest tag,
> 4. Resolve DAG sync using <insert smart diff behavior here that is clearly
> defined/documented>
> 5. Unpause DAG
>
> I do like the DagFetcher idea. This logic should shim in nicely in the
> DagBag code. Maxime, I also vote for the GitDagFetcher. Two thoughts about
> the GitDagFetcher:
> - I probably won't use fuse across 100's of nodes in my k8s/swarm. Not sure
> how this would work without too much trouble.
> - It might be confusing if some git sha's have no changes to a Dag. all
> existing runs will be marked as outdated? probably better than nothing
> anyway.
>
> I also vote to have some form of sort of caching behavior. I prefer not to
> read in DAGs all the time. i.e. from the webserver, scheduler, *and* all
> workers before starting any task over and over again. This is because,
> unfortunately, the assumption that a DAG only takes seconds to load does
> not hold true for large dags. With only 10k tasks within a DAG it's already
> on the order of minutes. This would be untenable as we scale up to even
> larger tags. (Though, I'm testing a fix for this so maybe this might not
> actually be an issue anymore)
>
> FWIW, it seems to me that the DagPickle feature (which, for the love me I
> can't seem to get it to work, no wonder it's being deprecated) would have
> solved a lot of these issues fairly easily. Something along the lines of
> adding pickle_id to dag_run should at  help the scheduler identify the DAG
> version to load and queue. But I'm not sure if it can delete out of sync
> task instances.
>
> Lastly, sorry for the brain dump and derailing the topic, for the workers,
> it seems that importing/loading in the DAG just to execute a single task is
> a bit overkill isn't it? If we kept a caching feature (i.e. pickling),
> perhaps we can simply cache the task and not worry about the rest of the
> DAG tasks?
>
> Will
>
> On Thu, Mar 1, 2018 at 11:30 AM, Maxime Beauchemin <
>
> maximebeauchemin@gmail.com> wrote:
>
> > I'm curious to hear which DagFetcher abstraction people would build or
> want
> > to use.
> >
> > So far it sounded like the most popular and flexible approach would be a
> > `GitDagFetcher` where all SHAs and refs become a possibility, as opposed
> to
> > say a TarballOnS3DagFetcher which would require more manual artifact
> > management and versioning, which represent additional [human] workflow on
> > top of the already existing git-based workflow.
> >
> > One way I've seen this done before is by using this Git fuse (file system
> > in user space) hack that creates a virtual filesystem where all SHAs and
> > refs in the Git repo are exposed as a subfolder, and under each ref
> > subfolder the whole repo sits as of that ref. Of course all the files are
> > virtual and fetched at access time by the virtual filesystem using the
> git
> > api. So if you simply point the DagBag loader to the right [virtual]
> > directory, it will import the right version of the DAG. In the git world,
> > the alternative to that is managing temp folders and doing shallow clones
> > which seems like much more of a headache. Note that one tradeoff is that
> if
> > git and whatever it depends has then a need to be highly available.
> >
> > Max
> >
> > On Wed, Feb 28, 2018 at 6:55 PM, David Capwell <dc...@gmail.com>
> wrote:
> >
> > > Thanks for all the details! With a pluggable fetcher we would be able
> to
> > > add our own logic for how to fetch so sounds like a good place to start
> > for
> > > something like this!
> > >
> > > On Wed, Feb 28, 2018, 4:39 PM Joy Gao <jo...@wepay.com> wrote:
> > >
> > > > +1 on DagFetcher abstraction, very airflow-esque :)
> > > >
> > > > On Wed, Feb 28, 2018 at 11:25 AM, Maxime Beauchemin
> > > > <ma...@gmail.com> wrote:
> > > > > Addressing a few of your questions / concerns:
> > > > >
> > > > > * The scheduler uses a multiprocess queue to queue up tasks, each
> > > > > subprocess is in charge of a single DAG "scheduler cycle" which
> > > triggers
> > > > > what it can for active DagRuns. Currently it fills the DagBag from
> > the
> > > > > local file system, looking for a specific module where the master
> > > process
> > > > > last saw that DAG. Fetching the DAG is a metadata operation, DAG
> > > > artifacts
> > > > > shouldn't be too large, we can assume that it takes seconds at most
> > to
> > > > > fetch a DAG, which is ok. We generally assume that the scheduler
> > should
> > > > > fully cycle every minute or so. Version-aware DagFetcher could also
> > > > > implement some sort of caching if that was a concern (shouldn't be
> > > > though).
> > > > > * For consistency within the whole DagRun, the scheduler absolutely
> > has
> > > > to
> > > > > read the right version. If tasks got removed they would never get
> > > > scheduled
> > > > > and consistency cannot be achieved.
> > > > > * TaskInstances get created the first time they are identified as
> > > > runnable
> > > > > by the scheduler and are born with a queued status I believe (from
> > > > memory,
> > > > > haven't read the latest code to confirm). The worker double checks
> > and
> > > > sets
> > > > > it as running as part of a database transaction to avoid
> > double-firing.
> > > > >
> > > > > Max
> > > > >
> > > > > On Wed, Feb 28, 2018 at 7:29 AM, Chris Palmer <ch...@crpalmer.com>
> > > > wrote:
> > > > >
> > > > >> I'll preface this with the fact that I'm relatively new to
> Airflow,
> > > and
> > > > >> haven't played around with a lot of the internals.
> > > > >>
> > > > >> I find the idea of a DagFetcher interesting but would we worry
> about
> > > > >> slowing down the scheduler significantly? If the scheduler is
> having
> > > to
> > > > >> "fetch" multiple different DAG versions, be it git refs or
> artifacts
> > > > from
> > > > >> Artifactory, we are talking about adding significant time to each
> > > > scheduler
> > > > >> run. Also how would the scheduler know which DAGs to fetch from
> > where
> > > if
> > > > >> there aren't local files on disk listing those DAGs? Maybe I'm
> > missing
> > > > >> something in the implementation.
> > > > >>
> > > > >> It seems to me that the fetching of the different versions should
> be
> > > > >> delegated to the Task (or TaskInstance) itself. That ensures we
> only
> > > > spend
> > > > >> the time to "fetch" the version that is needed when it is needed.
> > One
> > > > down
> > > > >> side might be that each TaskInstance running for the same version
> of
> > > the
> > > > >> DAG might end up doing the "fetch" independently (duplicating that
> > > > work).
> > > > >>
> > > > >> I think this could be done by adding some version attribute to the
> > > > DagRun
> > > > >> that gets set at creation, and have the scheduler pass that
> version
> > to
> > > > the
> > > > >> TaskInstances when they are created. You could even extend this so
> > > that
> > > > you
> > > > >> could have an arbitrary set of "executor_parameters" that get set
> > on a
> > > > >> DagRun and are passed to TaskInstances. Then the specific Executor
> > > class
> > > > >> that is running that TaskInstance could handle the
> > > > "executor_parameters" as
> > > > >> it sees fit.
> > > > >>
> > > > >> One thing I'm not clear on is how and when TaskInstances are
> > created.
> > > > When
> > > > >> the scheduler first sees a specific DagRun do all the
> TaskInstances
> > > get
> > > > >> created immediately, but only some of them get queued? Or does the
> > > > >> scheduler only create those TaskInstances which can be queued
> right
> > > now?
> > > > >>
> > > > >> In particular if a DagRun gets created and while it is running the
> > DAG
> > > > is
> > > > >> updated and a new Task is added, will the scheduler pick up that
> new
> > > > Task
> > > > >> for the running DagRun? If the answer is yes, then my suggestion
> > above
> > > > >> would run the risk of scheduling a Task for a DAG version where
> that
> > > > Task
> > > > >> didn't exist. I'm sure you could handle that somewhat gracefully
> but
> > > > it's a
> > > > >> bit ugly.
> > > > >>
> > > > >> Chris
> > > > >>
> > > > >> On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <
> > > > >> maximebeauchemin@gmail.com> wrote:
> > > > >>
> > > > >> > At a higher level I want to say a few things about the idea of
> > > > enforcing
> > > > >> > version consistency within a DagRun.
> > > > >> >
> > > > >> > One thing we've been talking about is the need for a
> "DagFetcher"
> > > > >> > abstraction, where it's first implementation that would replace
> > and
> > > > mimic
> > > > >> > the current one would be "FileSystemDagFetcher". One specific
> > > > DagFetcher
> > > > >> > implementation may or may not support version semantics, but if
> it
> > > > does
> > > > >> > should be able to receive a version id and return the proper
> > version
> > > > of
> > > > >> the
> > > > >> > DAG object. For instance that first "FileSystemDagFetcher" would
> > not
> > > > >> > support version semantic, but perhaps a "GitRepoDagFetcher"
> would,
> > > or
> > > > an
> > > > >> > "ArtifactoryDagFetcher", or "TarballInS3DagFetcher" may as well.
> > > > >> >
> > > > >> > Of course that assumes that the scheduler knows and stores the
> > > active
> > > > >> > version number when generating a new DagRun, and for that
> > > information
> > > > to
> > > > >> be
> > > > >> > leveraged on subsequent scheduler cycles and on workers when
> task
> > > are
> > > > >> > executed.
> > > > >> >
> > > > >> > This could also enable things like "remote" backfills (non
> local,
> > > > >> > parallelized) of a DAG definition that's on an arbitrary git ref
> > > > >> (assuming
> > > > >> > a "GitRepoDagFetcher").
> > > > >> >
> > > > >> > There are [perhaps] unintuitive implications where clearing a
> > single
> > > > task
> > > > >> > would then re-run the old DAG definition on that task (since the
> > > > version
> > > > >> > was stamped in the DagRun and hasn't changed), but
> > > > deleting/recreating a
> > > > >> > DagRun would run the latest version (or any other version that
> may
> > > be
> > > > >> > specified for that matter).
> > > > >> >
> > > > >> > I'm unclear on how much work that represents exactly, but it's
> > > > certainly
> > > > >> > doable and may only require to change part of the DagBag class
> > and a
> > > > few
> > > > >> > other places.
> > > > >> >
> > > > >> > Max
> > > > >> >
> > > > >> > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <
> > dcapwell@gmail.com>
> > > > >> wrote:
> > > > >> >
> > > > >> > > Thanks for your feedback!
> > > > >> > >
> > > > >> > > Option 1 is a non-starter for us. The reason is we have DAGs
> > that
> > > > take
> > > > >> 9+
> > > > >> > > hours to run.
> > > > >> > >
> > > > >> > > Option 2 is more where my mind was going, but it's rather
> large.
> > > > How I
> > > > >> > see
> > > > >> > > it you need a MVCC DagBag that's aware of multiple versions
> > (what
> > > > >> > provides
> > > > >> > > version?).  Assuming you can track active dag runs pointing to
> > > which
> > > > >> > > versions you know how to cleanup (fine with external).  The
> pro
> > > > here is
> > > > >> > you
> > > > >> > > have snapshot isolation for dag_run, con is more bookkeeping
> and
> > > > >> require
> > > > >> > > deploy to work with this (last part may be a good thing
> though).
> > > > >> > >
> > > > >> > > The only other option I can think of is to lock deploy so the
> > > system
> > > > >> only
> > > > >> > > picks up new versions when no dag_run holds the lock.  This is
> > > > flawed
> > > > >> for
> > > > >> > > many reasons, but breaks horrible for dag_runs that takes
> > minutes
> > > (I
> > > > >> > assume
> > > > >> > > 99% do).
> > > > >> > >
> > > > >> > >
> > > > >> > >
> > > > >> > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <jo...@wepay.com> wrote:
> > > > >> > >
> > > > >> > > > Hi David!
> > > > >> > > >
> > > > >> > > > Thank you for clarifying, I think I understand your concern
> > now.
> > > > We
> > > > >> > > > currently also work around this by making sure a dag is
> turned
> > > off
> > > > >> > > > when we deploy a new version. We also make sure our jobs are
> > > > >> > > > idempotent and retry-enabled in the case when we forget to
> > turn
> > > > off
> > > > >> > > > the job, so the issue hasn't caused us too much headache.
> > > > >> > > >
> > > > >> > > > I do agree that it would be nice for Airflow to have the
> > option
> > > to
> > > > >> > > > guarantee a single version of dag per dag run. I see two
> > > > approaches:
> > > > >> > > >
> > > > >> > > > (1) If a dag is updated, the current dagrun fails and/or
> > > retries.
> > > > >> > > > (2) If a dag is updated, the current dagrun continues but
> uses
> > > > >> version
> > > > >> > > > before the update.
> > > > >> > > >
> > > > >> > > > (1) requires some mechanism to compare dag generations. One
> > > > option is
> > > > >> > > > to hash the dagfile and storing that value to the dagrun
> > table,
> > > > and
> > > > >> > > > compare against it each time a task is running. And in the
> > case
> > > if
> > > > >> the
> > > > >> > > > hash value is different, update the hash value, then
> > fail/retry
> > > > the
> > > > >> > > > dag. I think this is a fairly safe approach.
> > > > >> > > >
> > > > >> > > > (2) is trickier. A dag only has a property "fileloc" which
> > > tracks
> > > > the
> > > > >> > > > location of the dag file, but the actual content of the dag
> > file
> > > > is
> > > > >> > > > never versioned. When a task instance starts running, it
> > > > dynamically
> > > > >> > > > re-processes the dag file specified by the fileloc, generate
> > all
> > > > the
> > > > >> > > > task objects from the dag file, and fetch the task object by
> > > > task_id
> > > > >> > > > in order to execute it. So in order to guarantee each dagrun
> > to
> > > > run a
> > > > >> > > > specific version, previous versions must be maintained on
> disk
> > > > >> somehow
> > > > >> > > > (maintaining this information in memory is difficult, since
> if
> > > the
> > > > >> > > > scheduler/worker shuts down, that information is lost). This
> > > > makes it
> > > > >> > > > a pretty big change, and I haven't thought much on how to
> > > > implement
> > > > >> > > > it.
> > > > >> > > >
> > > > >> > > > I'm personally leaning towards (1) for sake of simplicity.
> > Note
> > > > that
> > > > >> > > > some users may not want dag to fail/retry even when dag is
> > > > updated,
> > > > >> so
> > > > >> > > > this should be an optional feature, not required.
> > > > >> > > >
> > > > >> > > > My scheduler-foo isn't that great, so curious what others
> have
> > > to
> > > > say
> > > > >> > > > about this.
> > > > >> > > >
> > > > >> > > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <
> > > > dcapwell@gmail.com>
> > > > >> > > wrote:
> > > > >> > > > > Thanks for the reply Joy, let me walk you though things as
> > > they
> > > > are
> > > > >> > > today
> > > > >> > > > >
> > > > >> > > > > 1) we don't stop airflow or disable DAGs while deploying
> > > > updates to
> > > > >> > > > logic,
> > > > >> > > > > this is done live once its released
> > > > >> > > > > 2) the python script in the DAG folder doesn't actually
> have
> > > > DAGs
> > > > >> in
> > > > >> > it
> > > > >> > > > but
> > > > >> > > > > is a shim layer to allow us to deploy in a atomic way for
> a
> > > > single
> > > > >> > host
> > > > >> > > > >   2.1) this script reads a file on local disk (less than
> > disk
> > > > page
> > > > >> > > size)
> > > > >> > > > to
> > > > >> > > > > find latest git commit deployed
> > > > >> > > > >   2.2) re-does the airflow DAG load process but pointing
> to
> > > the
> > > > git
> > > > >> > > > commit
> > > > >> > > > > path
> > > > >> > > > >
> > > > >> > > > > Example directory structure
> > > > >> > > > >
> > > > >> > > > > /airflow/dags/shim.py
> > > > >> > > > > /airflow/real_dags/
> > > > >> > > > >                             /latest # pointer to latest
> > commit
> > > > >> > > > >                             /[git commit]/
> > > > >> > > > >
> > > > >> > > > > This is how we make sure deploys are consistent within a
> > > single
> > > > >> task.
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > Now, lets assume we have a fully atomic commit process and
> > are
> > > > able
> > > > >> > to
> > > > >> > > > > upgrade DAGs at the exact same moment.
> > > > >> > > > >
> > > > >> > > > > At time T0 the scheduler knows of DAG V1 and schedules two
> > > > tasks,
> > > > >> > > Task1,
> > > > >> > > > > and Task2
> > > > >> > > > > At time T1 Task1 is picked up by Worker1, so starts
> > executing
> > > > the
> > > > >> > task
> > > > >> > > > (V1
> > > > >> > > > > logic)
> > > > >> > > > > At time T2 deploy commit happens, current DAG version: V2
> > > > >> > > > > At time T3, Task2 is picked up by Worker2, so starts
> > executing
> > > > the
> > > > >> > task
> > > > >> > > > (V2
> > > > >> > > > > logic)
> > > > >> > > > >
> > > > >> > > > > In many cases this isn't really a problem (tuning config
> > > change
> > > > to
> > > > >> > > hadoop
> > > > >> > > > > job), but as we have more people using Airflow this is
> > > causing a
> > > > >> lot
> > > > >> > of
> > > > >> > > > > time spent debugging why production acted differently than
> > > > expected
> > > > >> > > (the
> > > > >> > > > > problem was already fixed... why is it still here?).  We
> > also
> > > > see
> > > > >> > that
> > > > >> > > > some
> > > > >> > > > > tasks expect a given behavior from other tasks, and since
> > they
> > > > live
> > > > >> > in
> > > > >> > > > the
> > > > >> > > > > same git repo they can modify both tasks at the same time
> > if a
> > > > >> > breaking
> > > > >> > > > > change is needed, but when this rolls out to prod there
> > isn't
> > > a
> > > > way
> > > > >> > to
> > > > >> > > do
> > > > >> > > > > this other than turn off the DAG, and login to all hosts
> to
> > > > verify
> > > > >> > > fully
> > > > >> > > > > deployed.
> > > > >> > > > >
> > > > >> > > > > We would like to remove this confusion and make
> > > > >> generations/versions
> > > > >> > > > (same
> > > > >> > > > > thing really) exposed to users and make sure for a single
> > > > dag_run
> > > > >> > only
> > > > >> > > > one
> > > > >> > > > > version is used.
> > > > >> > > > >
> > > > >> > > > > I hope this is more clear.
> > > > >> > > > >
> > > > >> > > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <jo...@wepay.com>
> > > > wrote:
> > > > >> > > > >
> > > > >> > > > >> Hi David,
> > > > >> > > > >>
> > > > >> > > > >> Do you mind providing a concrete example of the scenario
> in
> > > > which
> > > > >> > > > >> scheduler/workers see different states (I'm not 100% sure
> > if
> > > I
> > > > >> > > > understood
> > > > >> > > > >> the issue at hand).
> > > > >> > > > >>
> > > > >> > > > >> And by same dag generation, are you referring to the dag
> > > > version?
> > > > >> > (DAG
> > > > >> > > > >> version is currently not supported at all, but I can see
> it
> > > > being
> > > > >> a
> > > > >> > > > >> building block for future use cases).
> > > > >> > > > >>
> > > > >> > > > >> Joy
> > > > >> > > > >>
> > > > >> > > > >> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <
> > > > >> dcapwell@gmail.com>
> > > > >> > > > wrote:
> > > > >> > > > >>
> > > > >> > > > >> > My current thinking is to add a field to the dag table
> > that
> > > > is
> > > > >> > > > optional
> > > > >> > > > >> and
> > > > >> > > > >> > provided by the dag. We currently intercept the load
> path
> > > do
> > > > >> could
> > > > >> > > use
> > > > >> > > > >> this
> > > > >> > > > >> > field to make sure we load the same generation.  My
> > concern
> > > > here
> > > > >> > is
> > > > >> > > > the
> > > > >> > > > >> > interaction with the scheduler, not as familiar with
> that
> > > > logic
> > > > >> to
> > > > >> > > > >> predict
> > > > >> > > > >> > corner cases were this would fail.
> > > > >> > > > >> >
> > > > >> > > > >> > Any other recommendations for how this could be done?
> > > > >> > > > >> >
> > > > >> > > > >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <
> > > > >> dcapwell@gmail.com>
> > > > >> > > > wrote:
> > > > >> > > > >> >
> > > > >> > > > >> > > We have been using airflow for logic that delegates
> to
> > > > other
> > > > >> > > > systems so
> > > > >> > > > >> > > inject a task all tasks depends to make sure all
> > > resources
> > > > >> used
> > > > >> > > are
> > > > >> > > > the
> > > > >> > > > >> > > same for all tasks in the dag. This works well for
> > tasks
> > > > that
> > > > >> > > > delegates
> > > > >> > > > >> > to
> > > > >> > > > >> > > external systems but people are starting to need to
> run
> > > > logic
> > > > >> in
> > > > >> > > > >> airflow
> > > > >> > > > >> > > and the fact that scheduler and all workers can see
> > > > different
> > > > >> > > > states is
> > > > >> > > > >> > > causing issues
> > > > >> > > > >> > >
> > > > >> > > > >> > > We can make sure that all the code is deployed in a
> > > > consistent
> > > > >> > way
> > > > >> > > > but
> > > > >> > > > >> > > need help from the scheduler to tell the workers the
> > > > current
> > > > >> > > > generation
> > > > >> > > > >> > for
> > > > >> > > > >> > > a DAG.
> > > > >> > > > >> > >
> > > > >> > > > >> > > My question is, what would be the best way to modify
> > > > airflow
> > > > >> to
> > > > >> > > > allow
> > > > >> > > > >> > DAGs
> > > > >> > > > >> > > to define a generation value that the scheduler could
> > > send
> > > > to
> > > > >> > > > workers?
> > > > >> > > > >> > >
> > > > >> > > > >> > > Thanks
> > > > >> > > > >> > >
> > > > >> > > > >> >
> > > > >> > > > >>
> > > > >> > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > > >
> > >
> >
>

Re: How to add hooks for strong deployment consistency?

Posted by David Capwell <dc...@gmail.com>.
We need two versions but most likely would not use either... That being
artifactory and git (would really love for this to be pluggable!!!!!)

We have our own dag fetch logic which right now pulls from git, caches,
then redirect airflow to that directory.  For us we have airflow automated
so you push a button to get a cluster, for this reason there are enough
instances that we have DDOS attacked git (opps).

We are planning to change this to fetch from artifactory, and have a
stateful proxy for each cluster so we stop DDOS attacking core
infrastructure.

On Mar 1, 2018 11:45 AM, "William Wong" <wo...@gmail.com> wrote:

Also relatively new to Airflow here. Same as David above, Option 1 is not
an option for us either for the same reasons.

What I would like to see is that it can be user selectable / modifiable.

Use Case:
We have a DAG with thousands of task dependencies/tasks. After 24hrs of
progressing, we need to take a subset of those tasks and rerun them with a
different configuration (reasons range from incorrect parameters to
infrastructure issues, doesn't really matter here).

What I hope can happen:
1. Pause DAG
2. Upload and tag newest dag version
3. Set dag_run to use latest tag,
4. Resolve DAG sync using <insert smart diff behavior here that is clearly
defined/documented>
5. Unpause DAG

I do like the DagFetcher idea. This logic should shim in nicely in the
DagBag code. Maxime, I also vote for the GitDagFetcher. Two thoughts about
the GitDagFetcher:
- I probably won't use fuse across 100's of nodes in my k8s/swarm. Not sure
how this would work without too much trouble.
- It might be confusing if some git sha's have no changes to a Dag. all
existing runs will be marked as outdated? probably better than nothing
anyway.

I also vote to have some form of sort of caching behavior. I prefer not to
read in DAGs all the time. i.e. from the webserver, scheduler, *and* all
workers before starting any task over and over again. This is because,
unfortunately, the assumption that a DAG only takes seconds to load does
not hold true for large dags. With only 10k tasks within a DAG it's already
on the order of minutes. This would be untenable as we scale up to even
larger tags. (Though, I'm testing a fix for this so maybe this might not
actually be an issue anymore)

FWIW, it seems to me that the DagPickle feature (which, for the love me I
can't seem to get it to work, no wonder it's being deprecated) would have
solved a lot of these issues fairly easily. Something along the lines of
adding pickle_id to dag_run should at  help the scheduler identify the DAG
version to load and queue. But I'm not sure if it can delete out of sync
task instances.

Lastly, sorry for the brain dump and derailing the topic, for the workers,
it seems that importing/loading in the DAG just to execute a single task is
a bit overkill isn't it? If we kept a caching feature (i.e. pickling),
perhaps we can simply cache the task and not worry about the rest of the
DAG tasks?

Will

On Thu, Mar 1, 2018 at 11:30 AM, Maxime Beauchemin <

maximebeauchemin@gmail.com> wrote:

> I'm curious to hear which DagFetcher abstraction people would build or
want
> to use.
>
> So far it sounded like the most popular and flexible approach would be a
> `GitDagFetcher` where all SHAs and refs become a possibility, as opposed
to
> say a TarballOnS3DagFetcher which would require more manual artifact
> management and versioning, which represent additional [human] workflow on
> top of the already existing git-based workflow.
>
> One way I've seen this done before is by using this Git fuse (file system
> in user space) hack that creates a virtual filesystem where all SHAs and
> refs in the Git repo are exposed as a subfolder, and under each ref
> subfolder the whole repo sits as of that ref. Of course all the files are
> virtual and fetched at access time by the virtual filesystem using the git
> api. So if you simply point the DagBag loader to the right [virtual]
> directory, it will import the right version of the DAG. In the git world,
> the alternative to that is managing temp folders and doing shallow clones
> which seems like much more of a headache. Note that one tradeoff is that
if
> git and whatever it depends has then a need to be highly available.
>
> Max
>
> On Wed, Feb 28, 2018 at 6:55 PM, David Capwell <dc...@gmail.com> wrote:
>
> > Thanks for all the details! With a pluggable fetcher we would be able to
> > add our own logic for how to fetch so sounds like a good place to start
> for
> > something like this!
> >
> > On Wed, Feb 28, 2018, 4:39 PM Joy Gao <jo...@wepay.com> wrote:
> >
> > > +1 on DagFetcher abstraction, very airflow-esque :)
> > >
> > > On Wed, Feb 28, 2018 at 11:25 AM, Maxime Beauchemin
> > > <ma...@gmail.com> wrote:
> > > > Addressing a few of your questions / concerns:
> > > >
> > > > * The scheduler uses a multiprocess queue to queue up tasks, each
> > > > subprocess is in charge of a single DAG "scheduler cycle" which
> > triggers
> > > > what it can for active DagRuns. Currently it fills the DagBag from
> the
> > > > local file system, looking for a specific module where the master
> > process
> > > > last saw that DAG. Fetching the DAG is a metadata operation, DAG
> > > artifacts
> > > > shouldn't be too large, we can assume that it takes seconds at most
> to
> > > > fetch a DAG, which is ok. We generally assume that the scheduler
> should
> > > > fully cycle every minute or so. Version-aware DagFetcher could also
> > > > implement some sort of caching if that was a concern (shouldn't be
> > > though).
> > > > * For consistency within the whole DagRun, the scheduler absolutely
> has
> > > to
> > > > read the right version. If tasks got removed they would never get
> > > scheduled
> > > > and consistency cannot be achieved.
> > > > * TaskInstances get created the first time they are identified as
> > > runnable
> > > > by the scheduler and are born with a queued status I believe (from
> > > memory,
> > > > haven't read the latest code to confirm). The worker double checks
> and
> > > sets
> > > > it as running as part of a database transaction to avoid
> double-firing.
> > > >
> > > > Max
> > > >
> > > > On Wed, Feb 28, 2018 at 7:29 AM, Chris Palmer <ch...@crpalmer.com>
> > > wrote:
> > > >
> > > >> I'll preface this with the fact that I'm relatively new to Airflow,
> > and
> > > >> haven't played around with a lot of the internals.
> > > >>
> > > >> I find the idea of a DagFetcher interesting but would we worry
about
> > > >> slowing down the scheduler significantly? If the scheduler is
having
> > to
> > > >> "fetch" multiple different DAG versions, be it git refs or
artifacts
> > > from
> > > >> Artifactory, we are talking about adding significant time to each
> > > scheduler
> > > >> run. Also how would the scheduler know which DAGs to fetch from
> where
> > if
> > > >> there aren't local files on disk listing those DAGs? Maybe I'm
> missing
> > > >> something in the implementation.
> > > >>
> > > >> It seems to me that the fetching of the different versions should
be
> > > >> delegated to the Task (or TaskInstance) itself. That ensures we
only
> > > spend
> > > >> the time to "fetch" the version that is needed when it is needed.
> One
> > > down
> > > >> side might be that each TaskInstance running for the same version
of
> > the
> > > >> DAG might end up doing the "fetch" independently (duplicating that
> > > work).
> > > >>
> > > >> I think this could be done by adding some version attribute to the
> > > DagRun
> > > >> that gets set at creation, and have the scheduler pass that version
> to
> > > the
> > > >> TaskInstances when they are created. You could even extend this so
> > that
> > > you
> > > >> could have an arbitrary set of "executor_parameters" that get set
> on a
> > > >> DagRun and are passed to TaskInstances. Then the specific Executor
> > class
> > > >> that is running that TaskInstance could handle the
> > > "executor_parameters" as
> > > >> it sees fit.
> > > >>
> > > >> One thing I'm not clear on is how and when TaskInstances are
> created.
> > > When
> > > >> the scheduler first sees a specific DagRun do all the TaskInstances
> > get
> > > >> created immediately, but only some of them get queued? Or does the
> > > >> scheduler only create those TaskInstances which can be queued right
> > now?
> > > >>
> > > >> In particular if a DagRun gets created and while it is running the
> DAG
> > > is
> > > >> updated and a new Task is added, will the scheduler pick up that
new
> > > Task
> > > >> for the running DagRun? If the answer is yes, then my suggestion
> above
> > > >> would run the risk of scheduling a Task for a DAG version where
that
> > > Task
> > > >> didn't exist. I'm sure you could handle that somewhat gracefully
but
> > > it's a
> > > >> bit ugly.
> > > >>
> > > >> Chris
> > > >>
> > > >> On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <
> > > >> maximebeauchemin@gmail.com> wrote:
> > > >>
> > > >> > At a higher level I want to say a few things about the idea of
> > > enforcing
> > > >> > version consistency within a DagRun.
> > > >> >
> > > >> > One thing we've been talking about is the need for a "DagFetcher"
> > > >> > abstraction, where it's first implementation that would replace
> and
> > > mimic
> > > >> > the current one would be "FileSystemDagFetcher". One specific
> > > DagFetcher
> > > >> > implementation may or may not support version semantics, but if
it
> > > does
> > > >> > should be able to receive a version id and return the proper
> version
> > > of
> > > >> the
> > > >> > DAG object. For instance that first "FileSystemDagFetcher" would
> not
> > > >> > support version semantic, but perhaps a "GitRepoDagFetcher"
would,
> > or
> > > an
> > > >> > "ArtifactoryDagFetcher", or "TarballInS3DagFetcher" may as well.
> > > >> >
> > > >> > Of course that assumes that the scheduler knows and stores the
> > active
> > > >> > version number when generating a new DagRun, and for that
> > information
> > > to
> > > >> be
> > > >> > leveraged on subsequent scheduler cycles and on workers when task
> > are
> > > >> > executed.
> > > >> >
> > > >> > This could also enable things like "remote" backfills (non local,
> > > >> > parallelized) of a DAG definition that's on an arbitrary git ref
> > > >> (assuming
> > > >> > a "GitRepoDagFetcher").
> > > >> >
> > > >> > There are [perhaps] unintuitive implications where clearing a
> single
> > > task
> > > >> > would then re-run the old DAG definition on that task (since the
> > > version
> > > >> > was stamped in the DagRun and hasn't changed), but
> > > deleting/recreating a
> > > >> > DagRun would run the latest version (or any other version that
may
> > be
> > > >> > specified for that matter).
> > > >> >
> > > >> > I'm unclear on how much work that represents exactly, but it's
> > > certainly
> > > >> > doable and may only require to change part of the DagBag class
> and a
> > > few
> > > >> > other places.
> > > >> >
> > > >> > Max
> > > >> >
> > > >> > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <
> dcapwell@gmail.com>
> > > >> wrote:
> > > >> >
> > > >> > > Thanks for your feedback!
> > > >> > >
> > > >> > > Option 1 is a non-starter for us. The reason is we have DAGs
> that
> > > take
> > > >> 9+
> > > >> > > hours to run.
> > > >> > >
> > > >> > > Option 2 is more where my mind was going, but it's rather
large.
> > > How I
> > > >> > see
> > > >> > > it you need a MVCC DagBag that's aware of multiple versions
> (what
> > > >> > provides
> > > >> > > version?).  Assuming you can track active dag runs pointing to
> > which
> > > >> > > versions you know how to cleanup (fine with external).  The pro
> > > here is
> > > >> > you
> > > >> > > have snapshot isolation for dag_run, con is more bookkeeping
and
> > > >> require
> > > >> > > deploy to work with this (last part may be a good thing
though).
> > > >> > >
> > > >> > > The only other option I can think of is to lock deploy so the
> > system
> > > >> only
> > > >> > > picks up new versions when no dag_run holds the lock.  This is
> > > flawed
> > > >> for
> > > >> > > many reasons, but breaks horrible for dag_runs that takes
> minutes
> > (I
> > > >> > assume
> > > >> > > 99% do).
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <jo...@wepay.com> wrote:
> > > >> > >
> > > >> > > > Hi David!
> > > >> > > >
> > > >> > > > Thank you for clarifying, I think I understand your concern
> now.
> > > We
> > > >> > > > currently also work around this by making sure a dag is
turned
> > off
> > > >> > > > when we deploy a new version. We also make sure our jobs are
> > > >> > > > idempotent and retry-enabled in the case when we forget to
> turn
> > > off
> > > >> > > > the job, so the issue hasn't caused us too much headache.
> > > >> > > >
> > > >> > > > I do agree that it would be nice for Airflow to have the
> option
> > to
> > > >> > > > guarantee a single version of dag per dag run. I see two
> > > approaches:
> > > >> > > >
> > > >> > > > (1) If a dag is updated, the current dagrun fails and/or
> > retries.
> > > >> > > > (2) If a dag is updated, the current dagrun continues but
uses
> > > >> version
> > > >> > > > before the update.
> > > >> > > >
> > > >> > > > (1) requires some mechanism to compare dag generations. One
> > > option is
> > > >> > > > to hash the dagfile and storing that value to the dagrun
> table,
> > > and
> > > >> > > > compare against it each time a task is running. And in the
> case
> > if
> > > >> the
> > > >> > > > hash value is different, update the hash value, then
> fail/retry
> > > the
> > > >> > > > dag. I think this is a fairly safe approach.
> > > >> > > >
> > > >> > > > (2) is trickier. A dag only has a property "fileloc" which
> > tracks
> > > the
> > > >> > > > location of the dag file, but the actual content of the dag
> file
> > > is
> > > >> > > > never versioned. When a task instance starts running, it
> > > dynamically
> > > >> > > > re-processes the dag file specified by the fileloc, generate
> all
> > > the
> > > >> > > > task objects from the dag file, and fetch the task object by
> > > task_id
> > > >> > > > in order to execute it. So in order to guarantee each dagrun
> to
> > > run a
> > > >> > > > specific version, previous versions must be maintained on
disk
> > > >> somehow
> > > >> > > > (maintaining this information in memory is difficult, since
if
> > the
> > > >> > > > scheduler/worker shuts down, that information is lost). This
> > > makes it
> > > >> > > > a pretty big change, and I haven't thought much on how to
> > > implement
> > > >> > > > it.
> > > >> > > >
> > > >> > > > I'm personally leaning towards (1) for sake of simplicity.
> Note
> > > that
> > > >> > > > some users may not want dag to fail/retry even when dag is
> > > updated,
> > > >> so
> > > >> > > > this should be an optional feature, not required.
> > > >> > > >
> > > >> > > > My scheduler-foo isn't that great, so curious what others
have
> > to
> > > say
> > > >> > > > about this.
> > > >> > > >
> > > >> > > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <
> > > dcapwell@gmail.com>
> > > >> > > wrote:
> > > >> > > > > Thanks for the reply Joy, let me walk you though things as
> > they
> > > are
> > > >> > > today
> > > >> > > > >
> > > >> > > > > 1) we don't stop airflow or disable DAGs while deploying
> > > updates to
> > > >> > > > logic,
> > > >> > > > > this is done live once its released
> > > >> > > > > 2) the python script in the DAG folder doesn't actually
have
> > > DAGs
> > > >> in
> > > >> > it
> > > >> > > > but
> > > >> > > > > is a shim layer to allow us to deploy in a atomic way for a
> > > single
> > > >> > host
> > > >> > > > >   2.1) this script reads a file on local disk (less than
> disk
> > > page
> > > >> > > size)
> > > >> > > > to
> > > >> > > > > find latest git commit deployed
> > > >> > > > >   2.2) re-does the airflow DAG load process but pointing to
> > the
> > > git
> > > >> > > > commit
> > > >> > > > > path
> > > >> > > > >
> > > >> > > > > Example directory structure
> > > >> > > > >
> > > >> > > > > /airflow/dags/shim.py
> > > >> > > > > /airflow/real_dags/
> > > >> > > > >                             /latest # pointer to latest
> commit
> > > >> > > > >                             /[git commit]/
> > > >> > > > >
> > > >> > > > > This is how we make sure deploys are consistent within a
> > single
> > > >> task.
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > Now, lets assume we have a fully atomic commit process and
> are
> > > able
> > > >> > to
> > > >> > > > > upgrade DAGs at the exact same moment.
> > > >> > > > >
> > > >> > > > > At time T0 the scheduler knows of DAG V1 and schedules two
> > > tasks,
> > > >> > > Task1,
> > > >> > > > > and Task2
> > > >> > > > > At time T1 Task1 is picked up by Worker1, so starts
> executing
> > > the
> > > >> > task
> > > >> > > > (V1
> > > >> > > > > logic)
> > > >> > > > > At time T2 deploy commit happens, current DAG version: V2
> > > >> > > > > At time T3, Task2 is picked up by Worker2, so starts
> executing
> > > the
> > > >> > task
> > > >> > > > (V2
> > > >> > > > > logic)
> > > >> > > > >
> > > >> > > > > In many cases this isn't really a problem (tuning config
> > change
> > > to
> > > >> > > hadoop
> > > >> > > > > job), but as we have more people using Airflow this is
> > causing a
> > > >> lot
> > > >> > of
> > > >> > > > > time spent debugging why production acted differently than
> > > expected
> > > >> > > (the
> > > >> > > > > problem was already fixed... why is it still here?).  We
> also
> > > see
> > > >> > that
> > > >> > > > some
> > > >> > > > > tasks expect a given behavior from other tasks, and since
> they
> > > live
> > > >> > in
> > > >> > > > the
> > > >> > > > > same git repo they can modify both tasks at the same time
> if a
> > > >> > breaking
> > > >> > > > > change is needed, but when this rolls out to prod there
> isn't
> > a
> > > way
> > > >> > to
> > > >> > > do
> > > >> > > > > this other than turn off the DAG, and login to all hosts to
> > > verify
> > > >> > > fully
> > > >> > > > > deployed.
> > > >> > > > >
> > > >> > > > > We would like to remove this confusion and make
> > > >> generations/versions
> > > >> > > > (same
> > > >> > > > > thing really) exposed to users and make sure for a single
> > > dag_run
> > > >> > only
> > > >> > > > one
> > > >> > > > > version is used.
> > > >> > > > >
> > > >> > > > > I hope this is more clear.
> > > >> > > > >
> > > >> > > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <jo...@wepay.com>
> > > wrote:
> > > >> > > > >
> > > >> > > > >> Hi David,
> > > >> > > > >>
> > > >> > > > >> Do you mind providing a concrete example of the scenario
in
> > > which
> > > >> > > > >> scheduler/workers see different states (I'm not 100% sure
> if
> > I
> > > >> > > > understood
> > > >> > > > >> the issue at hand).
> > > >> > > > >>
> > > >> > > > >> And by same dag generation, are you referring to the dag
> > > version?
> > > >> > (DAG
> > > >> > > > >> version is currently not supported at all, but I can see
it
> > > being
> > > >> a
> > > >> > > > >> building block for future use cases).
> > > >> > > > >>
> > > >> > > > >> Joy
> > > >> > > > >>
> > > >> > > > >> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <
> > > >> dcapwell@gmail.com>
> > > >> > > > wrote:
> > > >> > > > >>
> > > >> > > > >> > My current thinking is to add a field to the dag table
> that
> > > is
> > > >> > > > optional
> > > >> > > > >> and
> > > >> > > > >> > provided by the dag. We currently intercept the load
path
> > do
> > > >> could
> > > >> > > use
> > > >> > > > >> this
> > > >> > > > >> > field to make sure we load the same generation.  My
> concern
> > > here
> > > >> > is
> > > >> > > > the
> > > >> > > > >> > interaction with the scheduler, not as familiar with
that
> > > logic
> > > >> to
> > > >> > > > >> predict
> > > >> > > > >> > corner cases were this would fail.
> > > >> > > > >> >
> > > >> > > > >> > Any other recommendations for how this could be done?
> > > >> > > > >> >
> > > >> > > > >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <
> > > >> dcapwell@gmail.com>
> > > >> > > > wrote:
> > > >> > > > >> >
> > > >> > > > >> > > We have been using airflow for logic that delegates to
> > > other
> > > >> > > > systems so
> > > >> > > > >> > > inject a task all tasks depends to make sure all
> > resources
> > > >> used
> > > >> > > are
> > > >> > > > the
> > > >> > > > >> > > same for all tasks in the dag. This works well for
> tasks
> > > that
> > > >> > > > delegates
> > > >> > > > >> > to
> > > >> > > > >> > > external systems but people are starting to need to
run
> > > logic
> > > >> in
> > > >> > > > >> airflow
> > > >> > > > >> > > and the fact that scheduler and all workers can see
> > > different
> > > >> > > > states is
> > > >> > > > >> > > causing issues
> > > >> > > > >> > >
> > > >> > > > >> > > We can make sure that all the code is deployed in a
> > > consistent
> > > >> > way
> > > >> > > > but
> > > >> > > > >> > > need help from the scheduler to tell the workers the
> > > current
> > > >> > > > generation
> > > >> > > > >> > for
> > > >> > > > >> > > a DAG.
> > > >> > > > >> > >
> > > >> > > > >> > > My question is, what would be the best way to modify
> > > airflow
> > > >> to
> > > >> > > > allow
> > > >> > > > >> > DAGs
> > > >> > > > >> > > to define a generation value that the scheduler could
> > send
> > > to
> > > >> > > > workers?
> > > >> > > > >> > >
> > > >> > > > >> > > Thanks
> > > >> > > > >> > >
> > > >> > > > >> >
> > > >> > > > >>
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> > >
> >
>

Re: How to add hooks for strong deployment consistency?

Posted by William Wong <wo...@gmail.com>.
Also relatively new to Airflow here. Same as David above, Option 1 is not
an option for us either for the same reasons.

What I would like to see is that it can be user selectable / modifiable.

Use Case:
We have a DAG with thousands of task dependencies/tasks. After 24hrs of
progressing, we need to take a subset of those tasks and rerun them with a
different configuration (reasons range from incorrect parameters to
infrastructure issues, doesn't really matter here).

What I hope can happen:
1. Pause DAG
2. Upload and tag newest dag version
3. Set dag_run to use latest tag,
4. Resolve DAG sync using <insert smart diff behavior here that is clearly
defined/documented>
5. Unpause DAG

I do like the DagFetcher idea. This logic should shim in nicely in the
DagBag code. Maxime, I also vote for the GitDagFetcher. Two thoughts about
the GitDagFetcher:
- I probably won't use fuse across 100's of nodes in my k8s/swarm. Not sure
how this would work without too much trouble.
- It might be confusing if some git sha's have no changes to a Dag. all
existing runs will be marked as outdated? probably better than nothing
anyway.

I also vote to have some form of sort of caching behavior. I prefer not to
read in DAGs all the time. i.e. from the webserver, scheduler, *and* all
workers before starting any task over and over again. This is because,
unfortunately, the assumption that a DAG only takes seconds to load does
not hold true for large dags. With only 10k tasks within a DAG it's already
on the order of minutes. This would be untenable as we scale up to even
larger tags. (Though, I'm testing a fix for this so maybe this might not
actually be an issue anymore)

FWIW, it seems to me that the DagPickle feature (which, for the love me I
can't seem to get it to work, no wonder it's being deprecated) would have
solved a lot of these issues fairly easily. Something along the lines of
adding pickle_id to dag_run should at  help the scheduler identify the DAG
version to load and queue. But I'm not sure if it can delete out of sync
task instances.

Lastly, sorry for the brain dump and derailing the topic, for the workers,
it seems that importing/loading in the DAG just to execute a single task is
a bit overkill isn't it? If we kept a caching feature (i.e. pickling),
perhaps we can simply cache the task and not worry about the rest of the
DAG tasks?

Will

On Thu, Mar 1, 2018 at 11:30 AM, Maxime Beauchemin <
maximebeauchemin@gmail.com> wrote:

> I'm curious to hear which DagFetcher abstraction people would build or want
> to use.
>
> So far it sounded like the most popular and flexible approach would be a
> `GitDagFetcher` where all SHAs and refs become a possibility, as opposed to
> say a TarballOnS3DagFetcher which would require more manual artifact
> management and versioning, which represent additional [human] workflow on
> top of the already existing git-based workflow.
>
> One way I've seen this done before is by using this Git fuse (file system
> in user space) hack that creates a virtual filesystem where all SHAs and
> refs in the Git repo are exposed as a subfolder, and under each ref
> subfolder the whole repo sits as of that ref. Of course all the files are
> virtual and fetched at access time by the virtual filesystem using the git
> api. So if you simply point the DagBag loader to the right [virtual]
> directory, it will import the right version of the DAG. In the git world,
> the alternative to that is managing temp folders and doing shallow clones
> which seems like much more of a headache. Note that one tradeoff is that if
> git and whatever it depends has then a need to be highly available.
>
> Max
>
> On Wed, Feb 28, 2018 at 6:55 PM, David Capwell <dc...@gmail.com> wrote:
>
> > Thanks for all the details! With a pluggable fetcher we would be able to
> > add our own logic for how to fetch so sounds like a good place to start
> for
> > something like this!
> >
> > On Wed, Feb 28, 2018, 4:39 PM Joy Gao <jo...@wepay.com> wrote:
> >
> > > +1 on DagFetcher abstraction, very airflow-esque :)
> > >
> > > On Wed, Feb 28, 2018 at 11:25 AM, Maxime Beauchemin
> > > <ma...@gmail.com> wrote:
> > > > Addressing a few of your questions / concerns:
> > > >
> > > > * The scheduler uses a multiprocess queue to queue up tasks, each
> > > > subprocess is in charge of a single DAG "scheduler cycle" which
> > triggers
> > > > what it can for active DagRuns. Currently it fills the DagBag from
> the
> > > > local file system, looking for a specific module where the master
> > process
> > > > last saw that DAG. Fetching the DAG is a metadata operation, DAG
> > > artifacts
> > > > shouldn't be too large, we can assume that it takes seconds at most
> to
> > > > fetch a DAG, which is ok. We generally assume that the scheduler
> should
> > > > fully cycle every minute or so. Version-aware DagFetcher could also
> > > > implement some sort of caching if that was a concern (shouldn't be
> > > though).
> > > > * For consistency within the whole DagRun, the scheduler absolutely
> has
> > > to
> > > > read the right version. If tasks got removed they would never get
> > > scheduled
> > > > and consistency cannot be achieved.
> > > > * TaskInstances get created the first time they are identified as
> > > runnable
> > > > by the scheduler and are born with a queued status I believe (from
> > > memory,
> > > > haven't read the latest code to confirm). The worker double checks
> and
> > > sets
> > > > it as running as part of a database transaction to avoid
> double-firing.
> > > >
> > > > Max
> > > >
> > > > On Wed, Feb 28, 2018 at 7:29 AM, Chris Palmer <ch...@crpalmer.com>
> > > wrote:
> > > >
> > > >> I'll preface this with the fact that I'm relatively new to Airflow,
> > and
> > > >> haven't played around with a lot of the internals.
> > > >>
> > > >> I find the idea of a DagFetcher interesting but would we worry about
> > > >> slowing down the scheduler significantly? If the scheduler is having
> > to
> > > >> "fetch" multiple different DAG versions, be it git refs or artifacts
> > > from
> > > >> Artifactory, we are talking about adding significant time to each
> > > scheduler
> > > >> run. Also how would the scheduler know which DAGs to fetch from
> where
> > if
> > > >> there aren't local files on disk listing those DAGs? Maybe I'm
> missing
> > > >> something in the implementation.
> > > >>
> > > >> It seems to me that the fetching of the different versions should be
> > > >> delegated to the Task (or TaskInstance) itself. That ensures we only
> > > spend
> > > >> the time to "fetch" the version that is needed when it is needed.
> One
> > > down
> > > >> side might be that each TaskInstance running for the same version of
> > the
> > > >> DAG might end up doing the "fetch" independently (duplicating that
> > > work).
> > > >>
> > > >> I think this could be done by adding some version attribute to the
> > > DagRun
> > > >> that gets set at creation, and have the scheduler pass that version
> to
> > > the
> > > >> TaskInstances when they are created. You could even extend this so
> > that
> > > you
> > > >> could have an arbitrary set of "executor_parameters" that get set
> on a
> > > >> DagRun and are passed to TaskInstances. Then the specific Executor
> > class
> > > >> that is running that TaskInstance could handle the
> > > "executor_parameters" as
> > > >> it sees fit.
> > > >>
> > > >> One thing I'm not clear on is how and when TaskInstances are
> created.
> > > When
> > > >> the scheduler first sees a specific DagRun do all the TaskInstances
> > get
> > > >> created immediately, but only some of them get queued? Or does the
> > > >> scheduler only create those TaskInstances which can be queued right
> > now?
> > > >>
> > > >> In particular if a DagRun gets created and while it is running the
> DAG
> > > is
> > > >> updated and a new Task is added, will the scheduler pick up that new
> > > Task
> > > >> for the running DagRun? If the answer is yes, then my suggestion
> above
> > > >> would run the risk of scheduling a Task for a DAG version where that
> > > Task
> > > >> didn't exist. I'm sure you could handle that somewhat gracefully but
> > > it's a
> > > >> bit ugly.
> > > >>
> > > >> Chris
> > > >>
> > > >> On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <
> > > >> maximebeauchemin@gmail.com> wrote:
> > > >>
> > > >> > At a higher level I want to say a few things about the idea of
> > > enforcing
> > > >> > version consistency within a DagRun.
> > > >> >
> > > >> > One thing we've been talking about is the need for a "DagFetcher"
> > > >> > abstraction, where it's first implementation that would replace
> and
> > > mimic
> > > >> > the current one would be "FileSystemDagFetcher". One specific
> > > DagFetcher
> > > >> > implementation may or may not support version semantics, but if it
> > > does
> > > >> > should be able to receive a version id and return the proper
> version
> > > of
> > > >> the
> > > >> > DAG object. For instance that first "FileSystemDagFetcher" would
> not
> > > >> > support version semantic, but perhaps a "GitRepoDagFetcher" would,
> > or
> > > an
> > > >> > "ArtifactoryDagFetcher", or "TarballInS3DagFetcher" may as well.
> > > >> >
> > > >> > Of course that assumes that the scheduler knows and stores the
> > active
> > > >> > version number when generating a new DagRun, and for that
> > information
> > > to
> > > >> be
> > > >> > leveraged on subsequent scheduler cycles and on workers when task
> > are
> > > >> > executed.
> > > >> >
> > > >> > This could also enable things like "remote" backfills (non local,
> > > >> > parallelized) of a DAG definition that's on an arbitrary git ref
> > > >> (assuming
> > > >> > a "GitRepoDagFetcher").
> > > >> >
> > > >> > There are [perhaps] unintuitive implications where clearing a
> single
> > > task
> > > >> > would then re-run the old DAG definition on that task (since the
> > > version
> > > >> > was stamped in the DagRun and hasn't changed), but
> > > deleting/recreating a
> > > >> > DagRun would run the latest version (or any other version that may
> > be
> > > >> > specified for that matter).
> > > >> >
> > > >> > I'm unclear on how much work that represents exactly, but it's
> > > certainly
> > > >> > doable and may only require to change part of the DagBag class
> and a
> > > few
> > > >> > other places.
> > > >> >
> > > >> > Max
> > > >> >
> > > >> > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <
> dcapwell@gmail.com>
> > > >> wrote:
> > > >> >
> > > >> > > Thanks for your feedback!
> > > >> > >
> > > >> > > Option 1 is a non-starter for us. The reason is we have DAGs
> that
> > > take
> > > >> 9+
> > > >> > > hours to run.
> > > >> > >
> > > >> > > Option 2 is more where my mind was going, but it's rather large.
> > > How I
> > > >> > see
> > > >> > > it you need a MVCC DagBag that's aware of multiple versions
> (what
> > > >> > provides
> > > >> > > version?).  Assuming you can track active dag runs pointing to
> > which
> > > >> > > versions you know how to cleanup (fine with external).  The pro
> > > here is
> > > >> > you
> > > >> > > have snapshot isolation for dag_run, con is more bookkeeping and
> > > >> require
> > > >> > > deploy to work with this (last part may be a good thing though).
> > > >> > >
> > > >> > > The only other option I can think of is to lock deploy so the
> > system
> > > >> only
> > > >> > > picks up new versions when no dag_run holds the lock.  This is
> > > flawed
> > > >> for
> > > >> > > many reasons, but breaks horrible for dag_runs that takes
> minutes
> > (I
> > > >> > assume
> > > >> > > 99% do).
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <jo...@wepay.com> wrote:
> > > >> > >
> > > >> > > > Hi David!
> > > >> > > >
> > > >> > > > Thank you for clarifying, I think I understand your concern
> now.
> > > We
> > > >> > > > currently also work around this by making sure a dag is turned
> > off
> > > >> > > > when we deploy a new version. We also make sure our jobs are
> > > >> > > > idempotent and retry-enabled in the case when we forget to
> turn
> > > off
> > > >> > > > the job, so the issue hasn't caused us too much headache.
> > > >> > > >
> > > >> > > > I do agree that it would be nice for Airflow to have the
> option
> > to
> > > >> > > > guarantee a single version of dag per dag run. I see two
> > > approaches:
> > > >> > > >
> > > >> > > > (1) If a dag is updated, the current dagrun fails and/or
> > retries.
> > > >> > > > (2) If a dag is updated, the current dagrun continues but uses
> > > >> version
> > > >> > > > before the update.
> > > >> > > >
> > > >> > > > (1) requires some mechanism to compare dag generations. One
> > > option is
> > > >> > > > to hash the dagfile and storing that value to the dagrun
> table,
> > > and
> > > >> > > > compare against it each time a task is running. And in the
> case
> > if
> > > >> the
> > > >> > > > hash value is different, update the hash value, then
> fail/retry
> > > the
> > > >> > > > dag. I think this is a fairly safe approach.
> > > >> > > >
> > > >> > > > (2) is trickier. A dag only has a property "fileloc" which
> > tracks
> > > the
> > > >> > > > location of the dag file, but the actual content of the dag
> file
> > > is
> > > >> > > > never versioned. When a task instance starts running, it
> > > dynamically
> > > >> > > > re-processes the dag file specified by the fileloc, generate
> all
> > > the
> > > >> > > > task objects from the dag file, and fetch the task object by
> > > task_id
> > > >> > > > in order to execute it. So in order to guarantee each dagrun
> to
> > > run a
> > > >> > > > specific version, previous versions must be maintained on disk
> > > >> somehow
> > > >> > > > (maintaining this information in memory is difficult, since if
> > the
> > > >> > > > scheduler/worker shuts down, that information is lost). This
> > > makes it
> > > >> > > > a pretty big change, and I haven't thought much on how to
> > > implement
> > > >> > > > it.
> > > >> > > >
> > > >> > > > I'm personally leaning towards (1) for sake of simplicity.
> Note
> > > that
> > > >> > > > some users may not want dag to fail/retry even when dag is
> > > updated,
> > > >> so
> > > >> > > > this should be an optional feature, not required.
> > > >> > > >
> > > >> > > > My scheduler-foo isn't that great, so curious what others have
> > to
> > > say
> > > >> > > > about this.
> > > >> > > >
> > > >> > > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <
> > > dcapwell@gmail.com>
> > > >> > > wrote:
> > > >> > > > > Thanks for the reply Joy, let me walk you though things as
> > they
> > > are
> > > >> > > today
> > > >> > > > >
> > > >> > > > > 1) we don't stop airflow or disable DAGs while deploying
> > > updates to
> > > >> > > > logic,
> > > >> > > > > this is done live once its released
> > > >> > > > > 2) the python script in the DAG folder doesn't actually have
> > > DAGs
> > > >> in
> > > >> > it
> > > >> > > > but
> > > >> > > > > is a shim layer to allow us to deploy in a atomic way for a
> > > single
> > > >> > host
> > > >> > > > >   2.1) this script reads a file on local disk (less than
> disk
> > > page
> > > >> > > size)
> > > >> > > > to
> > > >> > > > > find latest git commit deployed
> > > >> > > > >   2.2) re-does the airflow DAG load process but pointing to
> > the
> > > git
> > > >> > > > commit
> > > >> > > > > path
> > > >> > > > >
> > > >> > > > > Example directory structure
> > > >> > > > >
> > > >> > > > > /airflow/dags/shim.py
> > > >> > > > > /airflow/real_dags/
> > > >> > > > >                             /latest # pointer to latest
> commit
> > > >> > > > >                             /[git commit]/
> > > >> > > > >
> > > >> > > > > This is how we make sure deploys are consistent within a
> > single
> > > >> task.
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > Now, lets assume we have a fully atomic commit process and
> are
> > > able
> > > >> > to
> > > >> > > > > upgrade DAGs at the exact same moment.
> > > >> > > > >
> > > >> > > > > At time T0 the scheduler knows of DAG V1 and schedules two
> > > tasks,
> > > >> > > Task1,
> > > >> > > > > and Task2
> > > >> > > > > At time T1 Task1 is picked up by Worker1, so starts
> executing
> > > the
> > > >> > task
> > > >> > > > (V1
> > > >> > > > > logic)
> > > >> > > > > At time T2 deploy commit happens, current DAG version: V2
> > > >> > > > > At time T3, Task2 is picked up by Worker2, so starts
> executing
> > > the
> > > >> > task
> > > >> > > > (V2
> > > >> > > > > logic)
> > > >> > > > >
> > > >> > > > > In many cases this isn't really a problem (tuning config
> > change
> > > to
> > > >> > > hadoop
> > > >> > > > > job), but as we have more people using Airflow this is
> > causing a
> > > >> lot
> > > >> > of
> > > >> > > > > time spent debugging why production acted differently than
> > > expected
> > > >> > > (the
> > > >> > > > > problem was already fixed... why is it still here?).  We
> also
> > > see
> > > >> > that
> > > >> > > > some
> > > >> > > > > tasks expect a given behavior from other tasks, and since
> they
> > > live
> > > >> > in
> > > >> > > > the
> > > >> > > > > same git repo they can modify both tasks at the same time
> if a
> > > >> > breaking
> > > >> > > > > change is needed, but when this rolls out to prod there
> isn't
> > a
> > > way
> > > >> > to
> > > >> > > do
> > > >> > > > > this other than turn off the DAG, and login to all hosts to
> > > verify
> > > >> > > fully
> > > >> > > > > deployed.
> > > >> > > > >
> > > >> > > > > We would like to remove this confusion and make
> > > >> generations/versions
> > > >> > > > (same
> > > >> > > > > thing really) exposed to users and make sure for a single
> > > dag_run
> > > >> > only
> > > >> > > > one
> > > >> > > > > version is used.
> > > >> > > > >
> > > >> > > > > I hope this is more clear.
> > > >> > > > >
> > > >> > > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <jo...@wepay.com>
> > > wrote:
> > > >> > > > >
> > > >> > > > >> Hi David,
> > > >> > > > >>
> > > >> > > > >> Do you mind providing a concrete example of the scenario in
> > > which
> > > >> > > > >> scheduler/workers see different states (I'm not 100% sure
> if
> > I
> > > >> > > > understood
> > > >> > > > >> the issue at hand).
> > > >> > > > >>
> > > >> > > > >> And by same dag generation, are you referring to the dag
> > > version?
> > > >> > (DAG
> > > >> > > > >> version is currently not supported at all, but I can see it
> > > being
> > > >> a
> > > >> > > > >> building block for future use cases).
> > > >> > > > >>
> > > >> > > > >> Joy
> > > >> > > > >>
> > > >> > > > >> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <
> > > >> dcapwell@gmail.com>
> > > >> > > > wrote:
> > > >> > > > >>
> > > >> > > > >> > My current thinking is to add a field to the dag table
> that
> > > is
> > > >> > > > optional
> > > >> > > > >> and
> > > >> > > > >> > provided by the dag. We currently intercept the load path
> > do
> > > >> could
> > > >> > > use
> > > >> > > > >> this
> > > >> > > > >> > field to make sure we load the same generation.  My
> concern
> > > here
> > > >> > is
> > > >> > > > the
> > > >> > > > >> > interaction with the scheduler, not as familiar with that
> > > logic
> > > >> to
> > > >> > > > >> predict
> > > >> > > > >> > corner cases were this would fail.
> > > >> > > > >> >
> > > >> > > > >> > Any other recommendations for how this could be done?
> > > >> > > > >> >
> > > >> > > > >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <
> > > >> dcapwell@gmail.com>
> > > >> > > > wrote:
> > > >> > > > >> >
> > > >> > > > >> > > We have been using airflow for logic that delegates to
> > > other
> > > >> > > > systems so
> > > >> > > > >> > > inject a task all tasks depends to make sure all
> > resources
> > > >> used
> > > >> > > are
> > > >> > > > the
> > > >> > > > >> > > same for all tasks in the dag. This works well for
> tasks
> > > that
> > > >> > > > delegates
> > > >> > > > >> > to
> > > >> > > > >> > > external systems but people are starting to need to run
> > > logic
> > > >> in
> > > >> > > > >> airflow
> > > >> > > > >> > > and the fact that scheduler and all workers can see
> > > different
> > > >> > > > states is
> > > >> > > > >> > > causing issues
> > > >> > > > >> > >
> > > >> > > > >> > > We can make sure that all the code is deployed in a
> > > consistent
> > > >> > way
> > > >> > > > but
> > > >> > > > >> > > need help from the scheduler to tell the workers the
> > > current
> > > >> > > > generation
> > > >> > > > >> > for
> > > >> > > > >> > > a DAG.
> > > >> > > > >> > >
> > > >> > > > >> > > My question is, what would be the best way to modify
> > > airflow
> > > >> to
> > > >> > > > allow
> > > >> > > > >> > DAGs
> > > >> > > > >> > > to define a generation value that the scheduler could
> > send
> > > to
> > > >> > > > workers?
> > > >> > > > >> > >
> > > >> > > > >> > > Thanks
> > > >> > > > >> > >
> > > >> > > > >> >
> > > >> > > > >>
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> > >
> >
>

Re: How to add hooks for strong deployment consistency?

Posted by Maxime Beauchemin <ma...@gmail.com>.
I'm curious to hear which DagFetcher abstraction people would build or want
to use.

So far it sounded like the most popular and flexible approach would be a
`GitDagFetcher` where all SHAs and refs become a possibility, as opposed to
say a TarballOnS3DagFetcher which would require more manual artifact
management and versioning, which represent additional [human] workflow on
top of the already existing git-based workflow.

One way I've seen this done before is by using this Git fuse (file system
in user space) hack that creates a virtual filesystem where all SHAs and
refs in the Git repo are exposed as a subfolder, and under each ref
subfolder the whole repo sits as of that ref. Of course all the files are
virtual and fetched at access time by the virtual filesystem using the git
api. So if you simply point the DagBag loader to the right [virtual]
directory, it will import the right version of the DAG. In the git world,
the alternative to that is managing temp folders and doing shallow clones
which seems like much more of a headache. Note that one tradeoff is that if
git and whatever it depends has then a need to be highly available.

Max

On Wed, Feb 28, 2018 at 6:55 PM, David Capwell <dc...@gmail.com> wrote:

> Thanks for all the details! With a pluggable fetcher we would be able to
> add our own logic for how to fetch so sounds like a good place to start for
> something like this!
>
> On Wed, Feb 28, 2018, 4:39 PM Joy Gao <jo...@wepay.com> wrote:
>
> > +1 on DagFetcher abstraction, very airflow-esque :)
> >
> > On Wed, Feb 28, 2018 at 11:25 AM, Maxime Beauchemin
> > <ma...@gmail.com> wrote:
> > > Addressing a few of your questions / concerns:
> > >
> > > * The scheduler uses a multiprocess queue to queue up tasks, each
> > > subprocess is in charge of a single DAG "scheduler cycle" which
> triggers
> > > what it can for active DagRuns. Currently it fills the DagBag from the
> > > local file system, looking for a specific module where the master
> process
> > > last saw that DAG. Fetching the DAG is a metadata operation, DAG
> > artifacts
> > > shouldn't be too large, we can assume that it takes seconds at most to
> > > fetch a DAG, which is ok. We generally assume that the scheduler should
> > > fully cycle every minute or so. Version-aware DagFetcher could also
> > > implement some sort of caching if that was a concern (shouldn't be
> > though).
> > > * For consistency within the whole DagRun, the scheduler absolutely has
> > to
> > > read the right version. If tasks got removed they would never get
> > scheduled
> > > and consistency cannot be achieved.
> > > * TaskInstances get created the first time they are identified as
> > runnable
> > > by the scheduler and are born with a queued status I believe (from
> > memory,
> > > haven't read the latest code to confirm). The worker double checks and
> > sets
> > > it as running as part of a database transaction to avoid double-firing.
> > >
> > > Max
> > >
> > > On Wed, Feb 28, 2018 at 7:29 AM, Chris Palmer <ch...@crpalmer.com>
> > wrote:
> > >
> > >> I'll preface this with the fact that I'm relatively new to Airflow,
> and
> > >> haven't played around with a lot of the internals.
> > >>
> > >> I find the idea of a DagFetcher interesting but would we worry about
> > >> slowing down the scheduler significantly? If the scheduler is having
> to
> > >> "fetch" multiple different DAG versions, be it git refs or artifacts
> > from
> > >> Artifactory, we are talking about adding significant time to each
> > scheduler
> > >> run. Also how would the scheduler know which DAGs to fetch from where
> if
> > >> there aren't local files on disk listing those DAGs? Maybe I'm missing
> > >> something in the implementation.
> > >>
> > >> It seems to me that the fetching of the different versions should be
> > >> delegated to the Task (or TaskInstance) itself. That ensures we only
> > spend
> > >> the time to "fetch" the version that is needed when it is needed. One
> > down
> > >> side might be that each TaskInstance running for the same version of
> the
> > >> DAG might end up doing the "fetch" independently (duplicating that
> > work).
> > >>
> > >> I think this could be done by adding some version attribute to the
> > DagRun
> > >> that gets set at creation, and have the scheduler pass that version to
> > the
> > >> TaskInstances when they are created. You could even extend this so
> that
> > you
> > >> could have an arbitrary set of "executor_parameters" that get set on a
> > >> DagRun and are passed to TaskInstances. Then the specific Executor
> class
> > >> that is running that TaskInstance could handle the
> > "executor_parameters" as
> > >> it sees fit.
> > >>
> > >> One thing I'm not clear on is how and when TaskInstances are created.
> > When
> > >> the scheduler first sees a specific DagRun do all the TaskInstances
> get
> > >> created immediately, but only some of them get queued? Or does the
> > >> scheduler only create those TaskInstances which can be queued right
> now?
> > >>
> > >> In particular if a DagRun gets created and while it is running the DAG
> > is
> > >> updated and a new Task is added, will the scheduler pick up that new
> > Task
> > >> for the running DagRun? If the answer is yes, then my suggestion above
> > >> would run the risk of scheduling a Task for a DAG version where that
> > Task
> > >> didn't exist. I'm sure you could handle that somewhat gracefully but
> > it's a
> > >> bit ugly.
> > >>
> > >> Chris
> > >>
> > >> On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <
> > >> maximebeauchemin@gmail.com> wrote:
> > >>
> > >> > At a higher level I want to say a few things about the idea of
> > enforcing
> > >> > version consistency within a DagRun.
> > >> >
> > >> > One thing we've been talking about is the need for a "DagFetcher"
> > >> > abstraction, where it's first implementation that would replace and
> > mimic
> > >> > the current one would be "FileSystemDagFetcher". One specific
> > DagFetcher
> > >> > implementation may or may not support version semantics, but if it
> > does
> > >> > should be able to receive a version id and return the proper version
> > of
> > >> the
> > >> > DAG object. For instance that first "FileSystemDagFetcher" would not
> > >> > support version semantic, but perhaps a "GitRepoDagFetcher" would,
> or
> > an
> > >> > "ArtifactoryDagFetcher", or "TarballInS3DagFetcher" may as well.
> > >> >
> > >> > Of course that assumes that the scheduler knows and stores the
> active
> > >> > version number when generating a new DagRun, and for that
> information
> > to
> > >> be
> > >> > leveraged on subsequent scheduler cycles and on workers when task
> are
> > >> > executed.
> > >> >
> > >> > This could also enable things like "remote" backfills (non local,
> > >> > parallelized) of a DAG definition that's on an arbitrary git ref
> > >> (assuming
> > >> > a "GitRepoDagFetcher").
> > >> >
> > >> > There are [perhaps] unintuitive implications where clearing a single
> > task
> > >> > would then re-run the old DAG definition on that task (since the
> > version
> > >> > was stamped in the DagRun and hasn't changed), but
> > deleting/recreating a
> > >> > DagRun would run the latest version (or any other version that may
> be
> > >> > specified for that matter).
> > >> >
> > >> > I'm unclear on how much work that represents exactly, but it's
> > certainly
> > >> > doable and may only require to change part of the DagBag class and a
> > few
> > >> > other places.
> > >> >
> > >> > Max
> > >> >
> > >> > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <dc...@gmail.com>
> > >> wrote:
> > >> >
> > >> > > Thanks for your feedback!
> > >> > >
> > >> > > Option 1 is a non-starter for us. The reason is we have DAGs that
> > take
> > >> 9+
> > >> > > hours to run.
> > >> > >
> > >> > > Option 2 is more where my mind was going, but it's rather large.
> > How I
> > >> > see
> > >> > > it you need a MVCC DagBag that's aware of multiple versions (what
> > >> > provides
> > >> > > version?).  Assuming you can track active dag runs pointing to
> which
> > >> > > versions you know how to cleanup (fine with external).  The pro
> > here is
> > >> > you
> > >> > > have snapshot isolation for dag_run, con is more bookkeeping and
> > >> require
> > >> > > deploy to work with this (last part may be a good thing though).
> > >> > >
> > >> > > The only other option I can think of is to lock deploy so the
> system
> > >> only
> > >> > > picks up new versions when no dag_run holds the lock.  This is
> > flawed
> > >> for
> > >> > > many reasons, but breaks horrible for dag_runs that takes minutes
> (I
> > >> > assume
> > >> > > 99% do).
> > >> > >
> > >> > >
> > >> > >
> > >> > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <jo...@wepay.com> wrote:
> > >> > >
> > >> > > > Hi David!
> > >> > > >
> > >> > > > Thank you for clarifying, I think I understand your concern now.
> > We
> > >> > > > currently also work around this by making sure a dag is turned
> off
> > >> > > > when we deploy a new version. We also make sure our jobs are
> > >> > > > idempotent and retry-enabled in the case when we forget to turn
> > off
> > >> > > > the job, so the issue hasn't caused us too much headache.
> > >> > > >
> > >> > > > I do agree that it would be nice for Airflow to have the option
> to
> > >> > > > guarantee a single version of dag per dag run. I see two
> > approaches:
> > >> > > >
> > >> > > > (1) If a dag is updated, the current dagrun fails and/or
> retries.
> > >> > > > (2) If a dag is updated, the current dagrun continues but uses
> > >> version
> > >> > > > before the update.
> > >> > > >
> > >> > > > (1) requires some mechanism to compare dag generations. One
> > option is
> > >> > > > to hash the dagfile and storing that value to the dagrun table,
> > and
> > >> > > > compare against it each time a task is running. And in the case
> if
> > >> the
> > >> > > > hash value is different, update the hash value, then fail/retry
> > the
> > >> > > > dag. I think this is a fairly safe approach.
> > >> > > >
> > >> > > > (2) is trickier. A dag only has a property "fileloc" which
> tracks
> > the
> > >> > > > location of the dag file, but the actual content of the dag file
> > is
> > >> > > > never versioned. When a task instance starts running, it
> > dynamically
> > >> > > > re-processes the dag file specified by the fileloc, generate all
> > the
> > >> > > > task objects from the dag file, and fetch the task object by
> > task_id
> > >> > > > in order to execute it. So in order to guarantee each dagrun to
> > run a
> > >> > > > specific version, previous versions must be maintained on disk
> > >> somehow
> > >> > > > (maintaining this information in memory is difficult, since if
> the
> > >> > > > scheduler/worker shuts down, that information is lost). This
> > makes it
> > >> > > > a pretty big change, and I haven't thought much on how to
> > implement
> > >> > > > it.
> > >> > > >
> > >> > > > I'm personally leaning towards (1) for sake of simplicity. Note
> > that
> > >> > > > some users may not want dag to fail/retry even when dag is
> > updated,
> > >> so
> > >> > > > this should be an optional feature, not required.
> > >> > > >
> > >> > > > My scheduler-foo isn't that great, so curious what others have
> to
> > say
> > >> > > > about this.
> > >> > > >
> > >> > > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <
> > dcapwell@gmail.com>
> > >> > > wrote:
> > >> > > > > Thanks for the reply Joy, let me walk you though things as
> they
> > are
> > >> > > today
> > >> > > > >
> > >> > > > > 1) we don't stop airflow or disable DAGs while deploying
> > updates to
> > >> > > > logic,
> > >> > > > > this is done live once its released
> > >> > > > > 2) the python script in the DAG folder doesn't actually have
> > DAGs
> > >> in
> > >> > it
> > >> > > > but
> > >> > > > > is a shim layer to allow us to deploy in a atomic way for a
> > single
> > >> > host
> > >> > > > >   2.1) this script reads a file on local disk (less than disk
> > page
> > >> > > size)
> > >> > > > to
> > >> > > > > find latest git commit deployed
> > >> > > > >   2.2) re-does the airflow DAG load process but pointing to
> the
> > git
> > >> > > > commit
> > >> > > > > path
> > >> > > > >
> > >> > > > > Example directory structure
> > >> > > > >
> > >> > > > > /airflow/dags/shim.py
> > >> > > > > /airflow/real_dags/
> > >> > > > >                             /latest # pointer to latest commit
> > >> > > > >                             /[git commit]/
> > >> > > > >
> > >> > > > > This is how we make sure deploys are consistent within a
> single
> > >> task.
> > >> > > > >
> > >> > > > >
> > >> > > > > Now, lets assume we have a fully atomic commit process and are
> > able
> > >> > to
> > >> > > > > upgrade DAGs at the exact same moment.
> > >> > > > >
> > >> > > > > At time T0 the scheduler knows of DAG V1 and schedules two
> > tasks,
> > >> > > Task1,
> > >> > > > > and Task2
> > >> > > > > At time T1 Task1 is picked up by Worker1, so starts executing
> > the
> > >> > task
> > >> > > > (V1
> > >> > > > > logic)
> > >> > > > > At time T2 deploy commit happens, current DAG version: V2
> > >> > > > > At time T3, Task2 is picked up by Worker2, so starts executing
> > the
> > >> > task
> > >> > > > (V2
> > >> > > > > logic)
> > >> > > > >
> > >> > > > > In many cases this isn't really a problem (tuning config
> change
> > to
> > >> > > hadoop
> > >> > > > > job), but as we have more people using Airflow this is
> causing a
> > >> lot
> > >> > of
> > >> > > > > time spent debugging why production acted differently than
> > expected
> > >> > > (the
> > >> > > > > problem was already fixed... why is it still here?).  We also
> > see
> > >> > that
> > >> > > > some
> > >> > > > > tasks expect a given behavior from other tasks, and since they
> > live
> > >> > in
> > >> > > > the
> > >> > > > > same git repo they can modify both tasks at the same time if a
> > >> > breaking
> > >> > > > > change is needed, but when this rolls out to prod there isn't
> a
> > way
> > >> > to
> > >> > > do
> > >> > > > > this other than turn off the DAG, and login to all hosts to
> > verify
> > >> > > fully
> > >> > > > > deployed.
> > >> > > > >
> > >> > > > > We would like to remove this confusion and make
> > >> generations/versions
> > >> > > > (same
> > >> > > > > thing really) exposed to users and make sure for a single
> > dag_run
> > >> > only
> > >> > > > one
> > >> > > > > version is used.
> > >> > > > >
> > >> > > > > I hope this is more clear.
> > >> > > > >
> > >> > > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <jo...@wepay.com>
> > wrote:
> > >> > > > >
> > >> > > > >> Hi David,
> > >> > > > >>
> > >> > > > >> Do you mind providing a concrete example of the scenario in
> > which
> > >> > > > >> scheduler/workers see different states (I'm not 100% sure if
> I
> > >> > > > understood
> > >> > > > >> the issue at hand).
> > >> > > > >>
> > >> > > > >> And by same dag generation, are you referring to the dag
> > version?
> > >> > (DAG
> > >> > > > >> version is currently not supported at all, but I can see it
> > being
> > >> a
> > >> > > > >> building block for future use cases).
> > >> > > > >>
> > >> > > > >> Joy
> > >> > > > >>
> > >> > > > >> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <
> > >> dcapwell@gmail.com>
> > >> > > > wrote:
> > >> > > > >>
> > >> > > > >> > My current thinking is to add a field to the dag table that
> > is
> > >> > > > optional
> > >> > > > >> and
> > >> > > > >> > provided by the dag. We currently intercept the load path
> do
> > >> could
> > >> > > use
> > >> > > > >> this
> > >> > > > >> > field to make sure we load the same generation.  My concern
> > here
> > >> > is
> > >> > > > the
> > >> > > > >> > interaction with the scheduler, not as familiar with that
> > logic
> > >> to
> > >> > > > >> predict
> > >> > > > >> > corner cases were this would fail.
> > >> > > > >> >
> > >> > > > >> > Any other recommendations for how this could be done?
> > >> > > > >> >
> > >> > > > >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <
> > >> dcapwell@gmail.com>
> > >> > > > wrote:
> > >> > > > >> >
> > >> > > > >> > > We have been using airflow for logic that delegates to
> > other
> > >> > > > systems so
> > >> > > > >> > > inject a task all tasks depends to make sure all
> resources
> > >> used
> > >> > > are
> > >> > > > the
> > >> > > > >> > > same for all tasks in the dag. This works well for tasks
> > that
> > >> > > > delegates
> > >> > > > >> > to
> > >> > > > >> > > external systems but people are starting to need to run
> > logic
> > >> in
> > >> > > > >> airflow
> > >> > > > >> > > and the fact that scheduler and all workers can see
> > different
> > >> > > > states is
> > >> > > > >> > > causing issues
> > >> > > > >> > >
> > >> > > > >> > > We can make sure that all the code is deployed in a
> > consistent
> > >> > way
> > >> > > > but
> > >> > > > >> > > need help from the scheduler to tell the workers the
> > current
> > >> > > > generation
> > >> > > > >> > for
> > >> > > > >> > > a DAG.
> > >> > > > >> > >
> > >> > > > >> > > My question is, what would be the best way to modify
> > airflow
> > >> to
> > >> > > > allow
> > >> > > > >> > DAGs
> > >> > > > >> > > to define a generation value that the scheduler could
> send
> > to
> > >> > > > workers?
> > >> > > > >> > >
> > >> > > > >> > > Thanks
> > >> > > > >> > >
> > >> > > > >> >
> > >> > > > >>
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> >
>

Re: How to add hooks for strong deployment consistency?

Posted by David Capwell <dc...@gmail.com>.
Thanks for all the details! With a pluggable fetcher we would be able to
add our own logic for how to fetch so sounds like a good place to start for
something like this!

On Wed, Feb 28, 2018, 4:39 PM Joy Gao <jo...@wepay.com> wrote:

> +1 on DagFetcher abstraction, very airflow-esque :)
>
> On Wed, Feb 28, 2018 at 11:25 AM, Maxime Beauchemin
> <ma...@gmail.com> wrote:
> > Addressing a few of your questions / concerns:
> >
> > * The scheduler uses a multiprocess queue to queue up tasks, each
> > subprocess is in charge of a single DAG "scheduler cycle" which triggers
> > what it can for active DagRuns. Currently it fills the DagBag from the
> > local file system, looking for a specific module where the master process
> > last saw that DAG. Fetching the DAG is a metadata operation, DAG
> artifacts
> > shouldn't be too large, we can assume that it takes seconds at most to
> > fetch a DAG, which is ok. We generally assume that the scheduler should
> > fully cycle every minute or so. Version-aware DagFetcher could also
> > implement some sort of caching if that was a concern (shouldn't be
> though).
> > * For consistency within the whole DagRun, the scheduler absolutely has
> to
> > read the right version. If tasks got removed they would never get
> scheduled
> > and consistency cannot be achieved.
> > * TaskInstances get created the first time they are identified as
> runnable
> > by the scheduler and are born with a queued status I believe (from
> memory,
> > haven't read the latest code to confirm). The worker double checks and
> sets
> > it as running as part of a database transaction to avoid double-firing.
> >
> > Max
> >
> > On Wed, Feb 28, 2018 at 7:29 AM, Chris Palmer <ch...@crpalmer.com>
> wrote:
> >
> >> I'll preface this with the fact that I'm relatively new to Airflow, and
> >> haven't played around with a lot of the internals.
> >>
> >> I find the idea of a DagFetcher interesting but would we worry about
> >> slowing down the scheduler significantly? If the scheduler is having to
> >> "fetch" multiple different DAG versions, be it git refs or artifacts
> from
> >> Artifactory, we are talking about adding significant time to each
> scheduler
> >> run. Also how would the scheduler know which DAGs to fetch from where if
> >> there aren't local files on disk listing those DAGs? Maybe I'm missing
> >> something in the implementation.
> >>
> >> It seems to me that the fetching of the different versions should be
> >> delegated to the Task (or TaskInstance) itself. That ensures we only
> spend
> >> the time to "fetch" the version that is needed when it is needed. One
> down
> >> side might be that each TaskInstance running for the same version of the
> >> DAG might end up doing the "fetch" independently (duplicating that
> work).
> >>
> >> I think this could be done by adding some version attribute to the
> DagRun
> >> that gets set at creation, and have the scheduler pass that version to
> the
> >> TaskInstances when they are created. You could even extend this so that
> you
> >> could have an arbitrary set of "executor_parameters" that get set on a
> >> DagRun and are passed to TaskInstances. Then the specific Executor class
> >> that is running that TaskInstance could handle the
> "executor_parameters" as
> >> it sees fit.
> >>
> >> One thing I'm not clear on is how and when TaskInstances are created.
> When
> >> the scheduler first sees a specific DagRun do all the TaskInstances get
> >> created immediately, but only some of them get queued? Or does the
> >> scheduler only create those TaskInstances which can be queued right now?
> >>
> >> In particular if a DagRun gets created and while it is running the DAG
> is
> >> updated and a new Task is added, will the scheduler pick up that new
> Task
> >> for the running DagRun? If the answer is yes, then my suggestion above
> >> would run the risk of scheduling a Task for a DAG version where that
> Task
> >> didn't exist. I'm sure you could handle that somewhat gracefully but
> it's a
> >> bit ugly.
> >>
> >> Chris
> >>
> >> On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <
> >> maximebeauchemin@gmail.com> wrote:
> >>
> >> > At a higher level I want to say a few things about the idea of
> enforcing
> >> > version consistency within a DagRun.
> >> >
> >> > One thing we've been talking about is the need for a "DagFetcher"
> >> > abstraction, where it's first implementation that would replace and
> mimic
> >> > the current one would be "FileSystemDagFetcher". One specific
> DagFetcher
> >> > implementation may or may not support version semantics, but if it
> does
> >> > should be able to receive a version id and return the proper version
> of
> >> the
> >> > DAG object. For instance that first "FileSystemDagFetcher" would not
> >> > support version semantic, but perhaps a "GitRepoDagFetcher" would, or
> an
> >> > "ArtifactoryDagFetcher", or "TarballInS3DagFetcher" may as well.
> >> >
> >> > Of course that assumes that the scheduler knows and stores the active
> >> > version number when generating a new DagRun, and for that information
> to
> >> be
> >> > leveraged on subsequent scheduler cycles and on workers when task are
> >> > executed.
> >> >
> >> > This could also enable things like "remote" backfills (non local,
> >> > parallelized) of a DAG definition that's on an arbitrary git ref
> >> (assuming
> >> > a "GitRepoDagFetcher").
> >> >
> >> > There are [perhaps] unintuitive implications where clearing a single
> task
> >> > would then re-run the old DAG definition on that task (since the
> version
> >> > was stamped in the DagRun and hasn't changed), but
> deleting/recreating a
> >> > DagRun would run the latest version (or any other version that may be
> >> > specified for that matter).
> >> >
> >> > I'm unclear on how much work that represents exactly, but it's
> certainly
> >> > doable and may only require to change part of the DagBag class and a
> few
> >> > other places.
> >> >
> >> > Max
> >> >
> >> > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <dc...@gmail.com>
> >> wrote:
> >> >
> >> > > Thanks for your feedback!
> >> > >
> >> > > Option 1 is a non-starter for us. The reason is we have DAGs that
> take
> >> 9+
> >> > > hours to run.
> >> > >
> >> > > Option 2 is more where my mind was going, but it's rather large.
> How I
> >> > see
> >> > > it you need a MVCC DagBag that's aware of multiple versions (what
> >> > provides
> >> > > version?).  Assuming you can track active dag runs pointing to which
> >> > > versions you know how to cleanup (fine with external).  The pro
> here is
> >> > you
> >> > > have snapshot isolation for dag_run, con is more bookkeeping and
> >> require
> >> > > deploy to work with this (last part may be a good thing though).
> >> > >
> >> > > The only other option I can think of is to lock deploy so the system
> >> only
> >> > > picks up new versions when no dag_run holds the lock.  This is
> flawed
> >> for
> >> > > many reasons, but breaks horrible for dag_runs that takes minutes (I
> >> > assume
> >> > > 99% do).
> >> > >
> >> > >
> >> > >
> >> > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <jo...@wepay.com> wrote:
> >> > >
> >> > > > Hi David!
> >> > > >
> >> > > > Thank you for clarifying, I think I understand your concern now.
> We
> >> > > > currently also work around this by making sure a dag is turned off
> >> > > > when we deploy a new version. We also make sure our jobs are
> >> > > > idempotent and retry-enabled in the case when we forget to turn
> off
> >> > > > the job, so the issue hasn't caused us too much headache.
> >> > > >
> >> > > > I do agree that it would be nice for Airflow to have the option to
> >> > > > guarantee a single version of dag per dag run. I see two
> approaches:
> >> > > >
> >> > > > (1) If a dag is updated, the current dagrun fails and/or retries.
> >> > > > (2) If a dag is updated, the current dagrun continues but uses
> >> version
> >> > > > before the update.
> >> > > >
> >> > > > (1) requires some mechanism to compare dag generations. One
> option is
> >> > > > to hash the dagfile and storing that value to the dagrun table,
> and
> >> > > > compare against it each time a task is running. And in the case if
> >> the
> >> > > > hash value is different, update the hash value, then fail/retry
> the
> >> > > > dag. I think this is a fairly safe approach.
> >> > > >
> >> > > > (2) is trickier. A dag only has a property "fileloc" which tracks
> the
> >> > > > location of the dag file, but the actual content of the dag file
> is
> >> > > > never versioned. When a task instance starts running, it
> dynamically
> >> > > > re-processes the dag file specified by the fileloc, generate all
> the
> >> > > > task objects from the dag file, and fetch the task object by
> task_id
> >> > > > in order to execute it. So in order to guarantee each dagrun to
> run a
> >> > > > specific version, previous versions must be maintained on disk
> >> somehow
> >> > > > (maintaining this information in memory is difficult, since if the
> >> > > > scheduler/worker shuts down, that information is lost). This
> makes it
> >> > > > a pretty big change, and I haven't thought much on how to
> implement
> >> > > > it.
> >> > > >
> >> > > > I'm personally leaning towards (1) for sake of simplicity. Note
> that
> >> > > > some users may not want dag to fail/retry even when dag is
> updated,
> >> so
> >> > > > this should be an optional feature, not required.
> >> > > >
> >> > > > My scheduler-foo isn't that great, so curious what others have to
> say
> >> > > > about this.
> >> > > >
> >> > > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <
> dcapwell@gmail.com>
> >> > > wrote:
> >> > > > > Thanks for the reply Joy, let me walk you though things as they
> are
> >> > > today
> >> > > > >
> >> > > > > 1) we don't stop airflow or disable DAGs while deploying
> updates to
> >> > > > logic,
> >> > > > > this is done live once its released
> >> > > > > 2) the python script in the DAG folder doesn't actually have
> DAGs
> >> in
> >> > it
> >> > > > but
> >> > > > > is a shim layer to allow us to deploy in a atomic way for a
> single
> >> > host
> >> > > > >   2.1) this script reads a file on local disk (less than disk
> page
> >> > > size)
> >> > > > to
> >> > > > > find latest git commit deployed
> >> > > > >   2.2) re-does the airflow DAG load process but pointing to the
> git
> >> > > > commit
> >> > > > > path
> >> > > > >
> >> > > > > Example directory structure
> >> > > > >
> >> > > > > /airflow/dags/shim.py
> >> > > > > /airflow/real_dags/
> >> > > > >                             /latest # pointer to latest commit
> >> > > > >                             /[git commit]/
> >> > > > >
> >> > > > > This is how we make sure deploys are consistent within a single
> >> task.
> >> > > > >
> >> > > > >
> >> > > > > Now, lets assume we have a fully atomic commit process and are
> able
> >> > to
> >> > > > > upgrade DAGs at the exact same moment.
> >> > > > >
> >> > > > > At time T0 the scheduler knows of DAG V1 and schedules two
> tasks,
> >> > > Task1,
> >> > > > > and Task2
> >> > > > > At time T1 Task1 is picked up by Worker1, so starts executing
> the
> >> > task
> >> > > > (V1
> >> > > > > logic)
> >> > > > > At time T2 deploy commit happens, current DAG version: V2
> >> > > > > At time T3, Task2 is picked up by Worker2, so starts executing
> the
> >> > task
> >> > > > (V2
> >> > > > > logic)
> >> > > > >
> >> > > > > In many cases this isn't really a problem (tuning config change
> to
> >> > > hadoop
> >> > > > > job), but as we have more people using Airflow this is causing a
> >> lot
> >> > of
> >> > > > > time spent debugging why production acted differently than
> expected
> >> > > (the
> >> > > > > problem was already fixed... why is it still here?).  We also
> see
> >> > that
> >> > > > some
> >> > > > > tasks expect a given behavior from other tasks, and since they
> live
> >> > in
> >> > > > the
> >> > > > > same git repo they can modify both tasks at the same time if a
> >> > breaking
> >> > > > > change is needed, but when this rolls out to prod there isn't a
> way
> >> > to
> >> > > do
> >> > > > > this other than turn off the DAG, and login to all hosts to
> verify
> >> > > fully
> >> > > > > deployed.
> >> > > > >
> >> > > > > We would like to remove this confusion and make
> >> generations/versions
> >> > > > (same
> >> > > > > thing really) exposed to users and make sure for a single
> dag_run
> >> > only
> >> > > > one
> >> > > > > version is used.
> >> > > > >
> >> > > > > I hope this is more clear.
> >> > > > >
> >> > > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <jo...@wepay.com>
> wrote:
> >> > > > >
> >> > > > >> Hi David,
> >> > > > >>
> >> > > > >> Do you mind providing a concrete example of the scenario in
> which
> >> > > > >> scheduler/workers see different states (I'm not 100% sure if I
> >> > > > understood
> >> > > > >> the issue at hand).
> >> > > > >>
> >> > > > >> And by same dag generation, are you referring to the dag
> version?
> >> > (DAG
> >> > > > >> version is currently not supported at all, but I can see it
> being
> >> a
> >> > > > >> building block for future use cases).
> >> > > > >>
> >> > > > >> Joy
> >> > > > >>
> >> > > > >> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <
> >> dcapwell@gmail.com>
> >> > > > wrote:
> >> > > > >>
> >> > > > >> > My current thinking is to add a field to the dag table that
> is
> >> > > > optional
> >> > > > >> and
> >> > > > >> > provided by the dag. We currently intercept the load path do
> >> could
> >> > > use
> >> > > > >> this
> >> > > > >> > field to make sure we load the same generation.  My concern
> here
> >> > is
> >> > > > the
> >> > > > >> > interaction with the scheduler, not as familiar with that
> logic
> >> to
> >> > > > >> predict
> >> > > > >> > corner cases were this would fail.
> >> > > > >> >
> >> > > > >> > Any other recommendations for how this could be done?
> >> > > > >> >
> >> > > > >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <
> >> dcapwell@gmail.com>
> >> > > > wrote:
> >> > > > >> >
> >> > > > >> > > We have been using airflow for logic that delegates to
> other
> >> > > > systems so
> >> > > > >> > > inject a task all tasks depends to make sure all resources
> >> used
> >> > > are
> >> > > > the
> >> > > > >> > > same for all tasks in the dag. This works well for tasks
> that
> >> > > > delegates
> >> > > > >> > to
> >> > > > >> > > external systems but people are starting to need to run
> logic
> >> in
> >> > > > >> airflow
> >> > > > >> > > and the fact that scheduler and all workers can see
> different
> >> > > > states is
> >> > > > >> > > causing issues
> >> > > > >> > >
> >> > > > >> > > We can make sure that all the code is deployed in a
> consistent
> >> > way
> >> > > > but
> >> > > > >> > > need help from the scheduler to tell the workers the
> current
> >> > > > generation
> >> > > > >> > for
> >> > > > >> > > a DAG.
> >> > > > >> > >
> >> > > > >> > > My question is, what would be the best way to modify
> airflow
> >> to
> >> > > > allow
> >> > > > >> > DAGs
> >> > > > >> > > to define a generation value that the scheduler could send
> to
> >> > > > workers?
> >> > > > >> > >
> >> > > > >> > > Thanks
> >> > > > >> > >
> >> > > > >> >
> >> > > > >>
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
>
>