Posted to dev@airflow.apache.org by Jarek Potiuk <Ja...@polidea.com> on 2020/08/02 07:47:39 UTC

Re: [PROPOSAL][AIP-36 DAG Versioning]

Few points from my side (and a proposal!):

1) Agree with Max -  with a rather strong NO for pickles (however,
indeed cloudpickle solves some of the problems). Pickles came up in
our discussion in Polidea recently and the overall message was "no". I
agree with Max here - if we can ship python code, turning that into
pickle for transit makes little sense to me and brings a plethora of
problems.

2) I think the versioning solution should indeed treat the "DagRun"
structure atomically. While I see why we would like to go with the
UI/Scheduler only first rather than implementing this in the workers,
adding the "mixed version" is where it breaks down IMHO. Reasoning
about such a "mixed version" dag is next to impossible. The current
behavior is not well defined and non-deterministic (it depends on
scheduler delays, syncing, type of deployment, restarts of the workers
etc.); by exposing "mixed versions" we move the problem up to the UI
(thus to users) rather than solving it. So I am not a big fan of this
and would rather solve it "well" with atomicity.

3) I see the point of Dan as well - we had many discussions and many
times the idea about "submitting" the DAG for execution via the API
came up - and it makes sense IMHO.

Proposal: implement full versioning, with code shipped as wheel BLOBs
stored in the DB (akin to serialized DAGs).

I understand that the big issue is how to actually "ship" the code to
the worker. And - maybe a wild idea - we can kill several birds with
the same stone.

There were plenty of discussions on how we could do that but one was
never truly explored - using wheel packages.

For those who do not know them, there is the PEP:
https://www.python.org/dev/peps/pep-0427/

Wheels allow us to "package" python code in a standard way. They are
portable ("purelib" + they contain .py rather than .pyc code), they carry
metadata and versioning information, they can be signed for security,
and they can contain other packages or plain python code. Why don't we
let the scheduler pack the fingerprinted version of the DAG into a .whl
and store it as a blob in the DB next to the serialized form?
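
To make this concrete, here is a rough sketch of what that scheduler-side
packaging could look like. None of these helpers exist in Airflow today,
and encoding the fingerprint as a PEP 440 local version is just one way to
keep the version string valid:

import pathlib
import shutil
import subprocess
import tempfile

SETUP_PY_TEMPLATE = """
from setuptools import setup

setup(
    name="{name}",
    version="{version}",
    py_modules=["{module}"],
)
"""

def build_dag_wheel(dag_file: str, dag_id: str, fingerprint: str) -> bytes:
    # Hypothetical helper: package a single DAG file as a wheel and return
    # its bytes so the scheduler can store them as a BLOB next to the
    # serialized DAG. The fingerprint is encoded as a PEP 440 "local
    # version" (0+<fingerprint>), which accepts alphanumeric hashes.
    src = pathlib.Path(dag_file)
    with tempfile.TemporaryDirectory() as tmp:
        build_dir = pathlib.Path(tmp)
        shutil.copy(src, build_dir / src.name)
        (build_dir / "setup.py").write_text(
            SETUP_PY_TEMPLATE.format(
                name=f"airflow-dag-{dag_id}",
                version=f"0+{fingerprint}",
                module=src.stem,
            )
        )
        # Build the wheel in place; --no-deps keeps it to the DAG code only.
        subprocess.run(
            ["pip", "wheel", "--no-deps",
             "--wheel-dir", str(build_dir / "dist"), str(build_dir)],
            check=True,
        )
        wheel = next((build_dir / "dist").glob("*.whl"))
        return wheel.read_bytes()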

There were concerns about the size of the code to keep in the DB - but
we already use the DB for serialized DAGs and it works fine (I believe
we only need to add compression of the JSON serialized form - as we've
learned from Airbnb during their talk at the Airflow Summit; wheels
are already compressed). Also, each task will only need one
particular "version" of one DAG, so even if we keep many of them in the
DB, old versions will pretty soon go "cold" and will never be
retrieved (and most DBs will handle that well with caching/indexes).
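
A minimal sketch of the compression part, assuming we simply wrap whatever
the serializer already produces (the function names are illustrative):

import json
import zlib

def compress_serialized_dag(dag_dict: dict) -> bytes:
    # Compress the JSON form before writing it to the metadata DB.
    return zlib.compress(json.dumps(dag_dict).encode("utf-8"), 9)

def decompress_serialized_dag(blob: bytes) -> dict:
    # Inverse operation used by the webserver/scheduler when reading it back.
    return json.loads(zlib.decompress(blob).decode("utf-8"))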

And if we want to add "callables" from other files - there is nothing
to stop the person who defines the DAG from adding a list of files that
should be packaged together with the main DAG file (additional_python_files
= ["common/my_fantastic_library.py"] in the DAG constructor). Or we could
auto-add such files after the DAG gets imported (i.e. automatically package
all files that are imported for that particular DAG from the "dags"
folder). That should be rather easy.

This way we could ship to the workers the code for the exact version that
the DagRun uses. The wheels can be cached and unpacked/installed into a
virtualenv for the execution of that single task. That should be super
quick, and such a virtualenv can be wiped out after execution.
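
On the worker side, a sketch of what "unpack/install into a virtualenv"
could mean, assuming the wheel blob has already been fetched from the DB
(the CLI invocation and the lack of caching are purely illustrative):

import pathlib
import subprocess
import tempfile

def run_task_from_wheel(wheel_blob: bytes, dag_id: str, task_id: str,
                        execution_date: str) -> None:
    # Install the DAG's wheel into a short-lived virtualenv and run a single
    # task in it. A real implementation would cache venvs per fingerprint,
    # stream logs, and pass the full task context.
    with tempfile.TemporaryDirectory() as tmp:
        tmp_path = pathlib.Path(tmp)
        wheel_path = tmp_path / "dag.whl"
        wheel_path.write_bytes(wheel_blob)

        venv_dir = tmp_path / "venv"
        # --system-site-packages so the venv reuses the worker's Airflow
        # installation and only adds the DAG code (and its extra deps).
        subprocess.run(
            ["python", "-m", "venv", "--system-site-packages", str(venv_dir)],
            check=True,
        )
        pip = venv_dir / "bin" / "pip"
        subprocess.run([str(pip), "install", str(wheel_path)], check=True)

        python = venv_dir / "bin" / "python"
        subprocess.run(
            [str(python), "-m", "airflow", "tasks", "run",
             dag_id, task_id, execution_date],
            check=True,
        )
    # The TemporaryDirectory context wipes the virtualenv after the task.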

Then we get what Max wants (atomicity of DagRuns) and what Dan wants
(the API could simply accept "wheel file + DAG id"). We have
isolation between tasks running on the same worker (based on the
virtualenv), so that each process in the same worker can run a
different version of the same DAG. We have much less confusion for the
UI.

Extra bonus 1: we can expand it to package different dependencies in
the wheels as well - so that if an operator requires a different
(newer) version of a python library, it could be packaged together
with the DAG in the same .whl file. This is also a highly requested
feature.
Extra bonus 2: workers will stop depending on the DAG file mount (!)
which was our long term goal and indeed as Dan mentioned - a great
step towards multi-tenancy.
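
For extra bonus 1, the setup.py generated in the earlier sketch could
simply declare the per-DAG requirement; all names and versions below are
made up:

from setuptools import setup

setup(
    name="airflow-dag-my_reporting_dag",
    version="0+a3f9c2e1",  # DAG fingerprint as a PEP 440 local version
    py_modules=["my_reporting_dag", "my_fantastic_library"],
    # Installed into the per-task virtualenv alongside the DAG code, so an
    # operator can use a newer library than the one on the worker image.
    install_requires=["pandas>=1.1"],
)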

J.






On Fri, Jul 31, 2020 at 6:41 AM Maxime Beauchemin
<ma...@gmail.com> wrote:
>
> Having tried it early on, I'd advocate pretty strongly against pickles and
> would rather not get too deep into the why here. Short story is they can
> pull the entire memory space or much more than you want, and it's
> impossible to reason about where they end. For that reason and other
> reasons, they're a security issue. Oh and some objects are not picklable
> (Jinja templates! to name a problematic one...). I've also seen
> secret-related classes that raise when pickled (thank god!).
>
> About callback and other things like that, it's quite a puzzle in python.
> One solution would be to point to a python namespace
> callback="preset.airflow_utils.slack_callback" and assume the function has
> to exist in the remote interpreter. Personally I like the DagFetcher idea
> (it could be great to get a pointer to that mailing list thread here),
> specifically the GitDagFetcher. I don't know how [un]reasonable it is, but
> I hate pickles so much that shipping source code around seems much more
> reasonable to me. I think out there there's a talk from Mike Star about
> Dataswarm at FB and he may mention how their workers may git shallow clone
> the pipeline repo. Or maybe they use that "beautifully ugly" hack to use
> a gitfs fuse [file system in user space] on the worker [could get deeper
> into that, not sure how reasonable that is either].
>
> About fingerprints, a simple `start_date = datetime.now() - timedelta(1)`
> may lead to a never-repeating fingerprint. From memory the spec doesn't
> list out the properties considered to build the hash. It'd be helpful to
> specify and review that list.
>
> Max
>
> On Wed, Jul 29, 2020 at 5:20 AM Kaxil Naik <ka...@gmail.com> wrote:
>
> > Thanks, both Max and Dan for your comments, please check my reply below:
> >
> >
> > >  Personally I vote for a DAG version to be pinned and consistent for the
> > > duration of the DAG run. Some of the reasons why:
> > > - it's easier to reason about, and therefore visualize and troubleshoot
> > > - it prevents some cases where dependencies are never met
> > > - it prevents the explosion of artifact/metadata (one serialization per
> > > dagrun as opposed to one per scheduler cycle) in the case of a dynamic
> > DAG
> > > whose fingerprint is never the same.
> >
> >
> > In this AIP, we were only looking to fix the current "Viewing behaviour"
> > and we were intentionally not changing the execution behaviour.
> > The change you are suggesting means we need to introduce DAG Versioning
> > for the workers too. This will need more work, as we can't use the
> > Serialised Representation to run the task since users could use custom
> > modules in different parts of the code; for example, the PythonOperator
> > has a python_callable that allows running any arbitrary code.
> > A similar case is with the *on_*_callbacks* defined on the DAG.
> >
> > Based on the current scope of the AIP, we still plan to use the actual DAG
> > files for the
> > execution and not use Serialized DAGs for the workers.
> >
> > To account for all the custom modules we will have to start looking at
> > pickle (cloudpickle).
> >
> > I'm certain that there are lots of
> > > those DAGs out there, and that it will overwhelm the metadata database,
> > and
> > > confuse the users. For an hourly DAG it would mean 24 artifacts per day
> > > instead of 1000+
> >
> >
> > What kind of dynamic DAGs are we talking about here? I would think the DAG
> > signature won't change, but I might be wrong; can you give an example,
> > please?
> >
> > If backwards compatibility in behavior is a concern, I'd recommend adding a
> > > flag to the DAG class and/or config and make sure we're doing the right
> > > thing by default. People who want backward compatibility would have to
> > > change that default. But again, that's a lot of extra and confusing
> > > complexity that will likely be the source of bugs and user confusion.
> > > Having a clear, easy to reason about execution model is super important.
> >
> > Think about visualizing a DAG that shapeshifted 5 times during its
> > > execution, how does anyone make sense of that?
> >
> >
> > Wouldn't that be an edge case? How often would someone change the DAG
> > structure in the middle of a DAG execution? And if they do change it, the
> > Graph View should show all the tasks that were run; if it just shows the
> > latest version, the behaviour would be the same as now.
> >
> > --------
> >
> > Strongly agree with Max's points, also I feel the right way to go about
> > > this is instead of Airflow schedulers/webservers/workers reading DAG
> > Python
> > > files, they would instead read from serialized representations of the
> > DAGs
> > > (e.g. json representation in the Airflow DB). Instead of DAG owners
> > pushing
> > > their DAG files to the Airflow components via varying mechanisms (e.g.
> > > git), they would instead call an Airflow CLI to push the serialized DAG
> > > representations to the DB, and for things like dynamic DAGs you could
> > > populate them from a DAG or another service.
> >
> >
> > Airflow Webserver and the Scheduler will definitely read from the
> > Serialized representation as
> > they don't need all the code from the DAG files.
> >
> > The workers, however, definitely need access to DAG files, as the
> > tasks/operators would be using code from custom modules and classes
> > which are required to run the tasks.
> >
> > If we do want to go down that route we will have to use something like
> > cloudpickle, which serializes the entire DAG file and its dependencies. We
> > would also need to ensure that someone is not able to change the pickled
> > source when it is sent from the executor to the worker, as that poses a
> > big security risk.
> >
> > - Kaxil
> >
> > On Wed, Jul 29, 2020 at 12:43 PM Jacob Ward <jw...@brandwatch.com> wrote:
> >
> > > I came here to say what Max has said, only less eloquently.
> > >
> > > I do have one concern with locking the version for a single run.
> > Currently
> > > it is possible for a user to create a dag which intentionally changes as
> > a
> > > dag executes, i.e. dynamically creating a task for the dag during a run
> > by
> > > modifying external data, but this change would prevent that. I'm of the
> > > opinion that this situation is bad practice anyway so it doesn't matter
> > if
> > > we make it impossible to do, but others may disagree.
> > >
> > > On Tue, 28 Jul 2020 at 17:08, Dan Davydov <dd...@twitter.com.invalid>
> > > wrote:
> > >
> > > > Strongly agree with Max's points, also I feel the right way to go about
> > > > this is instead of Airflow schedulers/webservers/workers reading DAG
> > > Python
> > > > files, they would instead read from serialized representations of the
> > > DAGs
> > > > (e.g. json representation in the Airflow DB). Instead of DAG owners
> > > pushing
> > > > their DAG files to the Airflow components via varying mechanisms (e.g.
> > > > git), they would instead call an Airflow CLI to push the serialized DAG
> > > > representations to the DB, and for things like dynamic DAGs you could
> > > > populate them from a DAG or another service.
> > > >
> > > > This would also enable other features like stronger
> > > security/multi-tenancy.
> > > >
> > > > On Tue, Jul 28, 2020 at 6:44 PM Maxime Beauchemin <
> > > > maximebeauchemin@gmail.com> wrote:
> > > >
> > > > > > "mixed version"
> > > > >
> > > > > Personally I vote for a DAG version to be pinned and consistent for
> > the
> > > > > duration of the DAG run. Some of the reasons why:
> > > > > - it's easier to reason about, and therefore visualize and
> > troubleshoot
> > > > > - it prevents some cases where dependencies are never met
> > > > > - it prevents the explosion of artifact/metadata (one serialization
> > per
> > > > > dagrun as opposed to one per scheduler cycle) in the case of a
> > dynamic
> > > > DAG
> > > > > whose fingerprint is never the same. I'm certain that there are lots
> > of
> > > > > those DAGs out there, and that it will overwhelm the metadata
> > database,
> > > > and
> > > > > confuse the users. For an hourly DAG is would mean 24 artifact per
> > day
> > > > > instead of 1000+
> > > > >
> > > > > If backwards compatibility in behavior is a concern, I'd recommend
> > > > adding a
> > > > > flag to the DAG class and/or config and make sure we're doing the
> > right
> > > > > thing by default. People who want backward compatibility would have
> > to
> > > > > change that default. But again, that's a lot of extra and confusing
> > > > > complexity that will likely be the source of bugs and user confusion.
> > > > > Having a clear, easy to reason about execution model is super
> > > important.
> > > > >
> > > > > Think about visualizing a DAG that shapeshifted 5 times during its
> > > > > execution, how does anyone make sense of that?
> > > > >
> > > > > Max
> > > > >
> > > > > On Tue, Jul 28, 2020 at 3:14 AM Kaxil Naik <ka...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Thanks Max for your comments.
> > > > > >
> > > > > >
> > > > > > *DAG Fingerprinting: *this can be tricky, especially in regards to
> > > > > dynamic
> > > > > > > DAGs, where in some cases each parsing of the DAG can result in a
> > > > > > different
> > > > > > > fingerprint. I think DAG and tasks attributes are left out from
> > the
> > > > > > > proposal that should be considered as part of the fingerprint,
> > like
> > > > > > trigger
> > > > > > > rules or task start/end datetime. We should do a full pass of all
> > > DAG
> > > > > > > arguments and make sure we're not forgetting anything that can
> > > change
> > > > > > > scheduling logic. Also, let's be careful that something as simple
> > > as
> > > > a
> > > > > > > dynamic start or end date on a task could lead to a different
> > > version
> > > > > > each
> > > > > > > time you parse.
> > > > > >
> > > > > >
> > > > > >
> > > > > > The short version of Dag Fingerprinting would be
> > > > > > just a hash of the Serialized DAG.
> > > > > >
> > > > > > *Example DAG*: https://imgur.com/TVuoN3p
> > > > > > *Example Serialized DAG*: https://imgur.com/LmA2Bpr
> > > > > >
> > > > > > It contains all the task & DAG parameters. When they change,
> > > Scheduler
> > > > > > writes
> > > > > > a new version of Serialized DAGs to the DB. The Webserver then
> > reads
> > > > the
> > > > > > DAGs from the DB.
> > > > > >
> > > > > > I'd recommend limiting serialization/storage of one version
> > > > > > > per DAG Run, as opposed to potentially everytime the DAG is
> > parsed
> > > -
> > > > > once
> > > > > > > the version for a DAG run is pinned, fingerprinting is not
> > > > re-evaluated
> > > > > > > until the next DAG run is ready to get created.
> > > > > >
> > > > > >
> > > > > > This is to handle Scenario 3 where a DAG structure is changed
> > > mid-way.
> > > > > > Since we don't intend to
> > > > > > change the execution behaviour, if we limit Storage of 1 version
> > per
> > > > DAG,
> > > > > > it won't actually show what
> > > > > > was run.
> > > > > >
> > > > > > Example Dag v1: Task A -> Task B -> Task C
> > > > > > The worker has completed the execution of Task A and is just about
> > > > > > to complete the execution of Task B.
> > > > > >
> > > > > > The 2nd version of DAG is deployed: Task A -> Task D
> > > > > > Now Scheduler queued Task D and it will run to completion. (Task C
> > > > won't
> > > > > > run)
> > > > > >
> > > > > > In this case, "the actual representation of the DAG" that ran is
> > > > neither
> > > > > v1
> > > > > > nor v2 but a "mixed version"
> > > > > >  (Task A -> Task B -> Task D). The plan is that the Scheduler will
> > > > create
> > > > > > this "mixed version" based on what ran
> > > > > > and the Graph View would show this "mixed version".
> > > > > >
> > > > > > There would also be a toggle button on the Graph View to select v1
> > or
> > > > v2
> > > > > > where the tasks will be highlighted to show
> > > > > > that a particular task was in v1 or v2 as shown in
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/download/attachments/158868919/Picture%201.png?version=2&modificationDate=1595612863000&api=v2
> > > > > >
> > > > > >
> > > > > >
> > > > > > *Visualizing change in the tree view:* I think this is very complex
> > > and
> > > > > > > many things can make this view impossible to render (task
> > > dependency
> > > > > > > reversal, cycles across versions, ...). Maybe a better visual
> > > > approach
> > > > > > > would be to render independent, individual tree views for each
> > DAG
> > > > > > version
> > > > > > > (side by side), and doing best effort aligning the tasks across
> > > > blocks
> > > > > > and
> > > > > > > "linking" tasks with lines across blocks when necessary.
> > > > > >
> > > > > >
> > > > > > Agreed, the plan is to do the best effort aligning.
> > > > > > At this point in time, task additions to the end of the DAG are
> > > > expected
> > > > > to
> > > > > > be compatible,
> > > > > > but changes to task structure within the DAG may cause the tree
> > view
> > > > not
> > > > > to
> > > > > > incorporate “old” and “new” in the same view, hence that won't be
> > > > shown.
> > > > > >
> > > > > > Regards,
> > > > > > Kaxil
> > > > > >
> > > > > > On Mon, Jul 27, 2020 at 6:02 PM Maxime Beauchemin <
> > > > > > maximebeauchemin@gmail.com> wrote:
> > > > > >
> > > > > > > Some notes and ideas:
> > > > > > >
> > > > > > > *DAG Fingerprinting: *this can be tricky, especially in regards
> > to
> > > > > > dynamic
> > > > > > > DAGs, where in some cases each parsing of the DAG can result in a
> > > > > > different
> > > > > > > fingerprint. I think DAG and tasks attributes are left out from
> > the
> > > > > > > proposal that should be considered as part of the fingerprint,
> > like
> > > > > > trigger
> > > > > > > rules or task start/end datetime. We should do a full pass of all
> > > DAG
> > > > > > > arguments and make sure we're not forgetting anything that can
> > > change
> > > > > > > scheduling logic. Also, let's be careful that something as simple
> > > as
> > > > a
> > > > > > > dynamic start or end date on a task could lead to a different
> > > version
> > > > > > each
> > > > > > > time you parse. I'd recommend limiting serialization/storage of
> > one
> > > > > > version
> > > > > > > per DAG Run, as opposed to potentially everytime the DAG is
> > parsed
> > > -
> > > > > once
> > > > > > > the version for a DAG run is pinned, fingerprinting is not
> > > > re-evaluated
> > > > > > > until the next DAG run is ready to get created.
> > > > > > >
> > > > > > > *Visualizing change in the tree view:* I think this is very
> > complex
> > > > and
> > > > > > > many things can make this view impossible to render (task
> > > dependency
> > > > > > > reversal, cycles across versions, ...). Maybe a better visual
> > > > approach
> > > > > > > would be to render independent, individual tree views for each
> > DAG
> > > > > > version
> > > > > > > (side by side), and doing best effort aligning the tasks across
> > > > blocks
> > > > > > and
> > > > > > > "linking" tasks with lines across blocks when necessary.
> > > > > > >
> > > > > > > On Fri, Jul 24, 2020 at 12:46 PM Vikram Koka <
> > vikram@astronomer.io
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Team,
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > We just created 'AIP-36 DAG Versioning' on Confluence and would
> > > > very
> > > > > > much
> > > > > > > > appreciate feedback and suggestions from the community.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-36+DAG+Versioning
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > The DAG Versioning concept has been discussed on multiple
> > > occasions
> > > > > in
> > > > > > > the
> > > > > > > > past and has been a topic highlighted as part of Airflow 2.0 as
> > > > well.
> > > > > > We
> > > > > > > at
> > > > > > > > Astronomer have heard data engineers at several enterprises ask
> > > > about
> > > > > > > this
> > > > > > > > feature as well, for easier debugging when changes are made to
> > > DAGs
> > > > > as
> > > > > > a
> > > > > > > > result of evolving business needs.
> > > > > > > >
> > > > > > > >
> > > > > > > > As described in the AIP, we have a proposal focused on ensuring
> > > > that
> > > > > > the
> > > > > > > > visibility behaviour of Airflow is correct, without changing
> > the
> > > > > > > execution
> > > > > > > > behaviour. We considered changing the execution behaviour as
> > > well,
> > > > > but
> > > > > > > > decided that the risks in changing execution behavior were too
> > > high
> > > > > as
> > > > > > > > compared to the benefits and therefore decided to limit the
> > scope
> > > > to
> > > > > > only
> > > > > > > > making sure that the visibility was correct.
> > > > > > > >
> > > > > > > >
> > > > > > > > We would like to attempt this based on our experience running
> > > > Airflow
> > > > > > as
> > > > > > > a
> > > > > > > > service. We believe that this benefits Airflow as a project and
> > > the
> > > > > > > > development experience of data engineers using Airflow across
> > the
> > > > > > world.
> > > > > > > >
> > > > > > > >
> > > > > > > >  Any feedback, suggestions, and comments would be greatly
> > > > > appreciated.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > Best Regards,
> > > > > > > >
> > > > > > > >
> > > > > > > > Kaxil Naik, Ryan Hamilton, Ash Berlin-Taylor, and Vikram Koka
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Jacob Ward    |    Graduate Data Infrastructure Engineer
> > >
> > > jward@brandwatch.com
> > >
> > >
> > > NEW YORK   | BOSTON   | BRIGHTON   | LONDON   | BERLIN |   STUTTGART |
> > > PARIS   | SINGAPORE | SYDNEY
> > >
> >



--

Jarek Potiuk
Polidea | Principal Software Engineer

M: +48 660 796 129

Re: [PROPOSAL][AIP-36 DAG Versioning]

Posted by Jacob Ferriero <jf...@google.com.INVALID>.
I like the idea of wheels.

> There were concerns about the size of the code to keep in the DB
I think this may still be a valid concern, and JSON serialization may not be
comparable in size to a compressed wheel if "additional packages" are many or
large.
If DAG Version = Wheel we could consider having a package index internal to
airflow with a package per DAG id.
The DB could just reference a version of the package and the worker pulls
that version.
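
A rough sketch of what such a reference could look like in the metadata DB;
the table, columns and model are illustrative (SQLAlchemy 1.4+), not an
existing Airflow schema:

from sqlalchemy import Column, DateTime, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class DagVersionRef(Base):
    # Pointer to an immutable wheel kept in an external, append-only
    # package index, rather than storing the wheel bytes in this DB.
    __tablename__ = "dag_version_ref"

    dag_id = Column(String(250), primary_key=True)
    version = Column(String(64), primary_key=True)      # DAG fingerprint
    package_uri = Column(String(500), nullable=False)   # location of the .whl
    created_at = Column(DateTime, nullable=False)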

It seems to me that there is a danger in storing these in the DB, which every
airflow process running user code has direct access to and could wreak havoc
on. DAG Versions as packages feel like an opportunity to have a separate
storage layer where we enforce immutability of Dag Versions (append / read
only access).

This may be important for those interested in using the Airflow Versions as
an audit trail of "what [user] code got run on my airflow".
Imagine the corner case of a bad actor who writes a one-off DAG to bootstrap
the airflow cluster's credentials and then deletes itself entirely from the
DB at the end of execution, or replaces another DAG's wheel.
By storing DAGs in a package index we'd have some evidence of this
bad-acting DAG stored in the package index and avoid one DAG mutating
another DAG's DAG Version wheel.

> Extra bonus 1: we can expand it to package different dependencies in
> the wheels as well - so that if an operator requires a different
> (newer) version of a python library, it could be packaged together
> with the DAG in the same .whl file. This is also a highly requested
> feature.
Huge +1 to Extra bonus 1. This is a very common pain point for Apache
Beam Python SDK users with the DataflowPythonOperator.



-- 

*Jacob Ferriero*

Strategic Cloud Engineer: Data Engineering

jferriero@google.com

617-714-2509

Re: [PROPOSAL][AIP-36 DAG Versioning]

Posted by Jarek Potiuk <ja...@potiuk.com>.
I think the Airflow Summit and some 2.3.0 teething problems have
(un)successfully :) dragged most of the committers away from a few AIPs, but
I believe there will shortly be a real "reinvigoration" of some work there
(speaking for myself though :)).

On Fri, May 27, 2022 at 3:28 AM Max Payton <mp...@lyft.com.invalid> wrote:

> Hey, I was wondering if the resurrected AIP was ever published? This is
> something that we (Lyft) are very interested in, and would like to
> contribute to as well.
> *Max Payton*
> He/Him/His
> Software Engineer
> 202.441.7757 <+12024417757>
> [image: Lyft] <http://www.lyft.com/>
>
>
> On Tue, Feb 15, 2022 at 4:23 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> Woohoo! Looking forward to it!
>>
>> On Tue, Feb 15, 2022 at 1:11 PM Kaxil Naik <ka...@gmail.com> wrote:
>> >
>> > Hey folks,
>> >
>> > Just reviving this old thread to provide an update that we (Astronomer)
>> will be resurrecting AIP-36 DAG Versioning with a different scope in the
>> coming days that will be more consistent with what has been discussed in
>> this thread.
>> >
>> > Regards,
>> > Kaxil
>> >
>> > On Thu, Aug 13, 2020 at 9:32 PM Jarek Potiuk <Ja...@polidea.com>
>> wrote:
>> >>
>> >> I fully agree with the "user" not having to know any of the "wheel"
>> >> details, just as they do not have to know the Python interpreter or the
>> >> underlying libc library details. This should all be hidden from the
>> >> users.
>> >>
>> >> I think the wheels API that we might have there does not have to be
>> >> user-facing. We could - rather easily - make a client that points to a DAG
>> >> file, builds the appropriate wheel package under the hood and submits it. I
>> >> really doubt any of the users will directly use the API to submit DAGs -
>> >> they will use some clients built on top of it.
>> >>
>> >> I think we should separate the user side from the implementation -
>> >> similarly, we do not expect the users to know any details of how the "DAG
>> >> Fetcher" should work. In any case, with the DAG fetcher we need to define
>> >> how the DAG fetcher will ensure "atomicity" anyway - how do we make sure
>> >> that you get a "consistent" version of all the dependent python files when
>> >> you fetch them? This is the part of the DAG fetcher that I do not like,
>> >> because it assumes that "someone else" maintains the consistency and
>> >> provides the "consistent view" somewhere on the "DAG Server" side
>> >> (whatever that server side is).
>> >>
>> >> There were many ideas about some kind of manifest describing the files
>> >> etc., but I think all of that depends on some ability to provide a
>> >> "snapshot" of files that will be a consistent set to execute. With the
>> >> "DAG Fetcher" this is something that the "DAG Fetching server" has to
>> >> provide. It's super easy if that "server" is Git - we already use it for
>> >> Git sync. But it's rather difficult to provide a good abstraction for it
>> >> for a "generic" DAG fetcher.
>> >>
>> >> IMHO it is far easier to provide such a consistent set at "submission
>> >> time". In pretty much all cases, the user submitting the job already has a
>> >> consistent set of python files that the DAG uses. This is pretty much a
>> >> given. I think the job of the "submission" mechanism is to make a
>> >> "snapshot" out of that consistent set and submit this snapshot, rather
>> >> than individual files. Git provides it out of the box, but if we want to
>> >> be generic - I see no other way than to build such a "snapshot" locally.
>> >> And wheels seem like a very good candidate - as long as it's an
>> >> implementation detail and will be hidden from the users.
>> >>
>> >> J.
>> >>
>> >>
>> >>
>> >>
>> >> On Tue, Aug 11, 2020 at 8:33 PM Ash Berlin-Taylor <as...@apache.org>
>> wrote:
>> >>
>> >> > Anything to do with the process of building wheels should be a "power
>> >> > user" only feature, and should not be required for many users - many,
>> >> > many users of airflow are not primarily Python developers, but data
>> >> > scientists, and needing them to understand anything about the python
>> >> > build toolchain is too much of a learning curve for the benefit.
>> >> >
>> >> > After all it is very rare that people hit the multiple concurrent
>> versions
>> >> > of a dag.
>> >> >
>> >> > -ash
>> >> >
>> >> > On 10 August 2020 17:37:32 BST, Tomasz Urbaszek <
>> turbaszek@apache.org>
>> >> > wrote:
>> >> > >I like the idea of wheels as this is probably the "most pythonic"
>> >> > >solution. And "DAG version" is not only defined by DAG code but also
>> >> > >by all dependencies the DAG uses (custom functions, libraries etc)
>> and
>> >> > >it seems that wheels can address that.
>> >> > >
>> >> > >However, I second Ash - keeping wheels in the db doesn't sound good. In
>> >> > >my opinion, DAG fetcher is the right solution and the idea surfaces
>> >> > >every time we talk about serialization. This abstraction has a lot of
>> >> > >pros as it allows a lot of customization (wheels, local fs, remote fs,
>> >> > >etc.).
>> >> > >
>> >> > >Apart from that, if we decided to use wheels we should provide a CLI
>> >> > >command to ease the process of building them. Also, I'm wondering
>> >> > >about developers' workflow. Moving between the code of different DAG
>> >> > >versions sounds easy if you use git, but... what if someone doesn't
>> >> > >use it?
>> >> > >
>> >> > >Tomek
>> >> > >
>> >> > >
>> >> > >On Sat, Aug 8, 2020 at 9:49 AM Ash Berlin-Taylor <as...@apache.org>
>> >> > >wrote:
>> >> > >>
>> >> > >> Quick comment (as I'm still mostly on paternity leave):
>> >> > >>
>> >> > >> Storing wheels in the db sounds like a bad idea to me, especially if
>> >> > >> we need to store deps in there too (and if we don't store deps, then
>> >> > >> they are incomplete) - they could get very large, and I've stored
>> >> > >> blobs of ~10mb in postgres before: I don't recommend it. It "works"
>> >> > >> but operating it is tricky.
>> >> > >>
>> >> > >>
>> >> > >>
>> >> > >> > the API could simply accept "Wheel file + the Dag id"
>> >> > >>
>> >> > >> This sounds like a huge security risk.
>> >> > >>
>> >> > >>
>> >> > >> My main concern with this idea is that it seems like a lot of
>> >> > >> complexity we are putting on users. Doubly so if they are already
>> >> > >> using docker, where there already exists an ideal packaging and
>> >> > >> distribution that could contain the dag + needed code.
>> >> > >>
>> >> > >> (Sorry for the brevity)
>> >> > >>
>> >> > >> -ash
>> >> > >>
>> >> > >>
>> >> > >> On 2 August 2020 08:47:39 BST, Jarek Potiuk
>> >> > ><Ja...@polidea.com> wrote:
>> >> > >> >Few points from my sid (and proposal!):
>> >> > >> >
>> >> > >> >1) Agree with Max -  with a rather strong NO for pickles
>> (however,
>> >> > >> >indeed cloudpickle solves some of the problems). Pickles came up
>> in
>> >> > >> >our discussion in Polidea recently and the overall message was
>> "no".
>> >> > >I
>> >> > >> >agree with Max here - if we can ship python code, turning that
>> into
>> >> > >> >pickle for transit makes little sense to me and brings a
>> plethora of
>> >> > >> >problems.
>> >> > >> >
>> >> > >> >2) I think indeed the versioning solution should treat the
>> "DagRun"
>> >> > >> >structure atomically. While I see why we would like to go with
>> the
>> >> > >> >UI/Scheduler only first rather than implementing them in the
>> >> > >workers,
>> >> > >> >adding the "mixed version" is where it breaks down IMHO.
>> Reasoning
>> >> > >> >about such "mixed version" dag is next to impossible. The current
>> >> > >> >behavior is not well defined and non-deterministic (depends on
>> >> > >> >scheduler delays, syncing, type of deployment, restarts of the
>> works
>> >> > >> >etc.) we are moving it up to UI (thus users) rather than solving
>> the
>> >> > >> >problem. So I am not a big fan of this and would rather solve it
>> >> > >> >"well" with atomicity.
>> >> > >> >
>> >> > >> >3) I see the point of Dan as well - we had many discussions and
>> many
>> >> > >> >times the idea about "submitting" the DAG for execution via the
>> API
>> >> > >> >came up - and it makes sense IMHO.
>> >> > >> >
>> >> > >> >Proposal: Implement full versioning with code shipping via DB
>> wheels
>> >> > >> >BLOB (akin to serialized DAGs).
>> >> > >> >
>> >> > >> >I understand that the big issue is how to actually "ship" the
>> code
>> >> > >to
>> >> > >> >the worker. And - maybe a wild idea - we can kill several birds
>> with
>> >> > >> >the same stone.
>> >> > >> >
>> >> > >> >There were plenty of discussions on how we could do that but one
>> was
>> >> > >> >never truly explored - using wheel packages.
>> >> > >> >
>> >> > >> >For those who do not know them, there is the PEP:
>> >> > >> >https://www.python.org/dev/peps/pep-0427/
>> >> > >> >
>> >> > >> >Wheels allow to "package" python code in a standard way. They are
>> >> > >> >portable ("purelib" + contain .py rather than .pyc code), they
>> have
>> >> > >> >metadata, versioning information, they can be signed for
>> security,
>> >> > >> >They can contain other packages or python code, Why don't we let
>> >> > >> >scheduler to pack the fingerprinted version of the DAG in a .whl
>> and
>> >> > >> >store it as a blob in a DB next to the serialized form?
>> >> > >> >
>> >> > >> >There were concerns about the size of the code to keep in the DB
>> -
>> >> > >but
>> >> > >> >we already use the DB for serialized DAGs and it works fine (I
>> >> > >believe
>> >> > >> >we only need to add compressing of the JSon serialized form - as
>> >> > >we've
>> >> > >> >learned from AirBnb during their talk at the Airflow Summit -
>> wheels
>> >> > >> >are already compressed). Also - each task will only need the
>> >> > >> >particular "version" of one DAG so even if we keep many of them
>> in
>> >> > >the
>> >> > >> >DB, the old version will pretty soon go "cold" and will never be
>> >> > >> >retrieved (and most DBs will handle it well with
>> caching/indexes).
>> >> > >> >
>> >> > >> >And if we want to add "callables" from other files - there is
>> >> > >nothing
>> >> > >> >to stop the person who defines dag to add list of files that
>> should
>> >> > >be
>> >> > >> >packaged together with the main DAG file
>> (additional_python_files =
>> >> > >> >["common/my_fantastic_library.py"] in DAG constructor). Or we
>> could
>> >> > >> >auto-add all files after the DAG gets imported (i.e. package
>> >> > >> >automatically all files that are imported for that particular DAG
>> >> > >from
>> >> > >> >the "dags" folder"). That should be rather easy.
>> >> > >> >
>> >> > >> >This way we could ship the code to workers for the exact version
>> >> > >that
>> >> > >> >the DagRun uses. And they can be cached and unpacked/installed
>> to a
>> >> > >> >virtualenv for the execution of that single task. That should be
>> >> > >super
>> >> > >> >quick. Such virtualenv can be wiped out after execution.
>> >> > >> >
>> >> > >> >Then we got what Max wants (atomicity of DagRuns) and what Dan
>> wants
>> >> > >> >(the API could simply accept "Wheel file + the Dag id". We have
>> the
>> >> > >> >isolation between tasks running on the same worker (based on
>> >> > >> >virtualenv) so that each process in the same worker can run a
>> >> > >> >different version of the same Dag. We have much less confusion
>> for
>> >> > >the
>> >> > >> >UI.
>> >> > >> >
>> >> > >> >Extra bonus 1: we can expand it to package different
>> dependencies in
>> >> > >> >the wheels as well - so that if an operator requires a different
>> >> > >> >(newer) version of a python library, it could be packaged
>> together
>> >> > >> >with the DAG in the same .whl file. This is also a highly
>> requested
>> >> > >> >feature.
>> >> > >> >Extra bonus 2: workers will stop depending on the DAG file mount
>> (!)
>> >> > >> >which was our long term goal and indeed as Dan mentioned - a
>> great
>> >> > >> >step towards multi-tenancy.
>> >> > >> >
>> >> > >> >J.
>> >> > >> >
>> >> > >> >
>> >> > >> >
>> >> > >> >
>> >> > >> >
>> >> > >> >
>> >> > >> >On Fri, Jul 31, 2020 at 6:41 AM Maxime Beauchemin
>> >> > >> ><ma...@gmail.com> wrote:
>> >> > >> >>
>> >> > >> >> Having tried it early on, I'd advocate pretty strongly against
>> >> > >> >> pickles and would rather not get too deep into the why here.
>> >> > >> >> Short story is they can pull the entire memory space or much more
>> >> > >> >> than you want, and it's impossible to reason about where they
>> >> > >> >> end. For that reason and other reasons, they're a security issue.
>> >> > >> >> Oh and some objects are not picklable (Jinja templates! to name a
>> >> > >> >> problematic one...). I've also seen secret-related classes that
>> >> > >> >> raise when pickled (thank god!).
>> >> > >> >>
>> >> > >> >> About callback and other things like that, it's quite a puzzle
>> >> > >> >> in python. One solution would be to point to a python namespace
>> >> > >> >> callback="preset.airflow_utils.slack_callback" and assume the
>> >> > >> >> function has to exist in the remote interpreter. Personally I
>> >> > >> >> like the DagFetcher idea (it could be great to get a pointer to
>> >> > >> >> that mailing list thread here), specifically the GitDagFetcher.
>> >> > >> >> I don't know how [un]reasonable it is, but I hate pickles so much
>> >> > >> >> that shipping source code around seems much more reasonable to
>> >> > >> >> me. I think there's a talk out there from Mike Star about
>> >> > >> >> Dataswarm at FB and he may mention how their workers may git
>> >> > >> >> shallow clone the pipeline repo. Or maybe they use that
>> >> > >> >> "beautifully ugly" hack to use a gitfs fuse [file system in user
>> >> > >> >> space] on the worker [could get deeper into that, not sure how
>> >> > >> >> reasonable that is either].
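>> >> > >> >>
>> >> > >> >> To make the namespace-pointer idea concrete, a minimal, purely
>> >> > >> >> illustrative sketch of resolving such a callback on the worker
>> >> > >> >> side (the helper name is made up, not part of any AIP; the dotted
>> >> > >> >> path is just the example above):
>> >> > >> >>
>> >> > >> >>     from importlib import import_module
>> >> > >> >>
>> >> > >> >>     def resolve_callback(dotted_path):
>> >> > >> >>         # "package.module.function" -> callable in this interpreter.
>> >> > >> >>         # Assumes the module is importable on the worker; raises
>> >> > >> >>         # ImportError/AttributeError otherwise.
>> >> > >> >>         module_path, _, attr = dotted_path.rpartition(".")
>> >> > >> >>         return getattr(import_module(module_path), attr)
>> >> > >> >>
>> >> > >> >>     # callback = resolve_callback("preset.airflow_utils.slack_callback")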
>> >> > >> >>
>> >> > >> >> About fingerprints, a simple `start_date = datetime.now() -
>> >> > >> >> timedelta(1)` may lead to a never-repeating fingerprint. From
>> >> > >> >> memory the spec doesn't list out the properties considered to
>> >> > >> >> build the hash. It'd be helpful to specify and review that list.
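>> >> > >> >>
>> >> > >> >> A tiny illustration of that failure mode, assuming (as suggested
>> >> > >> >> elsewhere in this thread) that the fingerprint is just a hash of
>> >> > >> >> the serialized DAG attributes - the helper below is hypothetical:
>> >> > >> >>
>> >> > >> >>     import hashlib, json
>> >> > >> >>     from datetime import datetime, timedelta
>> >> > >> >>
>> >> > >> >>     def fingerprint(serialized_dag):
>> >> > >> >>         # Hash of the canonical JSON form of the serialized DAG.
>> >> > >> >>         payload = json.dumps(serialized_dag, sort_keys=True, default=str)
>> >> > >> >>         return hashlib.sha256(payload.encode()).hexdigest()
>> >> > >> >>
>> >> > >> >>     # The dynamic start_date is re-evaluated on every parse...
>> >> > >> >>     parse_1 = {"dag_id": "x", "start_date": datetime.now() - timedelta(1)}
>> >> > >> >>     parse_2 = {"dag_id": "x", "start_date": datetime.now() - timedelta(1)}
>> >> > >> >>     # ...so the "same" DAG almost always gets a new fingerprint.
>> >> > >> >>     print(fingerprint(parse_1) == fingerprint(parse_2))  # False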
>> >> > >> >>
>> >> > >> >> Max
>> >> > >> >>
>> >> > >> >> On Wed, Jul 29, 2020 at 5:20 AM Kaxil Naik <
>> kaxilnaik@gmail.com>
>> >> > >> >wrote:
>> >> > >> >>
>> >> > >> >> > Thanks, both Max and Dan for your comments, please check my
>> >> > >reply
>> >> > >> >below:
>> >> > >> >> >
>> >> > >> >> >
>> >> > >> >> > >  Personally I vote for a DAG version to be pinned and
>> >> > >consistent
>> >> > >> >for the
>> >> > >> >> > > duration of the DAG run. Some of the reasons why:
>> >> > >> >> > > - it's easier to reason about, and therefore visualize and
>> >> > >> >troubleshoot
>> >> > >> >> > > - it prevents some cases where dependencies are never met
>> >> > >> >> > > - it prevents the explosion of artifact/metadata (one
>> >> > >> >serialization per
>> >> > >> >> > > dagrun as opposed to one per scheduler cycle) in the case
>> of a
>> >> > >> >dynamic
>> >> > >> >> > DAG
>> >> > >> >> > > whose fingerprint is never the same.
>> >> > >> >> >
>> >> > >> >> >
>> >> > >> >> > In this AIP, we were only looking to fix the current "Viewing
>> >> > >> >behaviour"
>> >> > >> >> > and
>> >> > >> >> > we were intentionally not changing the execution behaviour.
>> >> > >> >> > The change you are suggesting means we need to introduce DAG
>> >> > >> >> > Versioning for the workers too. This will need more work as we
>> >> > >> >> > can't use the Serialised Representation to run the task, since
>> >> > >> >> > users could use custom modules in a different part of the code;
>> >> > >> >> > for example, the PythonOperator has a python_callable that
>> >> > >> >> > allows running arbitrary code.
>> >> > >> >> > A similar case is with the *on_*_callbacks* defined on DAG.
>> >> > >> >> >
>> >> > >> >> > Based on the current scope of the AIP, we still plan to use
>> the
>> >> > >> >actual DAG
>> >> > >> >> > files for the
>> >> > >> >> > execution and not use Serialized DAGs for the workers.
>> >> > >> >> >
>> >> > >> >> > To account for all the custom modules we will have to start
>> >> > >looking
>> >> > >> >at
>> >> > >> >> > pickle (cloudpickle).
>> >> > >> >> >
>> >> > >> >> > I'm certain that there are lots of
>> >> > >> >> > > those DAGs out there, and that it will overwhelm the
>> metadata
>> >> > >> >database,
>> >> > >> >> > and
>> >> > >> >> > > confuse the users. For an hourly DAG is would mean 24
>> artifact
>> >> > >> >per day
>> >> > >> >> > > instead of 1000+
>> >> > >> >> >
>> >> > >> >> >
>> >> > >> >> > What kind of dynamic DAGs are we talking about here? I would
>> >> > >> >> > think the DAG signature won't change, but I might be wrong -
>> >> > >> >> > can you give an example, please?
>> >> > >> >> >
>> >> > >> >> > If backwards compatibility in behavior is a concern, I'd
>> >> > >recommend
>> >> > >> >adding a
>> >> > >> >> > > flag to the DAG class and/or config and make sure we're
>> doing
>> >> > >the
>> >> > >> >right
>> >> > >> >> > > thing by default. People who want backward compatibility
>> would
>> >> > >> >have to
>> >> > >> >> > > change that default. But again, that's a lot of extra and
>> >> > >> >confusing
>> >> > >> >> > > complexity that will likely be the source of bugs and user
>> >> > >> >confusion.
>> >> > >> >> > > Having a clear, easy to reason about execution model is
>> super
>> >> > >> >important.
>> >> > >> >> >
>> >> > >> >> > Think about visualizing a DAG that shapeshifted 5 times
>> during
>> >> > >its
>> >> > >> >> > > execution, how does anyone make sense of that?
>> >> > >> >> >
>> >> > >> >> >
>> >> > >> >> > Wouldn't that be an edge case? How often would someone change
>> >> > >> >> > the DAG structure in the middle of a DAG execution? And if they
>> >> > >> >> > do change it, the Graph View should show all the tasks that
>> >> > >> >> > were run; if it just shows based on the latest version, the
>> >> > >> >> > behaviour would be the same as now.
>> >> > >> >> >
>> >> > >> >> > --------
>> >> > >> >> >
>> >> > >> >> > Strongly agree with Max's points, also I feel the right way
>> to
>> >> > >go
>> >> > >> >about
>> >> > >> >> > > this is instead of Airflow schedulers/webservers/workers
>> >> > >reading
>> >> > >> >DAG
>> >> > >> >> > Python
>> >> > >> >> > > files, they would instead read from serialized
>> representations
>> >> > >of
>> >> > >> >the
>> >> > >> >> > DAGs
>> >> > >> >> > > (e.g. json representation in the Airflow DB). Instead of
>> DAG
>> >> > >> >owners
>> >> > >> >> > pushing
>> >> > >> >> > > their DAG files to the Airflow components via varying
>> >> > >mechanisms
>> >> > >> >(e.g.
>> >> > >> >> > > git), they would instead call an Airflow CLI to push the
>> >> > >> >serialized DAG
>> >> > >> >> > > representations to the DB, and for things like dynamic DAGs
>> >> > >you
>> >> > >> >could
>> >> > >> >> > > populate them from a DAG or another service.
>> >> > >> >> >
>> >> > >> >> >
>> >> > >> >> > Airflow Webserver and the Scheduler will definitely read from
>> >> > >the
>> >> > >> >> > Serialized representation as
>> >> > >> >> > they don't need all the code from the DAG files.
>> >> > >> >> >
>> >> > >> >> > While the workers definitely need access to the DAG files, as
>> >> > >> >> > the tasks/operators would be using code from custom modules and
>> >> > >> >> > classes which are required to run the tasks.
>> >> > >> >> >
>> >> > >> >> > If we do want to go down that route we will have to use
>> >> > >something
>> >> > >> >like
>> >> > >> >> > cloudpickle that serializes
>> >> > >> >> > entire DAG file and their dependencies. And also ensure that
>> >> > >> >someone is not
>> >> > >> >> > able to change the pickled
>> >> > >> >> > source when sending from executor to the worker as that
>> poses a
>> >> > >big
>> >> > >> >> > security risk.
>> >> > >> >> >
>> >> > >> >> > - Kaxil
>> >> > >> >> >
>> >> > >> >> > On Wed, Jul 29, 2020 at 12:43 PM Jacob Ward
>> >> > ><jw...@brandwatch.com>
>> >> > >> >wrote:
>> >> > >> >> >
>> >> > >> >> > > I came here to say what Max has said, only less eloquently.
>> >> > >> >> > >
>> >> > >> >> > > I do have one concern with locking the version for a single
>> >> > >run.
>> >> > >> >> > Currently
>> >> > >> >> > > it is possible for a user to create a dag which
>> intentionally
>> >> > >> >changes as
>> >> > >> >> > a
>> >> > >> >> > > dag executes, i.e. dynamically creating a task for the dag
>> >> > >during
>> >> > >> >a run
>> >> > >> >> > by
>> >> > >> >> > > modifying external data, but this change would prevent
>> that.
>> >> > >I'm
>> >> > >> >of the
>> >> > >> >> > > opinion that this situation is bad practice anyway so it
>> >> > >doesn't
>> >> > >> >matter
>> >> > >> >> > if
>> >> > >> >> > > we make it impossible to do, but others may disagree.
>> >> > >> >> > >
>> >> > >> >> > > On Tue, 28 Jul 2020 at 17:08, Dan Davydov
>> >> > >> ><dd...@twitter.com.invalid>
>> >> > >> >> > > wrote:
>> >> > >> >> > >
>> >> > >> >> > > > Strongly agree with Max's points, also I feel the right
>> way
>> >> > >to
>> >> > >> >go about
>> >> > >> >> > > > this is instead of Airflow schedulers/webservers/workers
>> >> > >> >reading DAG
>> >> > >> >> > > Python
>> >> > >> >> > > > files, they would instead read from serialized
>> >> > >representations
>> >> > >> >of the
>> >> > >> >> > > DAGs
>> >> > >> >> > > > (e.g. json representation in the Airflow DB). Instead of
>> DAG
>> >> > >> >owners
>> >> > >> >> > > pushing
>> >> > >> >> > > > their DAG files to the Airflow components via varying
>> >> > >> >mechanisms (e.g.
>> >> > >> >> > > > git), they would instead call an Airflow CLI to push the
>> >> > >> >serialized DAG
>> >> > >> >> > > > representations to the DB, and for things like dynamic
>> DAGs
>> >> > >you
>> >> > >> >could
>> >> > >> >> > > > populate them from a DAG or another service.
>> >> > >> >> > > >
>> >> > >> >> > > > This would also enable other features like stronger
>> >> > >> >> > > security/multi-tenancy.
>> >> > >> >> > > >
>> >> > >> >> > > > On Tue, Jul 28, 2020 at 6:44 PM Maxime Beauchemin <
>> >> > >> >> > > > maximebeauchemin@gmail.com> wrote:
>> >> > >> >> > > >
>> >> > >> >> > > > > > "mixed version"
>> >> > >> >> > > > >
>> >> > >> >> > > > > Personally I vote for a DAG version to be pinned and
>> >> > >> >consistent for
>> >> > >> >> > the
>> >> > >> >> > > > > duration of the DAG run. Some of the reasons why:
>> >> > >> >> > > > > - it's easier to reason about, and therefore visualize
>> and
>> >> > >> >> > troubleshoot
>> >> > >> >> > > > > - it prevents some cases where dependencies are never
>> met
>> >> > >> >> > > > > - it prevents the explosion of artifact/metadata (one
>> >> > >> >serialization
>> >> > >> >> > per
>> >> > >> >> > > > > dagrun as opposed to one per scheduler cycle) in the
>> case
>> >> > >of
>> >> > >> >a
>> >> > >> >> > dynamic
>> >> > >> >> > > > DAG
>> >> > >> >> > > > > whose fingerprint is never the same. I'm certain that
>> >> > >there
>> >> > >> >are lots
>> >> > >> >> > of
>> >> > >> >> > > > > those DAGs out there, and that it will overwhelm the
>> >> > >metadata
>> >> > >> >> > database,
>> >> > >> >> > > > and
>> >> > >> >> > > > > confuse the users. For an hourly DAG is would mean 24
>> >> > >> >artifact per
>> >> > >> >> > day
>> >> > >> >> > > > > instead of 1000+
>> >> > >> >> > > > >
>> >> > >> >> > > > > If backwards compatibility in behavior is a concern,
>> I'd
>> >> > >> >recommend
>> >> > >> >> > > > adding a
>> >> > >> >> > > > > flag to the DAG class and/or config and make sure we're
>> >> > >doing
>> >> > >> >the
>> >> > >> >> > right
>> >> > >> >> > > > > thing by default. People who want backward
>> compatibility
>> >> > >> >would have
>> >> > >> >> > to
>> >> > >> >> > > > > change that default. But again, that's a lot of extra
>> and
>> >> > >> >confusing
>> >> > >> >> > > > > complexity that will likely be the source of bugs and
>> user
>> >> > >> >confusion.
>> >> > >> >> > > > > Having a clear, easy to reason about execution model is
>> >> > >super
>> >> > >> >> > > important.
>> >> > >> >> > > > >
>> >> > >> >> > > > > Think about visualizing a DAG that shapeshifted 5 times
>> >> > >> >during its
>> >> > >> >> > > > > execution, how does anyone make sense of that?
>> >> > >> >> > > > >
>> >> > >> >> > > > > Max
>> >> > >> >> > > > >
>> >> > >> >> > > > > On Tue, Jul 28, 2020 at 3:14 AM Kaxil Naik
>> >> > >> ><ka...@gmail.com>
>> >> > >> >> > > wrote:
>> >> > >> >> > > > >
>> >> > >> >> > > > > > Thanks Max for your comments.
>> >> > >> >> > > > > >
>> >> > >> >> > > > > >
>> >> > >> >> > > > > > *DAG Fingerprinting: *this can be tricky, especially
>> in
>> >> > >> >regards to
>> >> > >> >> > > > > dynamic
>> >> > >> >> > > > > > > DAGs, where in some cases each parsing of the DAG
>> can
>> >> > >> >result in a
>> >> > >> >> > > > > > different
>> >> > >> >> > > > > > > fingerprint. I think DAG and tasks attributes are
>> left
>> >> > >> >out from
>> >> > >> >> > the
>> >> > >> >> > > > > > > proposal that should be considered as part of the
>> >> > >> >fingerprint,
>> >> > >> >> > like
>> >> > >> >> > > > > > trigger
>> >> > >> >> > > > > > > rules or task start/end datetime. We should do a
>> full
>> >> > >> >pass of all
>> >> > >> >> > > DAG
>> >> > >> >> > > > > > > arguments and make sure we're not forgetting
>> anything
>> >> > >> >that can
>> >> > >> >> > > change
>> >> > >> >> > > > > > > scheduling logic. Also, let's be careful that
>> >> > >something
>> >> > >> >as simple
>> >> > >> >> > > as
>> >> > >> >> > > > a
>> >> > >> >> > > > > > > dynamic start or end date on a task could lead to a
>> >> > >> >different
>> >> > >> >> > > version
>> >> > >> >> > > > > > each
>> >> > >> >> > > > > > > time you parse.
>> >> > >> >> > > > > >
>> >> > >> >> > > > > >
>> >> > >> >> > > > > >
>> >> > >> >> > > > > > The short version of Dag Fingerprinting would be
>> >> > >> >> > > > > > just a hash of the Serialized DAG.
>> >> > >> >> > > > > >
>> >> > >> >> > > > > > *Example DAG*: https://imgur.com/TVuoN3p
>> >> > >> >> > > > > > *Example Serialized DAG*: https://imgur.com/LmA2Bpr
>> >> > >> >> > > > > >
>> >> > >> >> > > > > > It contains all the task & DAG parameters. When they
>> >> > >> >change,
>> >> > >> >> > > Scheduler
>> >> > >> >> > > > > > writes
>> >> > >> >> > > > > > a new version of Serialized DAGs to the DB. The
>> >> > >Webserver
>> >> > >> >then
>> >> > >> >> > reads
>> >> > >> >> > > > the
>> >> > >> >> > > > > > DAGs from the DB.
>> >> > >> >> > > > > >
>> >> > >> >> > > > > > I'd recommend limiting serialization/storage of one
>> >> > >version
>> >> > >> >> > > > > > > per DAG Run, as opposed to potentially everytime
>> the
>> >> > >DAG
>> >> > >> >is
>> >> > >> >> > parsed
>> >> > >> >> > > -
>> >> > >> >> > > > > once
>> >> > >> >> > > > > > > the version for a DAG run is pinned,
>> fingerprinting is
>> >> > >> >not
>> >> > >> >> > > > re-evaluated
>> >> > >> >> > > > > > > until the next DAG run is ready to get created.
>> >> > >> >> > > > > >
>> >> > >> >> > > > > >
>> >> > >> >> > > > > > This is to handle Scenario 3 where a DAG structure is
>> >> > >> >changed
>> >> > >> >> > > mid-way.
>> >> > >> >> > > > > > Since we don't intend to
>> >> > >> >> > > > > > change the execution behaviour, if we limit Storage
>> of 1
>> >> > >> >version
>> >> > >> >> > per
>> >> > >> >> > > > DAG,
>> >> > >> >> > > > > > it won't actually show what
>> >> > >> >> > > > > > was run.
>> >> > >> >> > > > > >
>> >> > >> >> > > > > > Example Dag v1: Task A -> Task B -> Task C
>> >> > >> >> > > > > > The worker has completed the execution of Task B and
>> is
>> >> > >> >just about
>> >> > >> >> > to
>> >> > >> >> > > > > > complete the execution of Task B.
>> >> > >> >> > > > > >
>> >> > >> >> > > > > > The 2nd version of DAG is deployed: Task A -> Task D
>> >> > >> >> > > > > > Now Scheduler queued Task D and it will run to
>> >> > >completion.
>> >> > >> >(Task C
>> >> > >> >> > > > won't
>> >> > >> >> > > > > > run)
>> >> > >> >> > > > > >
>> >> > >> >> > > > > > In this case, "the actual representation of the DAG"
>> >> > >that
>> >> > >> >run is
>> >> > >> >> > > > neither
>> >> > >> >> > > > > v1
>> >> > >> >> > > > > > nor v2 but a "mixed version"
>> >> > >> >> > > > > >  (Task A -> Task B -> Task D). The plan is that the
>> >> > >> >Scheduler will
>> >> > >> >> > > > create
>> >> > >> >> > > > > > this "mixed version" based on what ran
>> >> > >> >> > > > > > and the Graph View would show this "mixed version".
>> >> > >> >> > > > > >
>> >> > >> >> > > > > > There would also be a toggle button on the Graph
>> View to
>> >> > >> >select v1
>> >> > >> >> > or
>> >> > >> >> > > > v2
>> >> > >> >> > > > > > where the tasks will be highlighted to show
>> >> > >> >> > > > > > that a particular task was in v1 or v2 as shown in
>> >> > >> >> > > > > >
>> >> > >> >> > > > > >
>> >> > >> >> > > > >
>> >> > >> >> > > >
>> >> > >> >> > >
>> >> > >> >> >
>> >> > >>
>> >> > >>
>> >> >
>> https://cwiki.apache.org/confluence/download/attachments/158868919/Picture%201.png?version=2&modificationDate=1595612863000&api=v2
>> >> > >> >> > > > > >
>> >> > >> >> > > > > >
>> >> > >> >> > > > > >
>> >> > >> >> > > > > > *Visualizing change in the tree view:* I think this
>> is
>> >> > >very
>> >> > >> >complex
>> >> > >> >> > > and
>> >> > >> >> > > > > > > many things can make this view impossible to render
>> >> > >(task
>> >> > >> >> > > dependency
>> >> > >> >> > > > > > > reversal, cycles across versions, ...). Maybe a
>> better
>> >> > >> >visual
>> >> > >> >> > > > approach
>> >> > >> >> > > > > > > would be to render independent, individual tree
>> views
>> >> > >for
>> >> > >> >each
>> >> > >> >> > DAG
>> >> > >> >> > > > > > version
>> >> > >> >> > > > > > > (side by side), and doing best effort aligning the
>> >> > >tasks
>> >> > >> >across
>> >> > >> >> > > > blocks
>> >> > >> >> > > > > > and
>> >> > >> >> > > > > > > "linking" tasks with lines across blocks when
>> >> > >necessary.
>> >> > >> >> > > > > >
>> >> > >> >> > > > > >
>> >> > >> >> > > > > > Agreed, the plan is to do the best effort aligning.
>> >> > >> >> > > > > > At this point in time, task additions to the end of
>> the
>> >> > >DAG
>> >> > >> >are
>> >> > >> >> > > > expected
>> >> > >> >> > > > > to
>> >> > >> >> > > > > > be compatible,
>> >> > >> >> > > > > > but changes to task structure within the DAG may
>> cause
>> >> > >the
>> >> > >> >tree
>> >> > >> >> > view
>> >> > >> >> > > > not
>> >> > >> >> > > > > to
>> >> > >> >> > > > > > incorporate “old” and “new” in the same view, hence
>> that
>> >> > >> >won't be
>> >> > >> >> > > > shown.
>> >> > >> >> > > > > >
>> >> > >> >> > > > > > Regards,
>> >> > >> >> > > > > > Kaxil
>> >> > >> >> > > > > >
>> >> > >> >> > > > > > On Mon, Jul 27, 2020 at 6:02 PM Maxime Beauchemin <
>> >> > >> >> > > > > > maximebeauchemin@gmail.com> wrote:
>> >> > >> >> > > > > >
>> >> > >> >> > > > > > > Some notes and ideas:
>> >> > >> >> > > > > > >
>> >> > >> >> > > > > > > *DAG Fingerprinting: *this can be tricky,
>> especially
>> >> > >in
>> >> > >> >regards
>> >> > >> >> > to
>> >> > >> >> > > > > > dynamic
>> >> > >> >> > > > > > > DAGs, where in some cases each parsing of the DAG
>> can
>> >> > >> >result in a
>> >> > >> >> > > > > > different
>> >> > >> >> > > > > > > fingerprint. I think DAG and tasks attributes are
>> left
>> >> > >> >out from
>> >> > >> >> > the
>> >> > >> >> > > > > > > proposal that should be considered as part of the
>> >> > >> >fingerprint,
>> >> > >> >> > like
>> >> > >> >> > > > > > trigger
>> >> > >> >> > > > > > > rules or task start/end datetime. We should do a
>> full
>> >> > >> >pass of all
>> >> > >> >> > > DAG
>> >> > >> >> > > > > > > arguments and make sure we're not forgetting
>> anything
>> >> > >> >that can
>> >> > >> >> > > change
>> >> > >> >> > > > > > > scheduling logic. Also, let's be careful that
>> >> > >something
>> >> > >> >as simple
>> >> > >> >> > > as
>> >> > >> >> > > > a
>> >> > >> >> > > > > > > dynamic start or end date on a task could lead to a
>> >> > >> >different
>> >> > >> >> > > version
>> >> > >> >> > > > > > each
>> >> > >> >> > > > > > > time you parse. I'd recommend limiting
>> >> > >> >serialization/storage of
>> >> > >> >> > one
>> >> > >> >> > > > > > version
>> >> > >> >> > > > > > > per DAG Run, as opposed to potentially everytime
>> the
>> >> > >DAG
>> >> > >> >is
>> >> > >> >> > parsed
>> >> > >> >> > > -
>> >> > >> >> > > > > once
>> >> > >> >> > > > > > > the version for a DAG run is pinned,
>> fingerprinting is
>> >> > >> >not
>> >> > >> >> > > > re-evaluated
>> >> > >> >> > > > > > > until the next DAG run is ready to get created.
>> >> > >> >> > > > > > >
>> >> > >> >> > > > > > > *Visualizing change in the tree view:* I think
>> this is
>> >> > >> >very
>> >> > >> >> > complex
>> >> > >> >> > > > and
>> >> > >> >> > > > > > > many things can make this view impossible to render
>> >> > >(task
>> >> > >> >> > > dependency
>> >> > >> >> > > > > > > reversal, cycles across versions, ...). Maybe a
>> better
>> >> > >> >visual
>> >> > >> >> > > > approach
>> >> > >> >> > > > > > > would be to render independent, individual tree
>> views
>> >> > >for
>> >> > >> >each
>> >> > >> >> > DAG
>> >> > >> >> > > > > > version
>> >> > >> >> > > > > > > (side by side), and doing best effort aligning the
>> >> > >tasks
>> >> > >> >across
>> >> > >> >> > > > blocks
>> >> > >> >> > > > > > and
>> >> > >> >> > > > > > > "linking" tasks with lines across blocks when
>> >> > >necessary.
>> >> > >> >> > > > > > >
>> >> > >> >> > > > > > > On Fri, Jul 24, 2020 at 12:46 PM Vikram Koka <
>> >> > >> >> > vikram@astronomer.io
>> >> > >> >> > > >
>> >> > >> >> > > > > > wrote:
>> >> > >> >> > > > > > >
>> >> > >> >> > > > > > > > Team,
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > > We just created 'AIP-36 DAG Versioning' on
>> >> > >Confluence
>> >> > >> >and would
>> >> > >> >> > > > very
>> >> > >> >> > > > > > much
>> >> > >> >> > > > > > > > appreciate feedback and suggestions from the
>> >> > >community.
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > >
>> >> > >> >> > > > > >
>> >> > >> >> > > > >
>> >> > >> >> > > >
>> >> > >> >> > >
>> >> > >> >> >
>> >> > >>
>> >> > >>
>> >> >
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-36+DAG+Versioning
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > > The DAG Versioning concept has been discussed on
>> >> > >> >multiple
>> >> > >> >> > > occasions
>> >> > >> >> > > > > in
>> >> > >> >> > > > > > > the
>> >> > >> >> > > > > > > > past and has been a topic highlighted as part of
>> >> > >> >Airflow 2.0 as
>> >> > >> >> > > > well.
>> >> > >> >> > > > > > We
>> >> > >> >> > > > > > > at
>> >> > >> >> > > > > > > > Astronomer have heard data engineers at several
>> >> > >> >enterprises ask
>> >> > >> >> > > > about
>> >> > >> >> > > > > > > this
>> >> > >> >> > > > > > > > feature as well, for easier debugging when
>> changes
>> >> > >are
>> >> > >> >made to
>> >> > >> >> > > DAGs
>> >> > >> >> > > > > as
>> >> > >> >> > > > > > a
>> >> > >> >> > > > > > > > result of evolving business needs.
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > > As described in the AIP, we have a proposal
>> focused
>> >> > >on
>> >> > >> >ensuring
>> >> > >> >> > > > that
>> >> > >> >> > > > > > the
>> >> > >> >> > > > > > > > visibility behaviour of Airflow is correct,
>> without
>> >> > >> >changing
>> >> > >> >> > the
>> >> > >> >> > > > > > > execution
>> >> > >> >> > > > > > > > behaviour. We considered changing the execution
>> >> > >> >behaviour as
>> >> > >> >> > > well,
>> >> > >> >> > > > > but
>> >> > >> >> > > > > > > > decided that the risks in changing execution
>> >> > >behavior
>> >> > >> >were too
>> >> > >> >> > > high
>> >> > >> >> > > > > as
>> >> > >> >> > > > > > > > compared to the benefits and therefore decided to
>> >> > >limit
>> >> > >> >the
>> >> > >> >> > scope
>> >> > >> >> > > > to
>> >> > >> >> > > > > > only
>> >> > >> >> > > > > > > > making sure that the visibility was correct.
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > > We would like to attempt this based on our
>> >> > >experience
>> >> > >> >running
>> >> > >> >> > > > Airflow
>> >> > >> >> > > > > > as
>> >> > >> >> > > > > > > a
>> >> > >> >> > > > > > > > service. We believe that this benefits Airflow
>> as a
>> >> > >> >project and
>> >> > >> >> > > the
>> >> > >> >> > > > > > > > development experience of data engineers using
>> >> > >Airflow
>> >> > >> >across
>> >> > >> >> > the
>> >> > >> >> > > > > > world.
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > >  Any feedback, suggestions, and comments would be
>> >> > >> >greatly
>> >> > >> >> > > > > appreciated.
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > > Best Regards,
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > > > Kaxil Naik, Ryan Hamilton, Ash Berlin-Taylor, and
>> >> > >> >Vikram Koka
>> >> > >> >> > > > > > > >
>> >> > >> >> > > > > > >
>> >> > >> >> > > > > >
>> >> > >> >> > > > >
>> >> > >> >> > > >
>> >> > >> >> > >
>> >> > >> >> > >
>> >> > >> >> > > --
>> >> > >> >> > >
>> >> > >> >> > > Jacob Ward    |    Graduate Data Infrastructure Engineer
>> >> > >> >> > >
>> >> > >> >> > > jward@brandwatch.com
>> >> > >> >> > >
>> >> > >> >> > >
>> >> > >> >> > > NEW YORK   | BOSTON   | BRIGHTON   | LONDON   | BERLIN |
>> >> > >> >STUTTGART |
>> >> > >> >> > > PARIS   | SINGAPORE | SYDNEY
>> >> > >> >> > >
>> >> > >> >> >
>> >> > >> >
>> >> > >> >
>> >> > >> >
>> >> > >> >--
>> >> > >> >
>> >> > >> >Jarek Potiuk
>> >> > >> >Polidea | Principal Software Engineer
>> >> > >> >
>> >> > >> >M: +48 660 796 129
>> >> >
>> >>
>> >>
>> >> --
>> >>
>> >> Jarek Potiuk
>> >> Polidea <https://www.polidea.com/> | Principal Software Engineer
>> >>
>> >> M: +48 660 796 129 <+48660796129>
>> >> [image: Polidea] <https://www.polidea.com/>
>>
>>

Re: [PROPOSAL][AIP-36 DAG Versioning]

Posted by Max Payton <mp...@lyft.com.INVALID>.
Hey, I was wondering if the resurrected AIP was ever published? This is
something that we (Lyft) are very interested in, and would like to
contribute to as well.
*Max Payton*
He/Him/His
Software Engineer
202.441.7757 <+12024417757>
[image: Lyft] <http://www.lyft.com/>


On Tue, Feb 15, 2022 at 4:23 AM Jarek Potiuk <ja...@potiuk.com> wrote:

> Woohoo! Looking forward to it!
>
> On Tue, Feb 15, 2022 at 1:11 PM Kaxil Naik <ka...@gmail.com> wrote:
> >
> > Hey folks,
> >
> > Just reviving this old thread to provide an update that we (Astronomer)
> will be resurrecting AIP-36 DAG Versioning with a different scope in the
> coming days that will be more consistent with what has been discussed in
> this thread.
> >
> > Regards,
> > Kaxil
> >
> > On Thu, Aug 13, 2020 at 9:32 PM Jarek Potiuk <Ja...@polidea.com>
> wrote:
> >>
> >> I fully agree with the "user" not having to know any of the "wheel"
> >> details, just as they do not have to know the Python interpreter or the
> >> underlying libc library details. This should all be hidden from the
> >> users.
> >>
> >> I think the wheels API that we might have there does not have to be
> >> user-facing. We could - rather easily - make a client that points to a
> >> DAG file, builds an appropriate wheel package under the hood and submits
> >> it. I really doubt any of the users will directly use the API to submit
> >> DAGs - they will use some clients built on top of it.
> >>
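> >> For illustration only, a rough sketch of what such a client could do
> >> (the helper name, file layout and any submission endpoint are
> >> hypothetical; it just shows that "pip wheel" can turn a DAG file plus
> >> its extra modules into a single .whl):
> >>
> >>     import pathlib, shutil, subprocess, sys, tempfile
> >>
> >>     def build_dag_wheel(dag_file, extra_files=(), version="0.0.1"):
> >>         # Copy the DAG file and its helper modules into a scratch dir.
> >>         src = pathlib.Path(tempfile.mkdtemp())
> >>         for f in (dag_file, *extra_files):
> >>             shutil.copy(f, src / pathlib.Path(f).name)
> >>         modules = [pathlib.Path(f).stem for f in (dag_file, *extra_files)]
> >>         # Minimal setup.py so that standard tooling can build the wheel.
> >>         (src / "setup.py").write_text(
> >>             "from setuptools import setup\n"
> >>             f"setup(name='dag-{pathlib.Path(dag_file).stem}', "
> >>             f"version='{version}', py_modules={modules!r})\n"
> >>         )
> >>         dist = src / "dist"
> >>         subprocess.run(
> >>             [sys.executable, "-m", "pip", "wheel", "--no-deps",
> >>              "-w", str(dist), str(src)],
> >>             check=True,
> >>         )
> >>         return next(dist.glob("*.whl"))
> >>
> >>     # wheel_path = build_dag_wheel(
> >>     #     "dags/my_dag.py", ["common/my_fantastic_library.py"])
> >>     # ...then hand wheel_path + dag_id to whatever submission API exists.
> >>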
> >> I think we should separate the user side from the implementation -
> >> similarly as we do not expect the users to know any details of how a
> >> "DAG Fetcher" should work. In any case, with the DAG fetcher we need to
> >> define how the DAG fetcher will make sure about "atomicity" anyway - how
> >> to make sure that you get a "consistent" version of all the dependent
> >> python files when you fetch them? This is the part of the DAG fetcher
> >> that I do not like, because it assumes that "someone else" maintains the
> >> consistency and provides the "consistent view" somewhere on the "DAG
> >> Server" side (whatever the server side is).
> >>
> >> There were many ideas about some kind of manifest describing the files
> >> etc., but I think all of that depends on some kind of ability to provide
> >> a "snapshot" of files that will be a consistent set to execute. With
> >> "DAG Fetcher" this is something that the "DAG Fetching server" has to
> >> provide. It's super easy if that "server" is GIT - we already use it for
> >> GIT sync. But it's rather difficult to provide a good abstraction for it
> >> for a "generic" DAG fetcher.
> >>
> >> IMHO it is far easier to provide such a consistent set at "submission
> >> time". In pretty much all cases, the user submitting the job already has
> >> a consistent set of python files that the DAG uses. This is pretty much
> >> a given. I think the job of the "submission" mechanism is to make a
> >> "snapshot" out of that consistent set and submit this snapshot, rather
> >> than individual files. Git provides it out of the box, but if we want to
> >> be generic - I see no other way than to build such a "snapshot" locally.
> >> And wheels seem like a very good candidate - as long as they are an
> >> implementation detail and hidden from the users.
> >>
> >> J.
> >>
> >>
> >>
> >>
> >> On Tue, Aug 11, 2020 at 8:33 PM Ash Berlin-Taylor <as...@apache.org>
> wrote:
> >>
> >> > Anything to do with the process of building wheels should be a "power
> >> > user" only feature, and should not be required for many users - many,
> >> > many users of airflow are not primarily Python developers, but data
> >> > scientists, and needing them to understand anything about the python
> >> > build toolchain is too much of a learning curve for the benefit.
> >> >
> >> > After all, it is very rare that people hit the multiple concurrent
> >> > versions of a dag.
> >> >
> >> > -ash
> >> >
> >> > On 10 August 2020 17:37:32 BST, Tomasz Urbaszek <turbaszek@apache.org
> >
> >> > wrote:
> >> > >I like the idea of wheels as this is probably the "most pythonic"
> >> > >solution. And "DAG version" is not only defined by DAG code but also
> >> > >by all dependencies the DAG uses (custom functions, libraries etc) and
> >> > >it seems that wheels can address that.
> >> > >
> >> > >However, I second Ash - keeping wheels in the db doesn't sound good.
> >> > >In my opinion, a DAG fetcher is the right solution and the idea
> >> > >surfaces every time we talk about serialization. This abstraction has
> >> > >a lot of pros as it allows a lot of customization (wheels, local fs,
> >> > >remote fs, etc.).
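> >> > >
> >> > >Just to sketch the shape of that abstraction (a rough, invented
> >> > >sketch - these classes do not exist in Airflow, the names simply
> >> > >follow the discussion above): a fetcher would only need to return a
> >> > >local, consistent snapshot for one DAG version, and each backend
> >> > >could implement that differently, e.g.:
> >> > >
> >> > >    import abc, pathlib
> >> > >
> >> > >    class DagFetcher(abc.ABC):
> >> > >        @abc.abstractmethod
> >> > >        def fetch(self, dag_id: str, version: str) -> pathlib.Path:
> >> > >            """Return a local dir with a consistent snapshot of the
> >> > >            files behind this DAG version, ready to be imported."""
> >> > >
> >> > >    class GitDagFetcher(DagFetcher):
> >> > >        def fetch(self, dag_id, version):
> >> > >            # e.g. shallow-clone / check out the commit `version`
> >> > >            raise NotImplementedError
> >> > >
> >> > >    class WheelDagFetcher(DagFetcher):
> >> > >        def fetch(self, dag_id, version):
> >> > >            # e.g. download the .whl for (dag_id, version) and unpack
> >> > >            raise NotImplementedError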
> >> > >
> >> > >Apart from that, if we decided to use wheels we should provide a CLI
> >> > >command to ease the process of building them. Also, I'm wondering
> >> > >about developers' workflow. Moving between the code of different DAG
> >> > >versions sounds easy if you use git but... what if someone doesn't
> >> > >use it?
> >> > >
> >> > >Tomek
> >> > >
> >> > >
> >> > >On Sat, Aug 8, 2020 at 9:49 AM Ash Berlin-Taylor <as...@apache.org>
> >> > >wrote:
> >> > >>
> >> > >> Quick comment (as I'm still mostly on paternity leave):
> >> > >>
> >> > >> Storing wheels in the db sounds like a bad Idea to me, especially
> if
> >> > >we need to store deps in there too (and if we don't store deps, then
> >> > >they are incomplete) - they could get very large, and I've stored
> blobs
> >> > >of ~10mb in postgres before: I don't recommend it. It "works" but
> >> > >operating it is tricky.
> >> > >>
> >> > >>
> >> > >>
> >> > >> > the API could simply accept "Wheel file + the Dag id"
> >> > >>
> >> > >> This sounds like a huge security risk.
> >> > >>
> >> > >>
> >> > >> My main concern with this idea is that it seems a lot of complexity
> >> > >we are putting on users. Doubly so if they are already using docker
> >> > >where there already exists an Ideal packaging and distribution that
> >> > >could contain dag + needed code.
> >> > >>
> >> > >> (Sorry for the brevity)
> >> > >>
> >> > >> -ash
> >> > >>
> >> > >>
> >> > >> On 2 August 2020 08:47:39 BST, Jarek Potiuk
> >> > ><Ja...@polidea.com> wrote:
> >> > >> >Few points from my sid (and proposal!):
> >> > >> >
> >> > >> >1) Agree with Max -  with a rather strong NO for pickles (however,
> >> > >> >indeed cloudpickle solves some of the problems). Pickles came up
> in
> >> > >> >our discussion in Polidea recently and the overall message was
> "no".
> >> > >I
> >> > >> >agree with Max here - if we can ship python code, turning that
> into
> >> > >> >pickle for transit makes little sense to me and brings a plethora
> of
> >> > >> >problems.
> >> > >> >
> >> > >> >2) I think indeed the versioning solution should treat the
> "DagRun"
> >> > >> >structure atomically. While I see why we would like to go with the
> >> > >> >UI/Scheduler only first rather than implementing them in the
> >> > >workers,
> >> > >> >adding the "mixed version" is where it breaks down IMHO. Reasoning
> >> > >> >about such "mixed version" dag is next to impossible. The current
> >> > >> >behavior is not well defined and non-deterministic (depends on
> >> > >> >scheduler delays, syncing, type of deployment, restarts of the
> works
> >> > >> >etc.) we are moving it up to UI (thus users) rather than solving
> the
> >> > >> >problem. So I am not a big fan of this and would rather solve it
> >> > >> >"well" with atomicity.
> >> > >> >
> >> > >> >3) I see the point of Dan as well - we had many discussions and
> many
> >> > >> >times the idea about "submitting" the DAG for execution via the
> API
> >> > >> >came up - and it makes sense IMHO.
> >> > >> >
> >> > >> >Proposal: Implement full versioning with code shipping via DB
> wheels
> >> > >> >BLOB (akin to serialized DAGs).
> >> > >> >
> >> > >> >I understand that the big issue is how to actually "ship" the code
> >> > >to
> >> > >> >the worker. And - maybe a wild idea - we can kill several birds
> with
> >> > >> >the same stone.
> >> > >> >
> >> > >> >There were plenty of discussions on how we could do that but one
> was
> >> > >> >never truly explored - using wheel packages.
> >> > >> >
> >> > >> >For those who do not know them, there is the PEP:
> >> > >> >https://www.python.org/dev/peps/pep-0427/
> >> > >> >
> >> > >> >Wheels allow to "package" python code in a standard way. They are
> >> > >> >portable ("purelib" + contain .py rather than .pyc code), they
> have
> >> > >> >metadata, versioning information, they can be signed for security,
> >> > >> >They can contain other packages or python code, Why don't we let
> >> > >> >scheduler to pack the fingerprinted version of the DAG in a .whl
> and
> >> > >> >store it as a blob in a DB next to the serialized form?
> >> > >> >
> >> > >> >There were concerns about the size of the code to keep in the DB -
> >> > >but
> >> > >> >we already use the DB for serialized DAGs and it works fine (I
> >> > >believe
> >> > >> >we only need to add compressing of the JSon serialized form - as
> >> > >we've
> >> > >> >learned from AirBnb during their talk at the Airflow Summit -
> wheels
> >> > >> >are already compressed). Also - each task will only need the
> >> > >> >particular "version" of one DAG so even if we keep many of them in
> >> > >the
> >> > >> >DB, the old version will pretty soon go "cold" and will never be
> >> > >> >retrieved (and most DBs will handle it well with caching/indexes).
> >> > >> >
> >> > >> >And if we want to add "callables" from other files - there is
> >> > >nothing
> >> > >> >to stop the person who defines dag to add list of files that
> should
> >> > >be
> >> > >> >packaged together with the main DAG file (additional_python_files
> =
> >> > >> >["common/my_fantastic_library.py"] in DAG constructor). Or we
> could
> >> > >> >auto-add all files after the DAG gets imported (i.e. package
> >> > >> >automatically all files that are imported for that particular DAG
> >> > >from
> >> > >> >the "dags" folder"). That should be rather easy.
> >> > >> >
> >> > >> >This way we could ship the code to workers for the exact version
> >> > >that
> >> > >> >the DagRun uses. And they can be cached and unpacked/installed to
> a
> >> > >> >virtualenv for the execution of that single task. That should be
> >> > >super
> >> > >> >quick. Such virtualenv can be wiped out after execution.
> >> > >> >
> >> > >> >Then we got what Max wants (atomicity of DagRuns) and what Dan
> wants
> >> > >> >(the API could simply accept "Wheel file + the Dag id". We have
> the
> >> > >> >isolation between tasks running on the same worker (based on
> >> > >> >virtualenv) so that each process in the same worker can run a
> >> > >> >different version of the same Dag. We have much less confusion for
> >> > >the
> >> > >> >UI.
> >> > >> >
> >> > >> >Extra bonus 1: we can expand it to package different dependencies
> in
> >> > >> >the wheels as well - so that if an operator requires a different
> >> > >> >(newer) version of a python library, it could be packaged together
> >> > >> >with the DAG in the same .whl file. This is also a highly
> requested
> >> > >> >feature.
> >> > >> >Extra bonus 2: workers will stop depending on the DAG file mount
> (!)
> >> > >> >which was our long term goal and indeed as Dan mentioned - a great
> >> > >> >step towards multi-tenancy.
> >> > >> >
> >> > >> >J.
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> >On Fri, Jul 31, 2020 at 6:41 AM Maxime Beauchemin
> >> > >> ><ma...@gmail.com> wrote:
> >> > >> >>
> >> > >> >> Having tried it early on, I'd advocate pretty strongly against
> >> > >> >pickles and
> >> > >> >> would rather not get too deep into the why here. Short story is
> >> > >they
> >> > >> >can
> >> > >> >> pull the entire memory space or much more than you want, and
> it's
> >> > >> >> impossible to reason about where they end. For that reason and
> >> > >other
> >> > >> >> reasons, they're a security issue. Oh and some objects are not
> >> > >> >picklable
> >> > >> >> (Jinja templates! to name a problematic one...). I've also seen
> >> > >> >> secret-related classes that raise when pickled (thank god!).
> >> > >> >>
> >> > >> >> About callback and other things like that, it's quite a puzzle
> in
> >> > >> >python.
> >> > >> >> One solution would be to point to a python namespace
> >> > >> >> callback="preset.airflow_utils.slack_callback" and assume the
> >> > >> >function has
> >> > >> >> to exist in the remote interpreter. Personally I like the
> >> > >DagFetcher
> >> > >> >idea
> >> > >> >> (it could be great to get a pointer to that mailing list thread
> >> > >> >here),
> >> > >> >> specifically the GitDagFetcher. I don't know how [un]reasonable
> it
> >> > >> >is, but
> >> > >> >> I hate pickles so much that shipping source code around seems
> much
> >> > >> >more
> >> > >> >> reasonable to me. I think out there there's a talk from Mike
> Star
> >> > >> >about
> >> > >> >> Dataswarm at FB and he may mention how their workers may git
> >> > >shallow
> >> > >> >clone
> >> > >> >> the pipeline repo. Or maybe they use that "beautifully ugly"
> hack
> >> > >to
> >> > >> >use
> >> > >> >> a gitfs fuse [file system in user space] on the worker [could
> get
> >> > >> >deeper
> >> > >> >> into that, not sure how reasonable that is either].
> >> > >> >>
> >> > >> >> About fingerprints, a simple `start_date = datetime.now() -
> >> > >> >timedelta(1)`
> >> > >> >> may lead to a never-repeating fingerprint. From memory the spec
> >> > >> >doesn't
> >> > >> >> list out the properties considered to build the hash. It be
> >> > >helpful
> >> > >> >to
> >> > >> >> specify and review that list.
> >> > >> >>
> >> > >> >> Max
> >> > >> >>
> >> > >> >> On Wed, Jul 29, 2020 at 5:20 AM Kaxil Naik <kaxilnaik@gmail.com
> >
> >> > >> >wrote:
> >> > >> >>
> >> > >> >> > Thanks, both Max and Dan for your comments, please check my
> >> > >reply
> >> > >> >below:
> >> > >> >> >
> >> > >> >> >
> >> > >> >> > >  Personally I vote for a DAG version to be pinned and
> >> > >consistent
> >> > >> >for the
> >> > >> >> > > duration of the DAG run. Some of the reasons why:
> >> > >> >> > > - it's easier to reason about, and therefore visualize and
> >> > >> >troubleshoot
> >> > >> >> > > - it prevents some cases where dependencies are never met
> >> > >> >> > > - it prevents the explosion of artifact/metadata (one
> >> > >> >serialization per
> >> > >> >> > > dagrun as opposed to one per scheduler cycle) in the case
> of a
> >> > >> >dynamic
> >> > >> >> > DAG
> >> > >> >> > > whose fingerprint is never the same.
> >> > >> >> >
> >> > >> >> >
> >> > >> >> > In this AIP, we were only looking to fix the current "Viewing
> >> > >> >behaviour"
> >> > >> >> > and
> >> > >> >> > we were intentionally not changing the execution behaviour.
> >> > >> >> > The change you are suggesting means we need to introduce DAG
> >> > >> >Versioning for
> >> > >> >> > the
> >> > >> >> > workers too. This will need more work as can't use the
> >> > >Serialised
> >> > >> >> > Representation
> >> > >> >> > to run the task since users could use custom modules in a
> >> > >different
> >> > >> >part of
> >> > >> >> > code,
> >> > >> >> > example the PythonOperator has python_callable that allows
> >> > >running
> >> > >> >any
> >> > >> >> > arbitrary code.
> >> > >> >> > A similar case is with the *on_*_callbacks* defined on DAG.
> >> > >> >> >
> >> > >> >> > Based on the current scope of the AIP, we still plan to use
> the
> >> > >> >actual DAG
> >> > >> >> > files for the
> >> > >> >> > execution and not use Serialized DAGs for the workers.
> >> > >> >> >
> >> > >> >> > To account for all the custom modules we will have to start
> >> > >looking
> >> > >> >at
> >> > >> >> > pickle (cloudpickle).
> >> > >> >> >
> >> > >> >> > I'm certain that there are lots of
> >> > >> >> > > those DAGs out there, and that it will overwhelm the
> metadata
> >> > >> >database,
> >> > >> >> > and
> >> > >> >> > > confuse the users. For an hourly DAG is would mean 24
> artifact
> >> > >> >per day
> >> > >> >> > > instead of 1000+
> >> > >> >> >
> >> > >> >> >
> >> > >> >> > What kind of dynamic DAGs are we talking about here, I would
> >> > >think
> >> > >> >the DAG
> >> > >> >> > signature won't change
> >> > >> >> > but I might be wrong, can you give an example, please.
> >> > >> >> >
> >> > >> >> > If backwards compatibility in behavior is a concern, I'd
> >> > >recommend
> >> > >> >adding a
> >> > >> >> > > flag to the DAG class and/or config and make sure we're
> doing
> >> > >the
> >> > >> >right
> >> > >> >> > > thing by default. People who want backward compatibility
> would
> >> > >> >have to
> >> > >> >> > > change that default. But again, that's a lot of extra and
> >> > >> >confusing
> >> > >> >> > > complexity that will likely be the source of bugs and user
> >> > >> >confusion.
> >> > >> >> > > Having a clear, easy to reason about execution model is
> super
> >> > >> >important.
> >> > >> >> >
> >> > >> >> > Think about visualizing a DAG that shapeshifted 5 times during
> >> > >its
> >> > >> >> > > execution, how does anyone make sense of that?
> >> > >> >> >
> >> > >> >> >
> >> > >> >> > Wouldn't that be an edge case? How often would someone change
> >> > >the
> >> > >> >DAG
> >> > >> >> > structure in the middle of
> >> > >> >> > a DAG execution. And since if they do change, the Graph View
> >> > >should
> >> > >> >show
> >> > >> >> > all the tasks that were
> >> > >> >> > run, if it just shows based on the latest version, the
> behaviour
> >> > >> >would be
> >> > >> >> > the same as now.
> >> > >> >> >
> >> > >> >> > --------
> >> > >> >> >
> >> > >> >> > Strongly agree with Max's points, also I feel the right way to
> >> > >go
> >> > >> >about
> >> > >> >> > > this is instead of Airflow schedulers/webservers/workers
> >> > >reading
> >> > >> >DAG
> >> > >> >> > Python
> >> > >> >> > > files, they would instead read from serialized
> representations
> >> > >of
> >> > >> >the
> >> > >> >> > DAGs
> >> > >> >> > > (e.g. json representation in the Airflow DB). Instead of DAG
> >> > >> >owners
> >> > >> >> > pushing
> >> > >> >> > > their DAG files to the Airflow components via varying
> >> > >mechanisms
> >> > >> >(e.g.
> >> > >> >> > > git), they would instead call an Airflow CLI to push the
> >> > >> >serialized DAG
> >> > >> >> > > representations to the DB, and for things like dynamic DAGs
> >> > >you
> >> > >> >could
> >> > >> >> > > populate them from a DAG or another service.
> >> > >> >> >
> >> > >> >> >
> >> > >> >> > Airflow Webserver and the Scheduler will definitely read from
> >> > >the
> >> > >> >> > Serialized representation as
> >> > >> >> > they don't need all the code from the DAG files.
> >> > >> >> >
> >> > >> >> > While the workers definitely need access to DAG files as the
> >> > >> >> > tasks/operators would be using
> >> > >> >> > code form custom modules and classes which are required to run
> >> > >the
> >> > >> >tasks.
> >> > >> >> >
> >> > >> >> > If we do want to go down that route we will have to use
> >> > >something
> >> > >> >like
> >> > >> >> > cloudpickle that serializes
> >> > >> >> > entire DAG file and their dependencies. And also ensure that
> >> > >> >someone is not
> >> > >> >> > able to change the pickled
> >> > >> >> > source when sending from executor to the worker as that poses
> a
> >> > >big
> >> > >> >> > security risk.
> >> > >> >> >
> >> > >> >> > - Kaxil
> >> > >> >> >
> >> > >> >> > On Wed, Jul 29, 2020 at 12:43 PM Jacob Ward
> >> > ><jw...@brandwatch.com>
> >> > >> >wrote:
> >> > >> >> >
> >> > >> >> > > I came here to say what Max has said, only less eloquently.
> >> > >> >> > >
> >> > >> >> > > I do have one concern with locking the version for a single
> >> > >run.
> >> > >> >> > Currently
> >> > >> >> > > it is possible for a user to create a dag which
> intentionally
> >> > >> >changes as
> >> > >> >> > a
> >> > >> >> > > dag executes, i.e. dynamically creating a task for the dag
> >> > >during
> >> > >> >a run
> >> > >> >> > by
> >> > >> >> > > modifying external data, but this change would prevent that.
> >> > >I'm
> >> > >> >of the
> >> > >> >> > > opinion that this situation is bad practice anyway so it
> >> > >doesn't
> >> > >> >matter
> >> > >> >> > if
> >> > >> >> > > we make it impossible to do, but others may disagree.
> >> > >> >> > >
> >> > >> >> > > On Tue, 28 Jul 2020 at 17:08, Dan Davydov
> >> > >> ><dd...@twitter.com.invalid>
> >> > >> >> > > wrote:
> >> > >> >> > >
> >> > >> >> > > > Strongly agree with Max's points, also I feel the right
> way
> >> > >to
> >> > >> >go about
> >> > >> >> > > > this is instead of Airflow schedulers/webservers/workers
> >> > >> >reading DAG
> >> > >> >> > > Python
> >> > >> >> > > > files, they would instead read from serialized
> >> > >representations
> >> > >> >of the
> >> > >> >> > > DAGs
> >> > >> >> > > > (e.g. json representation in the Airflow DB). Instead of
> DAG
> >> > >> >owners
> >> > >> >> > > pushing
> >> > >> >> > > > their DAG files to the Airflow components via varying
> >> > >> >mechanisms (e.g.
> >> > >> >> > > > git), they would instead call an Airflow CLI to push the
> >> > >> >serialized DAG
> >> > >> >> > > > representations to the DB, and for things like dynamic
> DAGs
> >> > >you
> >> > >> >could
> >> > >> >> > > > populate them from a DAG or another service.
> >> > >> >> > > >
> >> > >> >> > > > This would also enable other features like stronger
> >> > >> >> > > security/multi-tenancy.
> >> > >> >> > > >
> >> > >> >> > > > On Tue, Jul 28, 2020 at 6:44 PM Maxime Beauchemin <
> >> > >> >> > > > maximebeauchemin@gmail.com> wrote:
> >> > >> >> > > >
> >> > >> >> > > > > > "mixed version"
> >> > >> >> > > > >
> >> > >> >> > > > > Personally I vote for a DAG version to be pinned and
> >> > >> >consistent for
> >> > >> >> > the
> >> > >> >> > > > > duration of the DAG run. Some of the reasons why:
> >> > >> >> > > > > - it's easier to reason about, and therefore visualize
> and
> >> > >> >> > troubleshoot
> >> > >> >> > > > > - it prevents some cases where dependencies are never
> met
> >> > >> >> > > > > - it prevents the explosion of artifact/metadata (one
> >> > >> >serialization
> >> > >> >> > per
> >> > >> >> > > > > dagrun as opposed to one per scheduler cycle) in the
> case
> >> > >of
> >> > >> >a
> >> > >> >> > dynamic
> >> > >> >> > > > DAG
> >> > >> >> > > > > whose fingerprint is never the same. I'm certain that
> >> > >there
> >> > >> >are lots
> >> > >> >> > of
> >> > >> >> > > > > those DAGs out there, and that it will overwhelm the
> >> > >metadata
> >> > >> >> > database,
> >> > >> >> > > > and
> >> > >> >> > > > > confuse the users. For an hourly DAG is would mean 24
> >> > >> >artifact per
> >> > >> >> > day
> >> > >> >> > > > > instead of 1000+
> >> > >> >> > > > >
> >> > >> >> > > > > If backwards compatibility in behavior is a concern, I'd
> >> > >> >recommend
> >> > >> >> > > > adding a
> >> > >> >> > > > > flag to the DAG class and/or config and make sure we're
> >> > >doing
> >> > >> >the
> >> > >> >> > right
> >> > >> >> > > > > thing by default. People who want backward compatibility
> >> > >> >would have
> >> > >> >> > to
> >> > >> >> > > > > change that default. But again, that's a lot of extra
> and
> >> > >> >confusing
> >> > >> >> > > > > complexity that will likely be the source of bugs and
> user
> >> > >> >confusion.
> >> > >> >> > > > > Having a clear, easy to reason about execution model is
> >> > >super
> >> > >> >> > > important.
> >> > >> >> > > > >
> >> > >> >> > > > > Think about visualizing a DAG that shapeshifted 5 times
> >> > >> >during its
> >> > >> >> > > > > execution, how does anyone make sense of that?
> >> > >> >> > > > >
> >> > >> >> > > > > Max
> >> > >> >> > > > >
> >> > >> >> > > > > On Tue, Jul 28, 2020 at 3:14 AM Kaxil Naik
> >> > >> ><ka...@gmail.com>
> >> > >> >> > > wrote:
> >> > >> >> > > > >
> >> > >> >> > > > > > Thanks Max for your comments.
> >> > >> >> > > > > >
> >> > >> >> > > > > >
> >> > >> >> > > > > > *DAG Fingerprinting: *this can be tricky, especially
> in
> >> > >> >regards to
> >> > >> >> > > > > dynamic
> >> > >> >> > > > > > > DAGs, where in some cases each parsing of the DAG
> can
> >> > >> >result in a
> >> > >> >> > > > > > different
> >> > >> >> > > > > > > fingerprint. I think DAG and tasks attributes are
> left
> >> > >> >out from
> >> > >> >> > the
> >> > >> >> > > > > > > proposal that should be considered as part of the
> >> > >> >fingerprint,
> >> > >> >> > like
> >> > >> >> > > > > > trigger
> >> > >> >> > > > > > > rules or task start/end datetime. We should do a
> full
> >> > >> >pass of all
> >> > >> >> > > DAG
> >> > >> >> > > > > > > arguments and make sure we're not forgetting
> anything
> >> > >> >that can
> >> > >> >> > > change
> >> > >> >> > > > > > > scheduling logic. Also, let's be careful that
> >> > >something
> >> > >> >as simple
> >> > >> >> > > as
> >> > >> >> > > > a
> >> > >> >> > > > > > > dynamic start or end date on a task could lead to a
> >> > >> >different
> >> > >> >> > > version
> >> > >> >> > > > > > each
> >> > >> >> > > > > > > time you parse.
> >> > >> >> > > > > >
> >> > >> >> > > > > >
> >> > >> >> > > > > >
> >> > >> >> > > > > > The short version of Dag Fingerprinting would be
> >> > >> >> > > > > > just a hash of the Serialized DAG.
> >> > >> >> > > > > >
> >> > >> >> > > > > > *Example DAG*: https://imgur.com/TVuoN3p
> >> > >> >> > > > > > *Example Serialized DAG*: https://imgur.com/LmA2Bpr
> >> > >> >> > > > > >
> >> > >> >> > > > > > It contains all the task & DAG parameters. When they
> >> > >> >change,
> >> > >> >> > > Scheduler
> >> > >> >> > > > > > writes
> >> > >> >> > > > > > a new version of Serialized DAGs to the DB. The
> >> > >Webserver
> >> > >> >then
> >> > >> >> > reads
> >> > >> >> > > > the
> >> > >> >> > > > > > DAGs from the DB.
> >> > >> >> > > > > >
> >> > >> >> > > > > > I'd recommend limiting serialization/storage of one
> >> > >version
> >> > >> >> > > > > > > per DAG Run, as opposed to potentially everytime the
> >> > >DAG
> >> > >> >is
> >> > >> >> > parsed
> >> > >> >> > > -
> >> > >> >> > > > > once
> >> > >> >> > > > > > > the version for a DAG run is pinned, fingerprinting
> is
> >> > >> >not
> >> > >> >> > > > re-evaluated
> >> > >> >> > > > > > > until the next DAG run is ready to get created.
> >> > >> >> > > > > >
> >> > >> >> > > > > >
> >> > >> >> > > > > > This is to handle Scenario 3 where a DAG structure is
> >> > >> >changed
> >> > >> >> > > mid-way.
> >> > >> >> > > > > > Since we don't intend to
> >> > >> >> > > > > > change the execution behaviour, if we limit Storage
> of 1
> >> > >> >version
> >> > >> >> > per
> >> > >> >> > > > DAG,
> >> > >> >> > > > > > it won't actually show what
> >> > >> >> > > > > > was run.
> >> > >> >> > > > > >
> >> > >> >> > > > > > Example Dag v1: Task A -> Task B -> Task C
> >> > >> >> > > > > > The worker has completed the execution of Task B and
> is
> >> > >> >just about
> >> > >> >> > to
> >> > >> >> > > > > > complete the execution of Task B.
> >> > >> >> > > > > >
> >> > >> >> > > > > > The 2nd version of DAG is deployed: Task A -> Task D
> >> > >> >> > > > > > Now Scheduler queued Task D and it will run to
> >> > >completion.
> >> > >> >(Task C
> >> > >> >> > > > won't
> >> > >> >> > > > > > run)
> >> > >> >> > > > > >
> >> > >> >> > > > > > In this case, "the actual representation of the DAG"
> >> > >that
> >> > >> >run is
> >> > >> >> > > > neither
> >> > >> >> > > > > v1
> >> > >> >> > > > > > nor v2 but a "mixed version"
> >> > >> >> > > > > >  (Task A -> Task B -> Task D). The plan is that the
> >> > >> >Scheduler will
> >> > >> >> > > > create
> >> > >> >> > > > > > this "mixed version" based on what ran
> >> > >> >> > > > > > and the Graph View would show this "mixed version".
> >> > >> >> > > > > >
> >> > >> >> > > > > > There would also be a toggle button on the Graph View
> to
> >> > >> >select v1
> >> > >> >> > or
> >> > >> >> > > > v2
> >> > >> >> > > > > > where the tasks will be highlighted to show
> >> > >> >> > > > > > that a particular task was in v1 or v2 as shown in
> >> > >> >> > > > > >
> >> > >> >> > > > > >
> >> > >> >> > > > >
> >> > >> >> > > >
> >> > >> >> > >
> >> > >> >> >
> >> > >>
> >> > >>
> >> >
> https://cwiki.apache.org/confluence/download/attachments/158868919/Picture%201.png?version=2&modificationDate=1595612863000&api=v2
> >> > >> >> > > > > >
> >> > >> >> > > > > >
> >> > >> >> > > > > >
> >> > >> >> > > > > > *Visualizing change in the tree view:* I think this is
> >> > >very
> >> > >> >complex
> >> > >> >> > > and
> >> > >> >> > > > > > > many things can make this view impossible to render
> >> > >(task
> >> > >> >> > > dependency
> >> > >> >> > > > > > > reversal, cycles across versions, ...). Maybe a
> better
> >> > >> >visual
> >> > >> >> > > > approach
> >> > >> >> > > > > > > would be to render independent, individual tree
> views
> >> > >for
> >> > >> >each
> >> > >> >> > DAG
> >> > >> >> > > > > > version
> >> > >> >> > > > > > > (side by side), and doing best effort aligning the
> >> > >tasks
> >> > >> >across
> >> > >> >> > > > blocks
> >> > >> >> > > > > > and
> >> > >> >> > > > > > > "linking" tasks with lines across blocks when
> >> > >necessary.
> >> > >> >> > > > > >
> >> > >> >> > > > > >
> >> > >> >> > > > > > Agreed, the plan is to do the best effort aligning.
> >> > >> >> > > > > > At this point in time, task additions to the end of
> the
> >> > >DAG
> >> > >> >are
> >> > >> >> > > > expected
> >> > >> >> > > > > to
> >> > >> >> > > > > > be compatible,
> >> > >> >> > > > > > but changes to task structure within the DAG may cause
> >> > >the
> >> > >> >tree
> >> > >> >> > view
> >> > >> >> > > > not
> >> > >> >> > > > > to
> >> > >> >> > > > > > incorporate “old” and “new” in the same view, hence
> that
> >> > >> >won't be
> >> > >> >> > > > shown.
> >> > >> >> > > > > >
> >> > >> >> > > > > > Regards,
> >> > >> >> > > > > > Kaxil
> >> > >> >> > > > > >
> >> > >> >> > > > > > On Mon, Jul 27, 2020 at 6:02 PM Maxime Beauchemin <
> >> > >> >> > > > > > maximebeauchemin@gmail.com> wrote:
> >> > >> >> > > > > >
> >> > >> >> > > > > > > Some notes and ideas:
> >> > >> >> > > > > > >
> >> > >> >> > > > > > > *DAG Fingerprinting: *this can be tricky, especially
> >> > >in
> >> > >> >regards
> >> > >> >> > to
> >> > >> >> > > > > > dynamic
> >> > >> >> > > > > > > DAGs, where in some cases each parsing of the DAG
> can
> >> > >> >result in a
> >> > >> >> > > > > > different
> >> > >> >> > > > > > > fingerprint. I think DAG and tasks attributes are
> left
> >> > >> >out from
> >> > >> >> > the
> >> > >> >> > > > > > > proposal that should be considered as part of the
> >> > >> >fingerprint,
> >> > >> >> > like
> >> > >> >> > > > > > trigger
> >> > >> >> > > > > > > rules or task start/end datetime. We should do a
> full
> >> > >> >pass of all
> >> > >> >> > > DAG
> >> > >> >> > > > > > > arguments and make sure we're not forgetting
> anything
> >> > >> >that can
> >> > >> >> > > change
> >> > >> >> > > > > > > scheduling logic. Also, let's be careful that
> >> > >something
> >> > >> >as simple
> >> > >> >> > > as
> >> > >> >> > > > a
> >> > >> >> > > > > > > dynamic start or end date on a task could lead to a
> >> > >> >different
> >> > >> >> > > version
> >> > >> >> > > > > > each
> >> > >> >> > > > > > > time you parse. I'd recommend limiting
> >> > >> >serialization/storage of
> >> > >> >> > one
> >> > >> >> > > > > > version
> >> > >> >> > > > > > > per DAG Run, as opposed to potentially everytime the
> >> > >DAG
> >> > >> >is
> >> > >> >> > parsed
> >> > >> >> > > -
> >> > >> >> > > > > once
> >> > >> >> > > > > > > the version for a DAG run is pinned, fingerprinting
> is
> >> > >> >not
> >> > >> >> > > > re-evaluated
> >> > >> >> > > > > > > until the next DAG run is ready to get created.
> >> > >> >> > > > > > >
> >> > >> >> > > > > > > *Visualizing change in the tree view:* I think this
> is
> >> > >> >very
> >> > >> >> > complex
> >> > >> >> > > > and
> >> > >> >> > > > > > > many things can make this view impossible to render
> >> > >(task
> >> > >> >> > > dependency
> >> > >> >> > > > > > > reversal, cycles across versions, ...). Maybe a
> better
> >> > >> >visual
> >> > >> >> > > > approach
> >> > >> >> > > > > > > would be to render independent, individual tree
> views
> >> > >for
> >> > >> >each
> >> > >> >> > DAG
> >> > >> >> > > > > > version
> >> > >> >> > > > > > > (side by side), and doing best effort aligning the
> >> > >tasks
> >> > >> >across
> >> > >> >> > > > blocks
> >> > >> >> > > > > > and
> >> > >> >> > > > > > > "linking" tasks with lines across blocks when
> >> > >necessary.
> >> > >> >> > > > > > >
> >> > >> >> > > > > > > On Fri, Jul 24, 2020 at 12:46 PM Vikram Koka <
> >> > >> >> > vikram@astronomer.io
> >> > >> >> > > >
> >> > >> >> > > > > > wrote:
> >> > >> >> > > > > > >
> >> > >> >> > > > > > > > Team,
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > > We just created 'AIP-36 DAG Versioning' on
> >> > >Confluence
> >> > >> >and would
> >> > >> >> > > > very
> >> > >> >> > > > > > much
> >> > >> >> > > > > > > > appreciate feedback and suggestions from the
> >> > >community.
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > >
> >> > >> >> > > > > >
> >> > >> >> > > > >
> >> > >> >> > > >
> >> > >> >> > >
> >> > >> >> >
> >> > >>
> >> > >>
> >> >
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-36+DAG+Versioning
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > > The DAG Versioning concept has been discussed on
> >> > >> >multiple
> >> > >> >> > > occasions
> >> > >> >> > > > > in
> >> > >> >> > > > > > > the
> >> > >> >> > > > > > > > past and has been a topic highlighted as part of
> >> > >> >Airflow 2.0 as
> >> > >> >> > > > well.
> >> > >> >> > > > > > We
> >> > >> >> > > > > > > at
> >> > >> >> > > > > > > > Astronomer have heard data engineers at several
> >> > >> >enterprises ask
> >> > >> >> > > > about
> >> > >> >> > > > > > > this
> >> > >> >> > > > > > > > feature as well, for easier debugging when changes
> >> > >are
> >> > >> >made to
> >> > >> >> > > DAGs
> >> > >> >> > > > > as
> >> > >> >> > > > > > a
> >> > >> >> > > > > > > > result of evolving business needs.
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > > As described in the AIP, we have a proposal
> focused
> >> > >on
> >> > >> >ensuring
> >> > >> >> > > > that
> >> > >> >> > > > > > the
> >> > >> >> > > > > > > > visibility behaviour of Airflow is correct,
> without
> >> > >> >changing
> >> > >> >> > the
> >> > >> >> > > > > > > execution
> >> > >> >> > > > > > > > behaviour. We considered changing the execution
> >> > >> >behaviour as
> >> > >> >> > > well,
> >> > >> >> > > > > but
> >> > >> >> > > > > > > > decided that the risks in changing execution
> >> > >behavior
> >> > >> >were too
> >> > >> >> > > high
> >> > >> >> > > > > as
> >> > >> >> > > > > > > > compared to the benefits and therefore decided to
> >> > >limit
> >> > >> >the
> >> > >> >> > scope
> >> > >> >> > > > to
> >> > >> >> > > > > > only
> >> > >> >> > > > > > > > making sure that the visibility was correct.
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > > We would like to attempt this based on our
> >> > >experience
> >> > >> >running
> >> > >> >> > > > Airflow
> >> > >> >> > > > > > as
> >> > >> >> > > > > > > a
> >> > >> >> > > > > > > > service. We believe that this benefits Airflow as
> a
> >> > >> >project and
> >> > >> >> > > the
> >> > >> >> > > > > > > > development experience of data engineers using
> >> > >Airflow
> >> > >> >across
> >> > >> >> > the
> >> > >> >> > > > > > world.
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > >  Any feedback, suggestions, and comments would be
> >> > >> >greatly
> >> > >> >> > > > > appreciated.
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > > Best Regards,
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > > > Kaxil Naik, Ryan Hamilton, Ash Berlin-Taylor, and
> >> > >> >Vikram Koka
> >> > >> >> > > > > > > >
> >> > >> >> > > > > > >
> >> > >> >> > > > > >
> >> > >> >> > > > >
> >> > >> >> > > >
> >> > >> >> > >
> >> > >> >> > >
> >> > >> >> > > --
> >> > >> >> > >
> >> > >> >> > > Jacob Ward    |    Graduate Data Infrastructure Engineer
> >> > >> >> > >
> >> > >> >> > > jward@brandwatch.com
> >> > >> >> > >
> >> > >> >> > >
> >> > >> >> > > NEW YORK   | BOSTON   | BRIGHTON   | LONDON   | BERLIN |
> >> > >> >STUTTGART |
> >> > >> >> > > PARIS   | SINGAPORE | SYDNEY
> >> > >> >> > >
> >> > >> >> >
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> >--
> >> > >> >
> >> > >> >Jarek Potiuk
> >> > >> >Polidea | Principal Software Engineer
> >> > >> >
> >> > >> >M: +48 660 796 129
> >> >
> >>
> >>
> >> --
> >>
> >> Jarek Potiuk
> >> Polidea <https://www.polidea.com/> | Principal Software Engineer
> >>
> >> M: +48 660 796 129 <+48660796129>
> >> [image: Polidea] <https://www.polidea.com/>
>
>

Re: [PROPOSAL][AIP-36 DAG Versioning]

Posted by Jarek Potiuk <ja...@potiuk.com>.
Woohoo! Looking forward to it!

On Tue, Feb 15, 2022 at 1:11 PM Kaxil Naik <ka...@gmail.com> wrote:
>
> Hey folks,
>
> Just reviving this old thread to provide an update that we (Astronomer) will be resurrecting AIP-36 DAG Versioning with a different scope in the coming days that will be more consistent with what has been discussed in this thread.
>
> Regards,
> Kaxil
>
> On Thu, Aug 13, 2020 at 9:32 PM Jarek Potiuk <Ja...@polidea.com> wrote:
>>
>> I fully agree with the "user" not having to know any of the "wheel"
>> details, just as they do not have to know the python interpreter or the
>> underlying libc library details. This should all be hidden from the users.
>>
>> I think the wheels API that we might have there does not have to be
>> user-facing. We could - rather easily - make a client that points to a DAG
>> file, builds an appropriate wheel package under the hood, and submits it. I
>> really doubt any of the users will directly use the API to submit DAGs -
>> they will use some clients built on top of it.
>>
>> I think we should separate the user side from the implementation -
>> just as we do not expect the users to know any details of how the "DAG
>> Fetcher" should work. In any case, with the DAG fetcher we need to define
>> how the DAG fetcher will ensure "atomicity" anyway - how to make sure
>> that you get a "consistent" version of all the dependent python files when
>> you fetch them. This is the part of the DAG fetcher that I do not like, because
>> it assumes that "someone else" maintains the consistency and provides the
>> "consistent view" somewhere on the "DAG Server" side (whatever the server
>> side is).
>>
>> There were many ideas about some kind of manifest describing the files etc.,
>> but I think all of that depends on some ability to provide a
>> "snapshot" of files that will be a consistent set to execute. With a "DAG
>> Fetcher" this is something that the "DAG Fetching server" has to provide. It's
>> super easy if that "server" is GIT - we already use it for GIT sync. But
>> it's rather difficult to provide a good abstraction for it for a "generic"
>> DAG fetcher.
>>
>> IMHO it is far easier to provide such a consistent set at "submission
>> time". In pretty much all cases, the user submitting the job already has a
>> consistent set of python files that the DAG uses. This is pretty much a
>> given. I think the job of the "submission" mechanism is to make a
>> "snapshot" out of that consistent set and submit this snapshot, rather than
>> individual files. Git provides it out of the box, but if we want to be
>> generic - I see no other way than to build such a "snapshot" locally. And
>> wheels seem like a very good candidate - as long as they are an implementation
>> detail and hidden from the users.
>>
>> J.
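A minimal sketch of what such a client-side snapshot step could look like, assuming a hypothetical helper that stages the DAG file plus any extra modules into a temporary package and builds a wheel with pip (the package name, metadata and the final submission step are all placeholders):

    import pathlib
    import shutil
    import subprocess
    import tempfile
    import textwrap

    def build_dag_snapshot_wheel(dag_file, extra_files=(), version="0.1.0"):
        # Stage the DAG file and its helper modules as one importable package.
        src = pathlib.Path(tempfile.mkdtemp(prefix="dag-snapshot-"))
        pkg = src / "dag_snapshot"
        pkg.mkdir()
        (pkg / "__init__.py").touch()
        for path in (dag_file, *extra_files):
            shutil.copy(path, pkg)
        # Minimal build metadata so "pip wheel" can build the package.
        (src / "pyproject.toml").write_text(textwrap.dedent(f"""\
            [build-system]
            requires = ["setuptools", "wheel"]
            build-backend = "setuptools.build_meta"

            [project]
            name = "dag-snapshot"
            version = "{version}"
        """))
        dist = src / "dist"
        subprocess.run(
            ["python", "-m", "pip", "wheel", "--no-deps", "-w", str(dist), str(src)],
            check=True,
        )
        return next(dist.glob("*.whl"))  # this .whl would then be submitted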
>>
>>
>>
>>
>> On Tue, Aug 11, 2020 at 8:33 PM Ash Berlin-Taylor <as...@apache.org> wrote:
>>
>> > Anything to do with the process of building wheels should be a "power
>> > user" only feature, and should not be required for many users - many, many
>> > users of airflow are not primarily Python developers, but data scientists,
>> > and needing them to understand anything about the python build toolchain is
>> > too much of a learning curve for the benefit.
>> >
>> > After all it is very rare that people hit the multiple concurrent versions
>> > of a dag.
>> >
>> > -ash
>> >
>> > On 10 August 2020 17:37:32 BST, Tomasz Urbaszek <tu...@apache.org>
>> > wrote:
>> > >I like the idea of wheels as this is probably the "most pythonic"
>> > >solution. And "DAG version" is not only defined by DAG code but also
>> > >by all dependencies the DAG uses (custom functions, libraries etc) and
>> > >it seems that wheels can address that.
>> > >
>> > >However, I second Ash - keeping wheels in the db doesn't sound good. In my
>> > >opinion, DAG fetcher is the right solution, and the idea surfaces every
>> > >time we talk about serialization. This abstraction has a lot of pros
>> > >as it allows a lot of customization (wheels, local fs, remote fs, etc.).
>> > >
>> > >Apart from that, if we decide to use wheels we should provide a CLI
>> > >command to ease the process of building them. Also, I'm wondering
>> > >about developers' workflow. Moving between the code of different DAG
>> > >versions sounds easy if you use git but... what if someone doesn't use
>> > >it?
>> > >
>> > >Tomek
>> > >
>> > >
>> > >On Sat, Aug 8, 2020 at 9:49 AM Ash Berlin-Taylor <as...@apache.org>
>> > >wrote:
>> > >>
>> > >> Quick comment (as I'm still mostly on paternity leave):
>> > >>
>> > >> Storing wheels in the db sounds like a bad idea to me, especially if
>> > >we need to store deps in there too (and if we don't store deps, then
>> > >they are incomplete) - they could get very large, and I've stored blobs
>> > >of ~10mb in postgres before: I don't recommend it. It "works" but
>> > >operating it is tricky.
>> > >>
>> > >>
>> > >>
>> > >> > the API could simply accept "Wheel file + the Dag id"
>> > >>
>> > >> This sounds like a huge security risk.
>> > >>
>> > >>
>> > >> My main concern with this idea is that it seems like a lot of complexity
>> > >we are putting on users. Doubly so if they are already using docker,
>> > >where there already exists an ideal packaging and distribution mechanism that
>> > >could contain the dag + needed code.
>> > >>
>> > >> (Sorry for the brevity)
>> > >>
>> > >> -ash
>> > >>
>> > >>
>> > >> On 2 August 2020 08:47:39 BST, Jarek Potiuk
>> > ><Ja...@polidea.com> wrote:
>> > >> >Few points from my sid (and proposal!):
>> > >> >
>> > >> >1) Agree with Max -  with a rather strong NO for pickles (however,
>> > >> >indeed cloudpickle solves some of the problems). Pickles came up in
>> > >> >our discussion in Polidea recently and the overall message was "no".
>> > >I
>> > >> >agree with Max here - if we can ship python code, turning that into
>> > >> >pickle for transit makes little sense to me and brings a plethora of
>> > >> >problems.
>> > >> >
>> > >> >2) I think indeed the versioning solution should treat the "DagRun"
>> > >> >structure atomically. While I see why we would like to go with the
>> > >> >UI/Scheduler only first rather than implementing them in the
>> > >workers,
>> > >> >adding the "mixed version" is where it breaks down IMHO. Reasoning
>> > >> >about such "mixed version" dag is next to impossible. The current
>> > >> >behavior is not well defined and non-deterministic (depends on
>> > >> >scheduler delays, syncing, type of deployment, restarts of the works
>> > >> >etc.) we are moving it up to UI (thus users) rather than solving the
>> > >> >problem. So I am not a big fan of this and would rather solve it
>> > >> >"well" with atomicity.
>> > >> >
>> > >> >3) I see the point of Dan as well - we had many discussions and many
>> > >> >times the idea about "submitting" the DAG for execution via the API
>> > >> >came up - and it makes sense IMHO.
>> > >> >
>> > >> >Proposal: Implement full versioning with code shipping via DB wheels
>> > >> >BLOB (akin to serialized DAGs).
>> > >> >
>> > >> >I understand that the big issue is how to actually "ship" the code
>> > >to
>> > >> >the worker. And - maybe a wild idea - we can kill several birds with
>> > >> >the same stone.
>> > >> >
>> > >> >There were plenty of discussions on how we could do that but one was
>> > >> >never truly explored - using wheel packages.
>> > >> >
>> > >> >For those who do not know them, there is the PEP:
>> > >> >https://www.python.org/dev/peps/pep-0427/
>> > >> >
>> > >> >Wheels allow to "package" python code in a standard way. They are
>> > >> >portable ("purelib" + contain .py rather than .pyc code), they have
>> > >> >metadata, versioning information, they can be signed for security,
>> > >> >They can contain other packages or python code, Why don't we let
>> > >> >scheduler to pack the fingerprinted version of the DAG in a .whl and
>> > >> >store it as a blob in a DB next to the serialized form?
>> > >> >
>> > >> >There were concerns about the size of the code to keep in the DB -
>> > >but
>> > >> >we already use the DB for serialized DAGs and it works fine (I
>> > >believe
>> > >> >we only need to add compressing of the JSon serialized form - as
>> > >we've
>> > >> >learned from AirBnb during their talk at the Airflow Summit - wheels
>> > >> >are already compressed). Also - each task will only need the
>> > >> >particular "version" of one DAG so even if we keep many of them in
>> > >the
>> > >> >DB, the old version will pretty soon go "cold" and will never be
>> > >> >retrieved (and most DBs will handle it well with caching/indexes).
>> > >> >
>> > >> >And if we want to add "callables" from other files - there is
>> > >nothing
>> > >> >to stop the person who defines dag to add list of files that should
>> > >be
>> > >> >packaged together with the main DAG file (additional_python_files =
>> > >> >["common/my_fantastic_library.py"] in DAG constructor). Or we could
>> > >> >auto-add all files after the DAG gets imported (i.e. package
>> > >> >automatically all files that are imported for that particular DAG
>> > >from
>> > >> >the "dags" folder"). That should be rather easy.
>> > >> >
>> > >> >This way we could ship the code to workers for the exact version
>> > >that
>> > >> >the DagRun uses. And they can be cached and unpacked/installed to a
>> > >> >virtualenv for the execution of that single task. That should be
>> > >super
>> > >> >quick. Such virtualenv can be wiped out after execution.
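A sketch of that per-task isolation idea (illustrative only - it assumes Airflow is already installed on the worker and visible via system site-packages, and the wheel path and task identifiers are placeholders):

    import shutil
    import subprocess
    import tempfile
    import venv

    def run_task_in_fresh_venv(wheel_path, dag_id, task_id, execution_date):
        env_dir = tempfile.mkdtemp(prefix="dag-version-venv-")
        try:
            # system_site_packages lets the venv see the worker's Airflow
            # install, while the DAG code comes only from the versioned wheel.
            venv.create(env_dir, with_pip=True, system_site_packages=True)
            subprocess.run([f"{env_dir}/bin/pip", "install", wheel_path], check=True)
            subprocess.run(
                [f"{env_dir}/bin/python", "-m", "airflow", "tasks", "run",
                 dag_id, task_id, execution_date],
                check=True,
            )
        finally:
            shutil.rmtree(env_dir)  # wipe the virtualenv after execution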
>> > >> >
>> > >> >Then we got what Max wants (atomicity of DagRuns) and what Dan wants
>> > >> >(the API could simply accept "Wheel file + the Dag id"). We have the
>> > >> >isolation between tasks running on the same worker (based on
>> > >> >virtualenv) so that each process in the same worker can run a
>> > >> >different version of the same Dag. We have much less confusion for
>> > >the
>> > >> >UI.
>> > >> >
>> > >> >Extra bonus 1: we can expand it to package different dependencies in
>> > >> >the wheels as well - so that if an operator requires a different
>> > >> >(newer) version of a python library, it could be packaged together
>> > >> >with the DAG in the same .whl file. This is also a highly requested
>> > >> >feature.
>> > >> >Extra bonus 2: workers will stop depending on the DAG file mount (!)
>> > >> >which was our long term goal and indeed as Dan mentioned - a great
>> > >> >step towards multi-tenancy.
>> > >> >
>> > >> >J.
>> > >> >
>> > >> >
>> > >> >
>> > >> >
>> > >> >
>> > >> >
>> > >> >On Fri, Jul 31, 2020 at 6:41 AM Maxime Beauchemin
>> > >> ><ma...@gmail.com> wrote:
>> > >> >>
>> > >> >> Having tried it early on, I'd advocate pretty strongly against
>> > >> >pickles and
>> > >> >> would rather not get too deep into the why here. Short story is
>> > >they
>> > >> >can
>> > >> >> pull the entire memory space or much more than you want, and it's
>> > >> >> impossible to reason about where they end. For that reason and
>> > >other
>> > >> >> reasons, they're a security issue. Oh and some objects are not
>> > >> >picklable
>> > >> >> (Jinja templates! to name a problematic one...). I've also seen
>> > >> >> secret-related classes that raise when pickled (thank god!).
>> > >> >>
>> > >> >> About callback and other things like that, it's quite a puzzle in
>> > >> >python.
>> > >> >> One solution would be to point to a python namespace
>> > >> >> callback="preset.airflow_utils.slack_callback" and assume the
>> > >> >function has
>> > >> >> to exist in the remote interpreter. Personally I like the
>> > >DagFetcher
>> > >> >idea
>> > >> >> (it could be great to get a pointer to that mailing list thread
>> > >> >here),
>> > >> >> specifically the GitDagFetcher. I don't know how [un]reasonable it
>> > >> >is, but
>> > >> >> I hate pickles so much that shipping source code around seems much
>> > >> >more
>> > >> >> reasonable to me. I think out there there's a talk from Mike Star
>> > >> >about
>> > >> >> Dataswarm at FB and he may mention how their workers may git
>> > >shallow
>> > >> >clone
>> > >> >> the pipeline repo. Or maybe they use that "beautifully ugly" hack
>> > >to
>> > >> >use
>> > >> >> a gitfs fuse [file system in user space] on the worker [could get
>> > >> >deeper
>> > >> >> into that, not sure how reasonable that is either].
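A small sketch of the dotted-path idea above - resolving the string to a callable with importlib, assuming the named module already exists in the worker's environment:

    import importlib

    def resolve_callback(dotted_path):
        # "preset.airflow_utils.slack_callback" -> import the module, then
        # look up the attribute; the function must already exist in the
        # remote interpreter's environment.
        module_name, _, attr = dotted_path.rpartition(".")
        module = importlib.import_module(module_name)
        return getattr(module, attr)

    # e.g. on_failure_callback = resolve_callback("preset.airflow_utils.slack_callback")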
>> > >> >>
>> >> >> About fingerprints, a simple `start_date = datetime.now() - timedelta(1)`
>> >> >> may lead to a never-repeating fingerprint. From memory the spec doesn't
>> >> >> list out the properties considered to build the hash. It would be helpful
>> >> >> to specify and review that list.
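A tiny illustration of that failure mode, hashing a serialized dict the same way a fingerprint would be computed (names are illustrative):

    import hashlib
    import json
    from datetime import datetime, timedelta

    def fingerprint_of_one_parse():
        # A relative start_date is re-evaluated on every parse...
        serialized = {
            "dag_id": "example",
            "start_date": str(datetime.now() - timedelta(days=1)),
        }
        canonical = json.dumps(serialized, sort_keys=True)
        return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    # ...so two consecutive "parses" almost never share a fingerprint.
    print(fingerprint_of_one_parse() == fingerprint_of_one_parse())  # -> False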
>> > >> >>
>> > >> >> Max
>> > >> >>
>> > >> >> On Wed, Jul 29, 2020 at 5:20 AM Kaxil Naik <ka...@gmail.com>
>> > >> >wrote:
>> > >> >>
>> > >> >> > Thanks, both Max and Dan for your comments, please check my
>> > >reply
>> > >> >below:
>> > >> >> >
>> > >> >> >
>> > >> >> > >  Personally I vote for a DAG version to be pinned and
>> > >consistent
>> > >> >for the
>> > >> >> > > duration of the DAG run. Some of the reasons why:
>> > >> >> > > - it's easier to reason about, and therefore visualize and
>> > >> >troubleshoot
>> > >> >> > > - it prevents some cases where dependencies are never met
>> > >> >> > > - it prevents the explosion of artifact/metadata (one
>> > >> >serialization per
>> > >> >> > > dagrun as opposed to one per scheduler cycle) in the case of a
>> > >> >dynamic
>> > >> >> > DAG
>> > >> >> > > whose fingerprint is never the same.
>> > >> >> >
>> > >> >> >
>> > >> >> > In this AIP, we were only looking to fix the current "Viewing
>> > >> >behaviour"
>> > >> >> > and
>> > >> >> > we were intentionally not changing the execution behaviour.
>> >> > The change you are suggesting means we need to introduce DAG Versioning
>> >> > for the workers too. This will need more work, as we can't use the
>> >> > Serialised Representation to run the task, since users could use custom
>> >> > modules in a different part of the code; for example, the PythonOperator
>> >> > has a python_callable that allows running any arbitrary code.
>> >> > A similar case is with the *on_*_callbacks* defined on DAG.
>> > >> >> >
>> > >> >> > Based on the current scope of the AIP, we still plan to use the
>> > >> >actual DAG
>> > >> >> > files for the
>> > >> >> > execution and not use Serialized DAGs for the workers.
>> > >> >> >
>> >> > To account for all the custom modules we will have to start looking at
>> >> > pickle (cloudpickle).
>> > >> >> >
>> > >> >> > I'm certain that there are lots of
>> > >> >> > > those DAGs out there, and that it will overwhelm the metadata
>> > >> >database,
>> > >> >> > and
>> >> > > confuse the users. For an hourly DAG it would mean 24 artifacts per day
>> >> > > instead of 1000+
>> > >> >> >
>> > >> >> >
>> >> > What kind of dynamic DAGs are we talking about here? I would think the DAG
>> >> > signature won't change, but I might be wrong - can you give an example, please?
>> > >> >> >
>> > >> >> > If backwards compatibility in behavior is a concern, I'd
>> > >recommend
>> > >> >adding a
>> > >> >> > > flag to the DAG class and/or config and make sure we're doing
>> > >the
>> > >> >right
>> > >> >> > > thing by default. People who want backward compatibility would
>> > >> >have to
>> > >> >> > > change that default. But again, that's a lot of extra and
>> > >> >confusing
>> > >> >> > > complexity that will likely be the source of bugs and user
>> > >> >confusion.
>> > >> >> > > Having a clear, easy to reason about execution model is super
>> > >> >important.
>> > >> >> >
>> > >> >> > Think about visualizing a DAG that shapeshifted 5 times during
>> > >its
>> > >> >> > > execution, how does anyone make sense of that?
>> > >> >> >
>> > >> >> >
>> >> > Wouldn't that be an edge case? How often would someone change the DAG
>> >> > structure in the middle of a DAG execution? And if they do change it, the
>> >> > Graph View should show all the tasks that were run; if it just shows the
>> >> > latest version, the behaviour would be the same as now.
>> > >> >> >
>> > >> >> > --------
>> > >> >> >
>> > >> >> > Strongly agree with Max's points, also I feel the right way to
>> > >go
>> > >> >about
>> > >> >> > > this is instead of Airflow schedulers/webservers/workers
>> > >reading
>> > >> >DAG
>> > >> >> > Python
>> > >> >> > > files, they would instead read from serialized representations
>> > >of
>> > >> >the
>> > >> >> > DAGs
>> > >> >> > > (e.g. json representation in the Airflow DB). Instead of DAG
>> > >> >owners
>> > >> >> > pushing
>> > >> >> > > their DAG files to the Airflow components via varying
>> > >mechanisms
>> > >> >(e.g.
>> > >> >> > > git), they would instead call an Airflow CLI to push the
>> > >> >serialized DAG
>> > >> >> > > representations to the DB, and for things like dynamic DAGs
>> > >you
>> > >> >could
>> > >> >> > > populate them from a DAG or another service.
>> > >> >> >
>> > >> >> >
>> >> > The Airflow Webserver and the Scheduler will definitely read from the
>> >> > Serialized representation, as they don't need all the code from the DAG files.
>> >> >
>> >> > The workers, however, definitely need access to the DAG files, as the
>> >> > tasks/operators would be using code from custom modules and classes which
>> >> > are required to run the tasks.
>> >> >
>> >> > If we do want to go down that route we will have to use something like
>> >> > cloudpickle that serializes the entire DAG file and its dependencies. We must
>> >> > also ensure that someone is not able to change the pickled source when it is
>> >> > sent from the executor to the worker, as that poses a big security risk.
>> > >> >> >
>> > >> >> > - Kaxil
>> > >> >> >
>> > >> >> > On Wed, Jul 29, 2020 at 12:43 PM Jacob Ward
>> > ><jw...@brandwatch.com>
>> > >> >wrote:
>> > >> >> >
>> > >> >> > > I came here to say what Max has said, only less eloquently.
>> > >> >> > >
>> >> > > I do have one concern with locking the version for a single run. Currently
>> >> > > it is possible for a user to create a dag which intentionally changes as a
>> >> > > dag executes, i.e. dynamically creating a task for the dag during a run by
>> >> > > modifying external data, but this change would prevent that. I'm of the
>> >> > > opinion that this situation is bad practice anyway, so it doesn't matter if
>> >> > > we make it impossible to do, but others may disagree.
>> > >> >> > >
>> > >> >> > > On Tue, 28 Jul 2020 at 17:08, Dan Davydov
>> > >> ><dd...@twitter.com.invalid>
>> > >> >> > > wrote:
>> > >> >> > >
>> > >> >> > > > Strongly agree with Max's points, also I feel the right way
>> > >to
>> > >> >go about
>> > >> >> > > > this is instead of Airflow schedulers/webservers/workers
>> > >> >reading DAG
>> > >> >> > > Python
>> > >> >> > > > files, they would instead read from serialized
>> > >representations
>> > >> >of the
>> > >> >> > > DAGs
>> > >> >> > > > (e.g. json representation in the Airflow DB). Instead of DAG
>> > >> >owners
>> > >> >> > > pushing
>> > >> >> > > > their DAG files to the Airflow components via varying
>> > >> >mechanisms (e.g.
>> > >> >> > > > git), they would instead call an Airflow CLI to push the
>> > >> >serialized DAG
>> > >> >> > > > representations to the DB, and for things like dynamic DAGs
>> > >you
>> > >> >could
>> > >> >> > > > populate them from a DAG or another service.
>> > >> >> > > >
>> > >> >> > > > This would also enable other features like stronger
>> > >> >> > > security/multi-tenancy.
>> > >> >> > > >
>> > >> >> > > > On Tue, Jul 28, 2020 at 6:44 PM Maxime Beauchemin <
>> > >> >> > > > maximebeauchemin@gmail.com> wrote:
>> > >> >> > > >
>> > >> >> > > > > > "mixed version"
>> > >> >> > > > >
>> > >> >> > > > > Personally I vote for a DAG version to be pinned and
>> > >> >consistent for
>> > >> >> > the
>> > >> >> > > > > duration of the DAG run. Some of the reasons why:
>> > >> >> > > > > - it's easier to reason about, and therefore visualize and
>> > >> >> > troubleshoot
>> > >> >> > > > > - it prevents some cases where dependencies are never met
>> > >> >> > > > > - it prevents the explosion of artifact/metadata (one
>> > >> >serialization
>> > >> >> > per
>> > >> >> > > > > dagrun as opposed to one per scheduler cycle) in the case
>> > >of
>> > >> >a
>> > >> >> > dynamic
>> > >> >> > > > DAG
>> > >> >> > > > > whose fingerprint is never the same. I'm certain that
>> > >there
>> > >> >are lots
>> > >> >> > of
>> > >> >> > > > > those DAGs out there, and that it will overwhelm the
>> > >metadata
>> > >> >> > database,
>> > >> >> > > > and
>> > >> >> > > > > confuse the users. For an hourly DAG it would mean 24 artifacts
>> > >> >> > > > > per day instead of 1000+
>> > >> >> > > > >
>> > >> >> > > > > If backwards compatibility in behavior is a concern, I'd
>> > >> >recommend
>> > >> >> > > > adding a
>> > >> >> > > > > flag to the DAG class and/or config and make sure we're
>> > >doing
>> > >> >the
>> > >> >> > right
>> > >> >> > > > > thing by default. People who want backward compatibility
>> > >> >would have
>> > >> >> > to
>> > >> >> > > > > change that default. But again, that's a lot of extra and
>> > >> >confusing
>> > >> >> > > > > complexity that will likely be the source of bugs and user
>> > >> >confusion.
>> > >> >> > > > > Having a clear, easy to reason about execution model is
>> > >super
>> > >> >> > > important.
>> > >> >> > > > >
>> > >> >> > > > > Think about visualizing a DAG that shapeshifted 5 times
>> > >> >during its
>> > >> >> > > > > execution, how does anyone make sense of that?
>> > >> >> > > > >
>> > >> >> > > > > Max
>> > >> >> > > > >
>> > >> >> > > > > On Tue, Jul 28, 2020 at 3:14 AM Kaxil Naik
>> > >> ><ka...@gmail.com>
>> > >> >> > > wrote:
>> > >> >> > > > >
>> > >> >> > > > > > Thanks Max for your comments.
>> > >> >> > > > > >
>> > >> >> > > > > >
>> > >> >> > > > > > *DAG Fingerprinting: *this can be tricky, especially in
>> > >> >regards to
>> > >> >> > > > > dynamic
>> > >> >> > > > > > > DAGs, where in some cases each parsing of the DAG can
>> > >> >result in a
>> > >> >> > > > > > different
>> > >> >> > > > > > > fingerprint. I think DAG and tasks attributes are left
>> > >> >out from
>> > >> >> > the
>> > >> >> > > > > > > proposal that should be considered as part of the
>> > >> >fingerprint,
>> > >> >> > like
>> > >> >> > > > > > trigger
>> > >> >> > > > > > > rules or task start/end datetime. We should do a full
>> > >> >pass of all
>> > >> >> > > DAG
>> > >> >> > > > > > > arguments and make sure we're not forgetting anything
>> > >> >that can
>> > >> >> > > change
>> > >> >> > > > > > > scheduling logic. Also, let's be careful that
>> > >something
>> > >> >as simple
>> > >> >> > > as
>> > >> >> > > > a
>> > >> >> > > > > > > dynamic start or end date on a task could lead to a
>> > >> >different
>> > >> >> > > version
>> > >> >> > > > > > each
>> > >> >> > > > > > > time you parse.
>> > >> >> > > > > >
>> > >> >> > > > > >
>> > >> >> > > > > >
>> > >> >> > > > > > The short version of Dag Fingerprinting would be
>> > >> >> > > > > > just a hash of the Serialized DAG.
>> > >> >> > > > > >
>> > >> >> > > > > > *Example DAG*: https://imgur.com/TVuoN3p
>> > >> >> > > > > > *Example Serialized DAG*: https://imgur.com/LmA2Bpr
>> > >> >> > > > > >
>> > >> >> > > > > > It contains all the task & DAG parameters. When they
>> > >> >change,
>> > >> >> > > Scheduler
>> > >> >> > > > > > writes
>> > >> >> > > > > > a new version of Serialized DAGs to the DB. The
>> > >Webserver
>> > >> >then
>> > >> >> > reads
>> > >> >> > > > the
>> > >> >> > > > > > DAGs from the DB.
>> > >> >> > > > > >
>> > >> >> > > > > > I'd recommend limiting serialization/storage of one
>> > >version
>> > >> >> > > > > > > per DAG Run, as opposed to potentially everytime the
>> > >DAG
>> > >> >is
>> > >> >> > parsed
>> > >> >> > > -
>> > >> >> > > > > once
>> > >> >> > > > > > > the version for a DAG run is pinned, fingerprinting is
>> > >> >not
>> > >> >> > > > re-evaluated
>> > >> >> > > > > > > until the next DAG run is ready to get created.
>> > >> >> > > > > >
>> > >> >> > > > > >
>> > >> >> > > > > > This is to handle Scenario 3 where a DAG structure is
>> > >> >changed
>> > >> >> > > mid-way.
>> > >> >> > > > > > Since we don't intend to
>> > >> >> > > > > > change the execution behaviour, if we limit Storage of 1
>> > >> >version
>> > >> >> > per
>> > >> >> > > > DAG,
>> > >> >> > > > > > it won't actually show what
>> > >> >> > > > > > was run.
>> > >> >> > > > > >
>> > >> >> > > > > > Example Dag v1: Task A -> Task B -> Task C
>> > >> >> > > > > > The worker has completed the execution of Task A and is
>> > >> >> > > > > > just about to complete the execution of Task B.
>> > >> >> > > > > >
>> > >> >> > > > > > The 2nd version of DAG is deployed: Task A -> Task D
>> > >> >> > > > > > Now Scheduler queued Task D and it will run to
>> > >completion.
>> > >> >(Task C
>> > >> >> > > > won't
>> > >> >> > > > > > run)
>> > >> >> > > > > >
>> > >> >> > > > > > In this case, "the actual representation of the DAG"
>> > >that
>> > >> >run is
>> > >> >> > > > neither
>> > >> >> > > > > v1
>> > >> >> > > > > > nor v2 but a "mixed version"
>> > >> >> > > > > >  (Task A -> Task B -> Task D). The plan is that the
>> > >> >Scheduler will
>> > >> >> > > > create
>> > >> >> > > > > > this "mixed version" based on what ran
>> > >> >> > > > > > and the Graph View would show this "mixed version".
>> > >> >> > > > > >
>> > >> >> > > > > > There would also be a toggle button on the Graph View to
>> > >> >select v1
>> > >> >> > or
>> > >> >> > > > v2
>> > >> >> > > > > > where the tasks will be highlighted to show
>> > >> >> > > > > > that a particular task was in v1 or v2 as shown in
>> > >> >> > > > > >
>> > >> >> > > > > >
>> > >> >> > > > >
>> > >> >> > > >
>> > >> >> > >
>> > >> >> >
>> > >>
>> > >>
>> > https://cwiki.apache.org/confluence/download/attachments/158868919/Picture%201.png?version=2&modificationDate=1595612863000&api=v2
>> > >> >> > > > > >
>> > >> >> > > > > >
>> > >> >> > > > > >
>> > >> >> > > > > > *Visualizing change in the tree view:* I think this is
>> > >very
>> > >> >complex
>> > >> >> > > and
>> > >> >> > > > > > > many things can make this view impossible to render
>> > >(task
>> > >> >> > > dependency
>> > >> >> > > > > > > reversal, cycles across versions, ...). Maybe a better
>> > >> >visual
>> > >> >> > > > approach
>> > >> >> > > > > > > would be to render independent, individual tree views
>> > >for
>> > >> >each
>> > >> >> > DAG
>> > >> >> > > > > > version
>> > >> >> > > > > > > (side by side), and doing best effort aligning the
>> > >tasks
>> > >> >across
>> > >> >> > > > blocks
>> > >> >> > > > > > and
>> > >> >> > > > > > > "linking" tasks with lines across blocks when
>> > >necessary.
>> > >> >> > > > > >
>> > >> >> > > > > >
>> > >> >> > > > > > Agreed, the plan is to do the best effort aligning.
>> > >> >> > > > > > At this point in time, task additions to the end of the
>> > >DAG
>> > >> >are
>> > >> >> > > > expected
>> > >> >> > > > > to
>> > >> >> > > > > > be compatible,
>> > >> >> > > > > > but changes to task structure within the DAG may cause
>> > >the
>> > >> >tree
>> > >> >> > view
>> > >> >> > > > not
>> > >> >> > > > > to
>> > >> >> > > > > > incorporate “old” and “new” in the same view, hence that
>> > >> >won't be
>> > >> >> > > > shown.
>> > >> >> > > > > >
>> > >> >> > > > > > Regards,
>> > >> >> > > > > > Kaxil
>> > >> >> > > > > >
>> > >> >> > > > > > On Mon, Jul 27, 2020 at 6:02 PM Maxime Beauchemin <
>> > >> >> > > > > > maximebeauchemin@gmail.com> wrote:
>> > >> >> > > > > >
>> > >> >> > > > > > > Some notes and ideas:
>> > >> >> > > > > > >
>> > >> >> > > > > > > *DAG Fingerprinting: *this can be tricky, especially
>> > >in
>> > >> >regards
>> > >> >> > to
>> > >> >> > > > > > dynamic
>> > >> >> > > > > > > DAGs, where in some cases each parsing of the DAG can
>> > >> >result in a
>> > >> >> > > > > > different
>> > >> >> > > > > > > fingerprint. I think DAG and tasks attributes are left
>> > >> >out from
>> > >> >> > the
>> > >> >> > > > > > > proposal that should be considered as part of the
>> > >> >fingerprint,
>> > >> >> > like
>> > >> >> > > > > > trigger
>> > >> >> > > > > > > rules or task start/end datetime. We should do a full
>> > >> >pass of all
>> > >> >> > > DAG
>> > >> >> > > > > > > arguments and make sure we're not forgetting anything
>> > >> >that can
>> > >> >> > > change
>> > >> >> > > > > > > scheduling logic. Also, let's be careful that
>> > >something
>> > >> >as simple
>> > >> >> > > as
>> > >> >> > > > a
>> > >> >> > > > > > > dynamic start or end date on a task could lead to a
>> > >> >different
>> > >> >> > > version
>> > >> >> > > > > > each
>> > >> >> > > > > > > time you parse. I'd recommend limiting
>> > >> >serialization/storage of
>> > >> >> > one
>> > >> >> > > > > > version
>> > >> >> > > > > > > per DAG Run, as opposed to potentially everytime the
>> > >DAG
>> > >> >is
>> > >> >> > parsed
>> > >> >> > > -
>> > >> >> > > > > once
>> > >> >> > > > > > > the version for a DAG run is pinned, fingerprinting is
>> > >> >not
>> > >> >> > > > re-evaluated
>> > >> >> > > > > > > until the next DAG run is ready to get created.
>> > >> >> > > > > > >
>> > >> >> > > > > > > *Visualizing change in the tree view:* I think this is
>> > >> >very
>> > >> >> > complex
>> > >> >> > > > and
>> > >> >> > > > > > > many things can make this view impossible to render
>> > >(task
>> > >> >> > > dependency
>> > >> >> > > > > > > reversal, cycles across versions, ...). Maybe a better
>> > >> >visual
>> > >> >> > > > approach
>> > >> >> > > > > > > would be to render independent, individual tree views
>> > >for
>> > >> >each
>> > >> >> > DAG
>> > >> >> > > > > > version
>> > >> >> > > > > > > (side by side), and doing best effort aligning the
>> > >tasks
>> > >> >across
>> > >> >> > > > blocks
>> > >> >> > > > > > and
>> > >> >> > > > > > > "linking" tasks with lines across blocks when
>> > >necessary.
>> > >> >> > > > > > >
>> > >> >> > > > > > > On Fri, Jul 24, 2020 at 12:46 PM Vikram Koka <
>> > >> >> > vikram@astronomer.io
>> > >> >> > > >
>> > >> >> > > > > > wrote:
>> > >> >> > > > > > >
>> > >> >> > > > > > > > Team,
>> > >> >> > > > > > > >
>> > >> >> > > > > > > >
>> > >> >> > > > > > > >
>> > >> >> > > > > > > > We just created 'AIP-36 DAG Versioning' on
>> > >Confluence
>> > >> >and would
>> > >> >> > > > very
>> > >> >> > > > > > much
>> > >> >> > > > > > > > appreciate feedback and suggestions from the
>> > >community.
>> > >> >> > > > > > > >
>> > >> >> > > > > > > >
>> > >> >> > > > > > > >
>> > >> >> > > > > > > >
>> > >> >> > > > > > >
>> > >> >> > > > > >
>> > >> >> > > > >
>> > >> >> > > >
>> > >> >> > >
>> > >> >> >
>> > >>
>> > >>
>> > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-36+DAG+Versioning
>> > >> >> > > > > > > >
>> > >> >> > > > > > > >
>> > >> >> > > > > > > >
>> > >> >> > > > > > > > The DAG Versioning concept has been discussed on
>> > >> >multiple
>> > >> >> > > occasions
>> > >> >> > > > > in
>> > >> >> > > > > > > the
>> > >> >> > > > > > > > past and has been a topic highlighted as part of
>> > >> >Airflow 2.0 as
>> > >> >> > > > well.
>> > >> >> > > > > > We
>> > >> >> > > > > > > at
>> > >> >> > > > > > > > Astronomer have heard data engineers at several
>> > >> >enterprises ask
>> > >> >> > > > about
>> > >> >> > > > > > > this
>> > >> >> > > > > > > > feature as well, for easier debugging when changes
>> > >are
>> > >> >made to
>> > >> >> > > DAGs
>> > >> >> > > > > as
>> > >> >> > > > > > a
>> > >> >> > > > > > > > result of evolving business needs.
>> > >> >> > > > > > > >
>> > >> >> > > > > > > >
>> > >> >> > > > > > > > As described in the AIP, we have a proposal focused
>> > >on
>> > >> >ensuring
>> > >> >> > > > that
>> > >> >> > > > > > the
>> > >> >> > > > > > > > visibility behaviour of Airflow is correct, without
>> > >> >changing
>> > >> >> > the
>> > >> >> > > > > > > execution
>> > >> >> > > > > > > > behaviour. We considered changing the execution
>> > >> >behaviour as
>> > >> >> > > well,
>> > >> >> > > > > but
>> > >> >> > > > > > > > decided that the risks in changing execution
>> > >behavior
>> > >> >were too
>> > >> >> > > high
>> > >> >> > > > > as
>> > >> >> > > > > > > > compared to the benefits and therefore decided to
>> > >limit
>> > >> >the
>> > >> >> > scope
>> > >> >> > > > to
>> > >> >> > > > > > only
>> > >> >> > > > > > > > making sure that the visibility was correct.
>> > >> >> > > > > > > >
>> > >> >> > > > > > > >
>> > >> >> > > > > > > > We would like to attempt this based on our
>> > >experience
>> > >> >running
>> > >> >> > > > Airflow
>> > >> >> > > > > > as
>> > >> >> > > > > > > a
>> > >> >> > > > > > > > service. We believe that this benefits Airflow as a
>> > >> >project and
>> > >> >> > > the
>> > >> >> > > > > > > > development experience of data engineers using
>> > >Airflow
>> > >> >across
>> > >> >> > the
>> > >> >> > > > > > world.
>> > >> >> > > > > > > >
>> > >> >> > > > > > > >
>> > >> >> > > > > > > >  Any feedback, suggestions, and comments would be
>> > >> >greatly
>> > >> >> > > > > appreciated.
>> > >> >> > > > > > > >
>> > >> >> > > > > > > >
>> > >> >> > > > > > > >
>> > >> >> > > > > > > > Best Regards,
>> > >> >> > > > > > > >
>> > >> >> > > > > > > >
>> > >> >> > > > > > > > Kaxil Naik, Ryan Hamilton, Ash Berlin-Taylor, and
>> > >> >Vikram Koka
>> > >> >> > > > > > > >
>> > >> >> > > > > > >
>> > >> >> > > > > >
>> > >> >> > > > >
>> > >> >> > > >
>> > >> >> > >
>> > >> >> > >
>> > >> >> > > --
>> > >> >> > >
>> > >> >> > > Jacob Ward    |    Graduate Data Infrastructure Engineer
>> > >> >> > >
>> > >> >> > > jward@brandwatch.com
>> > >> >> > >
>> > >> >> > >
>> > >> >> > > NEW YORK   | BOSTON   | BRIGHTON   | LONDON   | BERLIN |
>> > >> >STUTTGART |
>> > >> >> > > PARIS   | SINGAPORE | SYDNEY
>> > >> >> > >
>> > >> >> >
>> > >> >
>> > >> >
>> > >> >
>> > >> >--
>> > >> >
>> > >> >Jarek Potiuk
>> > >> >Polidea | Principal Software Engineer
>> > >> >
>> > >> >M: +48 660 796 129
>> >
>>
>>
>> --
>>
>> Jarek Potiuk
>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>
>> M: +48 660 796 129 <+48660796129>
>> [image: Polidea] <https://www.polidea.com/>


Re: [PROPOSAL][AIP-36 DAG Versioning]

Posted by Kaxil Naik <ka...@gmail.com>.
Hey folks,

Just reviving this old thread to provide an update that we (Astronomer)
will be resurrecting AIP-36 DAG Versioning with a different scope in the
coming days that will be more consistent with what has been discussed in
this thread.

Regards,
Kaxil

On Thu, Aug 13, 2020 at 9:32 PM Jarek Potiuk <Ja...@polidea.com>
wrote:

> I fully agree with the "user" not having to know any of the "wheel"
> details, just as they do not have to know the python interpreter or the
> underlying libc library details. This should all be hidden from the users.
>
> I think the wheels API that we might have there does not have to be
> user-facing. We could - rather easily - make a client that points to a DAG
> file, builds an appropriate wheel package under the hood, and submits it. I
> really doubt any of the users will directly use the API to submit DAGs -
> they will use some clients built on top of it.
>
> I think we should separate the user side from the implementation -
> just as we do not expect the users to know any details of how the "DAG
> Fetcher" should work. In any case, with the DAG fetcher we need to define
> how the DAG fetcher will ensure "atomicity" anyway - how to make sure
> that you get a "consistent" version of all the dependent python files when
> you fetch them. This is the part of the DAG fetcher that I do not like, because
> it assumes that "someone else" maintains the consistency and provides the
> "consistent view" somewhere on the "DAG Server" side (whatever the server
> side is).
>
> There were many ideas about some kind of manifest describing the files etc.,
> but I think all of that depends on some ability to provide a
> "snapshot" of files that will be a consistent set to execute. With a "DAG
> Fetcher" this is something that the "DAG Fetching server" has to provide. It's
> super easy if that "server" is GIT - we already use it for GIT sync. But
> it's rather difficult to provide a good abstraction for it for a "generic"
> DAG fetcher.
>
> IMHO it is far easier to provide such a consistent set at "submission
> time". In pretty much all cases, the user submitting the job already has a
> consistent set of python files that the DAG uses. This is pretty much a
> given. I think the job of the "submission" mechanism is to make a
> "snapshot" out of that consistent set and submit this snapshot, rather than
> individual files. Git provides it out of the box, but if we want to be
> generic - I see no other way than to build such a "snapshot" locally. And
> wheels seem like a very good candidate - as long as they are an implementation
> detail and hidden from the users.
>
> J.
>
>
>
>
> On Tue, Aug 11, 2020 at 8:33 PM Ash Berlin-Taylor <as...@apache.org> wrote:
>
> > Anything to do with the process of building wheels should be a "power
> > user" only feature, and should not be required for many users - many, many
> > users of airflow are not primarily Python developers, but data scientists,
> > and needing them to understand anything about the python build toolchain
> > is too much of a learning curve for the benefit.
> >
> > After all it is very rare that people hit the multiple concurrent
> versions
> > of a dag.
> >
> > -ash
> >
> > On 10 August 2020 17:37:32 BST, Tomasz Urbaszek <tu...@apache.org>
> > wrote:
> > >I like the idea of wheels as this is probably the "most pythonic"
> > >solution. And "DAG version" is not only defined by DAG code but also
> > >by all dependencies the DAG uses (custom functions, libraries etc) and
> > >it seems that wheels can address that.
> > >
> > >However, I second Ash - keeping wheels in db doesn't sound good. In my
> > >opinion, DAG fetcher is the right solution and the idea surfaces every
> > >time we talk about serialization. This abstraction has a lot of pros
> > >as it allows a lot of customization (wheels, local fs, remote fs,
> > >wheels etc).
> > >
> > >Apart from that, if we decided to use wheels we should provide a CLI
> > >command to ease the process of building them. Also, I'm wondering
> > >about developers' workflow. Moving between code of different DAG
> > >version sounds easy if you use git but... what if someone doesn't use
> > >it?
> > >
> > >Tomek
> > >
> > >
> > >On Sat, Aug 8, 2020 at 9:49 AM Ash Berlin-Taylor <as...@apache.org>
> > >wrote:
> > >>
> > >> Quick comment (as I'm still mostly on paternity leave):
> > >>
> > >> Storing wheels in the db sounds like a bad idea to me, especially if
> > >we need to store deps in there too (and if we don't store deps, then
> > >they are incomplete) - they could get very large, and I've stored blobs
> > >of ~10mb in postgres before: I don't recommend it. It "works" but
> > >operating it is tricky.
> > >>
> > >>
> > >>
> > >> > the API could simply accept "Wheel file + the Dag id"
> > >>
> > >> This sounds like a huge security risk.
> > >>
> > >>
> > >> My main concern with this idea is that it seems a lot of complexity
> > >we are putting on users. Doubly so if they are already using docker
> > >where there already exists an Ideal packaging and distribution that
> > >could contain dag + needed code.
> > >>
> > >> (Sorry for the brevity)
> > >>
> > >> -ash
> > >>
> > >>
> > >> On 2 August 2020 08:47:39 BST, Jarek Potiuk
> > ><Ja...@polidea.com> wrote:
> > >> >Few points from my sid (and proposal!):
> > >> >
> > >> >1) Agree with Max -  with a rather strong NO for pickles (however,
> > >> >indeed cloudpickle solves some of the problems). Pickles came up in
> > >> >our discussion in Polidea recently and the overall message was "no".
> > >I
> > >> >agree with Max here - if we can ship python code, turning that into
> > >> >pickle for transit makes little sense to me and brings a plethora of
> > >> >problems.
> > >> >
> > >> >2) I think indeed the versioning solution should treat the "DagRun"
> > >> >structure atomically. While I see why we would like to go with the
> > >> >UI/Scheduler only first rather than implementing them in the
> > >workers,
> > >> >adding the "mixed version" is where it breaks down IMHO. Reasoning
> > >> >about such "mixed version" dag is next to impossible. The current
> > >> >behavior is not well defined and non-deterministic (depends on
> > >> >scheduler delays, syncing, type of deployment, restarts of the works
> > >> >etc.) we are moving it up to UI (thus users) rather than solving the
> > >> >problem. So I am not a big fan of this and would rather solve it
> > >> >"well" with atomicity.
> > >> >
> > >> >3) I see the point of Dan as well - we had many discussions and many
> > >> >times the idea about "submitting" the DAG for execution via the API
> > >> >came up - and it makes sense IMHO.
> > >> >
> > >> >Proposal: Implement full versioning with code shipping via DB wheels
> > >> >BLOB (akin to serialized DAGs).
> > >> >
> > >> >I understand that the big issue is how to actually "ship" the code
> > >to
> > >> >the worker. And - maybe a wild idea - we can kill several birds with
> > >> >the same stone.
> > >> >
> > >> >There were plenty of discussions on how we could do that but one was
> > >> >never truly explored - using wheel packages.
> > >> >
> > >> >For those who do not know them, there is the PEP:
> > >> >https://www.python.org/dev/peps/pep-0427/
> > >> >
> > >> >Wheels allow to "package" python code in a standard way. They are
> > >> >portable ("purelib" + contain .py rather than .pyc code), they have
> > >> >metadata, versioning information, they can be signed for security,
> > >> >They can contain other packages or python code, Why don't we let
> > >> >scheduler to pack the fingerprinted version of the DAG in a .whl and
> > >> >store it as a blob in a DB next to the serialized form?
> > >> >
> > >> >There were concerns about the size of the code to keep in the DB -
> > >but
> > >> >we already use the DB for serialized DAGs and it works fine (I
> > >believe
> > >> >we only need to add compressing of the JSon serialized form - as
> > >we've
> > >> >learned from AirBnb during their talk at the Airflow Summit - wheels
> > >> >are already compressed). Also - each task will only need the
> > >> >particular "version" of one DAG so even if we keep many of them in
> > >the
> > >> >DB, the old version will pretty soon go "cold" and will never be
> > >> >retrieved (and most DBs will handle it well with caching/indexes).
> > >> >
> > >> >And if we want to add "callables" from other files - there is
> > >nothing
> > >> >to stop the person who defines dag to add list of files that should
> > >be
> > >> >packaged together with the main DAG file (additional_python_files =
> > >> >["common/my_fantastic_library.py"] in DAG constructor). Or we could
> > >> >auto-add all files after the DAG gets imported (i.e. package
> > >> >automatically all files that are imported for that particular DAG
> > >from
> > >> >the "dags" folder"). That should be rather easy.
> > >> >
> > >> >This way we could ship the code to workers for the exact version
> > >that
> > >> >the DagRun uses. And they can be cached and unpacked/installed to a
> > >> >virtualenv for the execution of that single task. That should be
> > >super
> > >> >quick. Such virtualenv can be wiped out after execution.
> > >> >
> > >> >Then we got what Max wants (atomicity of DagRuns) and what Dan wants
> > >> >(the API could simply accept "Wheel file + the Dag id"). We have the
> > >> >isolation between tasks running on the same worker (based on
> > >> >virtualenv) so that each process in the same worker can run a
> > >> >different version of the same Dag. We have much less confusion for
> > >the
> > >> >UI.
> > >> >
> > >> >Extra bonus 1: we can expand it to package different dependencies in
> > >> >the wheels as well - so that if an operator requires a different
> > >> >(newer) version of a python library, it could be packaged together
> > >> >with the DAG in the same .whl file. This is also a highly requested
> > >> >feature.
> > >> >Extra bonus 2: workers will stop depending on the DAG file mount (!)
> > >> >which was our long term goal and indeed as Dan mentioned - a great
> > >> >step towards multi-tenancy.
> > >> >
> > >> >J.
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >On Fri, Jul 31, 2020 at 6:41 AM Maxime Beauchemin
> > >> ><ma...@gmail.com> wrote:
> > >> >>
> > >> >> Having tried it early on, I'd advocate pretty strongly against
> > >> >pickles and
> > >> >> would rather not get too deep into the why here. Short story is
> > >they
> > >> >can
> > >> >> pull the entire memory space or much more than you want, and it's
> > >> >> impossible to reason about where they end. For that reason and
> > >other
> > >> >> reasons, they're a security issue. Oh and some objects are not
> > >> >picklable
> > >> >> (Jinja templates! to name a problematic one...). I've also seen
> > >> >> secret-related classes that raise when pickled (thank god!).
> > >> >>
> > >> >> About callback and other things like that, it's quite a puzzle in
> > >> >python.
> > >> >> One solution would be to point to a python namespace
> > >> >> callback="preset.airflow_utils.slack_callback" and assume the
> > >> >function has
> > >> >> to exist in the remote interpreter. Personally I like the
> > >DagFetcher
> > >> >idea
> > >> >> (it could be great to get a pointer to that mailing list thread
> > >> >here),
> > >> >> specifically the GitDagFetcher. I don't know how [un]reasonable it
> > >> >is, but
> > >> >> I hate pickles so much that shipping source code around seems much
> > >> >more
> > >> >> reasonable to me. I think out there there's a talk from Mike Star
> > >> >about
> > >> >> Dataswarm at FB and he may mention how their workers may git
> > >shallow
> > >> >clone
> > >> >> the pipeline repo. Or maybe they use that "beautifully ugly" hack
> > >to
> > >> >use
> > >> >> a gitfs fuse [file system in user space] on the worker [could get
> > >> >deeper
> > >> >> into that, not sure how reasonable that is either].
> > >> >>
> > >> >> About fingerprints, a simple `start_date = datetime.now() - timedelta(1)`
> > >> >> may lead to a never-repeating fingerprint. From memory the spec doesn't
> > >> >> list out the properties considered to build the hash. It would be helpful
> > >> >> to specify and review that list.
> > >> >>
> > >> >> Max
> > >> >>
> > >> >> On Wed, Jul 29, 2020 at 5:20 AM Kaxil Naik <ka...@gmail.com>
> > >> >wrote:
> > >> >>
> > >> >> > Thanks, both Max and Dan for your comments, please check my
> > >reply
> > >> >below:
> > >> >> >
> > >> >> >
> > >> >> > >  Personally I vote for a DAG version to be pinned and
> > >consistent
> > >> >for the
> > >> >> > > duration of the DAG run. Some of the reasons why:
> > >> >> > > - it's easier to reason about, and therefore visualize and
> > >> >troubleshoot
> > >> >> > > - it prevents some cases where dependencies are never met
> > >> >> > > - it prevents the explosion of artifact/metadata (one
> > >> >serialization per
> > >> >> > > dagrun as opposed to one per scheduler cycle) in the case of a
> > >> >dynamic
> > >> >> > DAG
> > >> >> > > whose fingerprint is never the same.
> > >> >> >
> > >> >> >
> > >> >> > In this AIP, we were only looking to fix the current "Viewing
> > >> >behaviour"
> > >> >> > and
> > >> >> > we were intentionally not changing the execution behaviour.
> > >> >> > The change you are suggesting means we need to introduce DAG
> > >> >Versioning for
> > >> >> > the
> > >> >> > workers too. This will need more work as can't use the
> > >Serialised
> > >> >> > Representation
> > >> >> > to run the task since users could use custom modules in a
> > >different
> > >> >part of
> > >> >> > code,
> > >> >> > example the PythonOperator has python_callable that allows
> > >running
> > >> >any
> > >> >> > arbitrary code.
> > >> >> > A similar case is with the *on_*_callbacks* defined on DAG.
> > >> >> >
> > >> >> > Based on the current scope of the AIP, we still plan to use the
> > >> >actual DAG
> > >> >> > files for the
> > >> >> > execution and not use Serialized DAGs for the workers.
> > >> >> >
> > >> >> > To account for all the custom modules we will have to start
> > >looking
> > >> >at
> > >> >> > pickle (cloudpickle).
> > >> >> >
> > >> >> > I'm certain that there are lots of
> > >> >> > > those DAGs out there, and that it will overwhelm the metadata
> > >> >database,
> > >> >> > and
> > >> >> > > confuse the users. For an hourly DAG is would mean 24 artifact
> > >> >per day
> > >> >> > > instead of 1000+
> > >> >> >
> > >> >> >
> > >> >> > What kind of dynamic DAGs are we talking about here, I would
> > >think
> > >> >the DAG
> > >> >> > signature won't change
> > >> >> > but I might be wrong, can you give an example, please.
> > >> >> >
> > >> >> > If backwards compatibility in behavior is a concern, I'd
> > >recommend
> > >> >adding a
> > >> >> > > flag to the DAG class and/or config and make sure we're doing
> > >the
> > >> >right
> > >> >> > > thing by default. People who want backward compatibility would
> > >> >have to
> > >> >> > > change that default. But again, that's a lot of extra and
> > >> >confusing
> > >> >> > > complexity that will likely be the source of bugs and user
> > >> >confusion.
> > >> >> > > Having a clear, easy to reason about execution model is super
> > >> >important.
> > >> >> >
> > >> >> > Think about visualizing a DAG that shapeshifted 5 times during
> > >its
> > >> >> > > execution, how does anyone make sense of that?
> > >> >> >
> > >> >> >
> > >> >> > Wouldn't that be an edge case? How often would someone change
> > >the
> > >> >DAG
> > >> >> > structure in the middle of
> > >> >> > a DAG execution. And since if they do change, the Graph View
> > >should
> > >> >show
> > >> >> > all the tasks that were
> > >> >> > run, if it just shows based on the latest version, the behaviour
> > >> >would be
> > >> >> > the same as now.
> > >> >> >
> > >> >> > --------
> > >> >> >
> > >> >> > Strongly agree with Max's points, also I feel the right way to
> > >go
> > >> >about
> > >> >> > > this is instead of Airflow schedulers/webservers/workers
> > >reading
> > >> >DAG
> > >> >> > Python
> > >> >> > > files, they would instead read from serialized representations
> > >of
> > >> >the
> > >> >> > DAGs
> > >> >> > > (e.g. json representation in the Airflow DB). Instead of DAG
> > >> >owners
> > >> >> > pushing
> > >> >> > > their DAG files to the Airflow components via varying
> > >mechanisms
> > >> >(e.g.
> > >> >> > > git), they would instead call an Airflow CLI to push the
> > >> >serialized DAG
> > >> >> > > representations to the DB, and for things like dynamic DAGs
> > >you
> > >> >could
> > >> >> > > populate them from a DAG or another service.
> > >> >> >
> > >> >> >
> > >> >> > Airflow Webserver and the Scheduler will definitely read from
> > >the
> > >> >> > Serialized representation as
> > >> >> > they don't need all the code from the DAG files.
> > >> >> >
> > >> >> > While the workers definitely need access to DAG files as the
> > >> >> > tasks/operators would be using
> > >> >> > code form custom modules and classes which are required to run
> > >the
> > >> >tasks.
> > >> >> >
> > >> >> > If we do want to go down that route we will have to use
> > >something
> > >> >like
> > >> >> > cloudpickle that serializes
> > >> >> > entire DAG file and their dependencies. And also ensure that
> > >> >someone is not
> > >> >> > able to change the pickled
> > >> >> > source when sending from executor to the worker as that poses a
> > >big
> > >> >> > security risk.
> > >> >> >
> > >> >> > - Kaxil
> > >> >> >
> > >> >> > On Wed, Jul 29, 2020 at 12:43 PM Jacob Ward
> > ><jw...@brandwatch.com>
> > >> >wrote:
> > >> >> >
> > >> >> > > I came here to say what Max has said, only less eloquently.
> > >> >> > >
> > >> >> > > I do have one concern with locking the version for a single
> > >run.
> > >> >> > Currently
> > >> >> > > it is possible for a user to create a dag which intentionally
> > >> >changes as
> > >> >> > a
> > >> >> > > dag executes, i.e. dynamically creating a task for the dag
> > >during
> > >> >a run
> > >> >> > by
> > >> >> > > modifying external data, but this change would prevent that.
> > >I'm
> > >> >of the
> > >> >> > > opinion that this situation is bad practice anyway so it
> > >doesn't
> > >> >matter
> > >> >> > if
> > >> >> > > we make it impossible to do, but others may disagree.
> > >> >> > >
> > >> >> > > On Tue, 28 Jul 2020 at 17:08, Dan Davydov
> > >> ><dd...@twitter.com.invalid>
> > >> >> > > wrote:
> > >> >> > >
> > >> >> > > > Strongly agree with Max's points, also I feel the right way
> > >to
> > >> >go about
> > >> >> > > > this is instead of Airflow schedulers/webservers/workers
> > >> >reading DAG
> > >> >> > > Python
> > >> >> > > > files, they would instead read from serialized
> > >representations
> > >> >of the
> > >> >> > > DAGs
> > >> >> > > > (e.g. json representation in the Airflow DB). Instead of DAG
> > >> >owners
> > >> >> > > pushing
> > >> >> > > > their DAG files to the Airflow components via varying
> > >> >mechanisms (e.g.
> > >> >> > > > git), they would instead call an Airflow CLI to push the
> > >> >serialized DAG
> > >> >> > > > representations to the DB, and for things like dynamic DAGs
> > >you
> > >> >could
> > >> >> > > > populate them from a DAG or another service.
> > >> >> > > >
> > >> >> > > > This would also enable other features like stronger
> > >> >> > > security/multi-tenancy.
> > >> >> > > >
> > >> >> > > > On Tue, Jul 28, 2020 at 6:44 PM Maxime Beauchemin <
> > >> >> > > > maximebeauchemin@gmail.com> wrote:
> > >> >> > > >
> > >> >> > > > > > "mixed version"
> > >> >> > > > >
> > >> >> > > > > Personally I vote for a DAG version to be pinned and
> > >> >consistent for
> > >> >> > the
> > >> >> > > > > duration of the DAG run. Some of the reasons why:
> > >> >> > > > > - it's easier to reason about, and therefore visualize and
> > >> >> > troubleshoot
> > >> >> > > > > - it prevents some cases where dependencies are never met
> > >> >> > > > > - it prevents the explosion of artifact/metadata (one
> > >> >serialization
> > >> >> > per
> > >> >> > > > > dagrun as opposed to one per scheduler cycle) in the case
> > >of
> > >> >a
> > >> >> > dynamic
> > >> >> > > > DAG
> > >> >> > > > > whose fingerprint is never the same. I'm certain that
> > >there
> > >> >are lots
> > >> >> > of
> > >> >> > > > > those DAGs out there, and that it will overwhelm the
> > >metadata
> > >> >> > database,
> > >> >> > > > and
> > >> >> > > > > confuse the users. For an hourly DAG is would mean 24
> > >> >artifact per
> > >> >> > day
> > >> >> > > > > instead of 1000+
> > >> >> > > > >
> > >> >> > > > > If backwards compatibility in behavior is a concern, I'd
> > >> >recommend
> > >> >> > > > adding a
> > >> >> > > > > flag to the DAG class and/or config and make sure we're
> > >doing
> > >> >the
> > >> >> > right
> > >> >> > > > > thing by default. People who want backward compatibility
> > >> >would have
> > >> >> > to
> > >> >> > > > > change that default. But again, that's a lot of extra and
> > >> >confusing
> > >> >> > > > > complexity that will likely be the source of bugs and user
> > >> >confusion.
> > >> >> > > > > Having a clear, easy to reason about execution model is
> > >super
> > >> >> > > important.
> > >> >> > > > >
> > >> >> > > > > Think about visualizing a DAG that shapeshifted 5 times
> > >> >during its
> > >> >> > > > > execution, how does anyone make sense of that?
> > >> >> > > > >
> > >> >> > > > > Max
> > >> >> > > > >
> > >> >> > > > > On Tue, Jul 28, 2020 at 3:14 AM Kaxil Naik
> > >> ><ka...@gmail.com>
> > >> >> > > wrote:
> > >> >> > > > >
> > >> >> > > > > > Thanks Max for your comments.
> > >> >> > > > > >
> > >> >> > > > > >
> > >> >> > > > > > *DAG Fingerprinting: *this can be tricky, especially in
> > >> >regards to
> > >> >> > > > > dynamic
> > >> >> > > > > > > DAGs, where in some cases each parsing of the DAG can
> > >> >result in a
> > >> >> > > > > > different
> > >> >> > > > > > > fingerprint. I think DAG and tasks attributes are left
> > >> >out from
> > >> >> > the
> > >> >> > > > > > > proposal that should be considered as part of the
> > >> >fingerprint,
> > >> >> > like
> > >> >> > > > > > trigger
> > >> >> > > > > > > rules or task start/end datetime. We should do a full
> > >> >pass of all
> > >> >> > > DAG
> > >> >> > > > > > > arguments and make sure we're not forgetting anything
> > >> >that can
> > >> >> > > change
> > >> >> > > > > > > scheduling logic. Also, let's be careful that
> > >something
> > >> >as simple
> > >> >> > > as
> > >> >> > > > a
> > >> >> > > > > > > dynamic start or end date on a task could lead to a
> > >> >different
> > >> >> > > version
> > >> >> > > > > > each
> > >> >> > > > > > > time you parse.
> > >> >> > > > > >
> > >> >> > > > > >
> > >> >> > > > > >
> > >> >> > > > > > The short version of Dag Fingerprinting would be
> > >> >> > > > > > just a hash of the Serialized DAG.
> > >> >> > > > > >
> > >> >> > > > > > *Example DAG*: https://imgur.com/TVuoN3p
> > >> >> > > > > > *Example Serialized DAG*: https://imgur.com/LmA2Bpr
> > >> >> > > > > >
> > >> >> > > > > > It contains all the task & DAG parameters. When they
> > >> >change,
> > >> >> > > Scheduler
> > >> >> > > > > > writes
> > >> >> > > > > > a new version of Serialized DAGs to the DB. The
> > >Webserver
> > >> >then
> > >> >> > reads
> > >> >> > > > the
> > >> >> > > > > > DAGs from the DB.
> > >> >> > > > > >
> > >> >> > > > > > I'd recommend limiting serialization/storage of one
> > >version
> > >> >> > > > > > > per DAG Run, as opposed to potentially everytime the
> > >DAG
> > >> >is
> > >> >> > parsed
> > >> >> > > -
> > >> >> > > > > once
> > >> >> > > > > > > the version for a DAG run is pinned, fingerprinting is
> > >> >not
> > >> >> > > > re-evaluated
> > >> >> > > > > > > until the next DAG run is ready to get created.
> > >> >> > > > > >
> > >> >> > > > > >
> > >> >> > > > > > This is to handle Scenario 3 where a DAG structure is
> > >> >changed
> > >> >> > > mid-way.
> > >> >> > > > > > Since we don't intend to
> > >> >> > > > > > change the execution behaviour, if we limit Storage of 1
> > >> >version
> > >> >> > per
> > >> >> > > > DAG,
> > >> >> > > > > > it won't actually show what
> > >> >> > > > > > was run.
> > >> >> > > > > >
> > >> >> > > > > > Example Dag v1: Task A -> Task B -> Task C
> > >> >> > > > > > The worker has completed the execution of Task B and is
> > >> >just about
> > >> >> > to
> > >> >> > > > > > complete the execution of Task B.
> > >> >> > > > > >
> > >> >> > > > > > The 2nd version of DAG is deployed: Task A -> Task D
> > >> >> > > > > > Now Scheduler queued Task D and it will run to
> > >completion.
> > >> >(Task C
> > >> >> > > > won't
> > >> >> > > > > > run)
> > >> >> > > > > >
> > >> >> > > > > > In this case, "the actual representation of the DAG"
> > >that
> > >> >run is
> > >> >> > > > neither
> > >> >> > > > > v1
> > >> >> > > > > > nor v2 but a "mixed version"
> > >> >> > > > > >  (Task A -> Task B -> Task D). The plan is that the
> > >> >Scheduler will
> > >> >> > > > create
> > >> >> > > > > > this "mixed version" based on what ran
> > >> >> > > > > > and the Graph View would show this "mixed version".
> > >> >> > > > > >
> > >> >> > > > > > There would also be a toggle button on the Graph View to
> > >> >select v1
> > >> >> > or
> > >> >> > > > v2
> > >> >> > > > > > where the tasks will be highlighted to show
> > >> >> > > > > > that a particular task was in v1 or v2 as shown in
> > >> >> > > > > >
> > >> >> > > > > >
> > >> >> > > > >
> > >> >> > > >
> > >> >> > >
> > >> >> >
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/download/attachments/158868919/Picture%201.png?version=2&modificationDate=1595612863000&api=v2
> > >> >> > > > > >
> > >> >> > > > > >
> > >> >> > > > > >
> > >> >> > > > > > *Visualizing change in the tree view:* I think this is
> > >very
> > >> >complex
> > >> >> > > and
> > >> >> > > > > > > many things can make this view impossible to render
> > >(task
> > >> >> > > dependency
> > >> >> > > > > > > reversal, cycles across versions, ...). Maybe a better
> > >> >visual
> > >> >> > > > approach
> > >> >> > > > > > > would be to render independent, individual tree views
> > >for
> > >> >each
> > >> >> > DAG
> > >> >> > > > > > version
> > >> >> > > > > > > (side by side), and doing best effort aligning the
> > >tasks
> > >> >across
> > >> >> > > > blocks
> > >> >> > > > > > and
> > >> >> > > > > > > "linking" tasks with lines across blocks when
> > >necessary.
> > >> >> > > > > >
> > >> >> > > > > >
> > >> >> > > > > > Agreed, the plan is to do the best effort aligning.
> > >> >> > > > > > At this point in time, task additions to the end of the
> > >DAG
> > >> >are
> > >> >> > > > expected
> > >> >> > > > > to
> > >> >> > > > > > be compatible,
> > >> >> > > > > > but changes to task structure within the DAG may cause
> > >the
> > >> >tree
> > >> >> > view
> > >> >> > > > not
> > >> >> > > > > to
> > >> >> > > > > > incorporate “old” and “new” in the same view, hence that
> > >> >won't be
> > >> >> > > > shown.
> > >> >> > > > > >
> > >> >> > > > > > Regards,
> > >> >> > > > > > Kaxil
> > >> >> > > > > >
> > >> >> > > > > > On Mon, Jul 27, 2020 at 6:02 PM Maxime Beauchemin <
> > >> >> > > > > > maximebeauchemin@gmail.com> wrote:
> > >> >> > > > > >
> > >> >> > > > > > > Some notes and ideas:
> > >> >> > > > > > >
> > >> >> > > > > > > *DAG Fingerprinting: *this can be tricky, especially
> > >in
> > >> >regards
> > >> >> > to
> > >> >> > > > > > dynamic
> > >> >> > > > > > > DAGs, where in some cases each parsing of the DAG can
> > >> >result in a
> > >> >> > > > > > different
> > >> >> > > > > > > fingerprint. I think DAG and tasks attributes are left
> > >> >out from
> > >> >> > the
> > >> >> > > > > > > proposal that should be considered as part of the
> > >> >fingerprint,
> > >> >> > like
> > >> >> > > > > > trigger
> > >> >> > > > > > > rules or task start/end datetime. We should do a full
> > >> >pass of all
> > >> >> > > DAG
> > >> >> > > > > > > arguments and make sure we're not forgetting anything
> > >> >that can
> > >> >> > > change
> > >> >> > > > > > > scheduling logic. Also, let's be careful that
> > >something
> > >> >as simple
> > >> >> > > as
> > >> >> > > > a
> > >> >> > > > > > > dynamic start or end date on a task could lead to a
> > >> >different
> > >> >> > > version
> > >> >> > > > > > each
> > >> >> > > > > > > time you parse. I'd recommend limiting
> > >> >serialization/storage of
> > >> >> > one
> > >> >> > > > > > version
> > >> >> > > > > > > per DAG Run, as opposed to potentially everytime the
> > >DAG
> > >> >is
> > >> >> > parsed
> > >> >> > > -
> > >> >> > > > > once
> > >> >> > > > > > > the version for a DAG run is pinned, fingerprinting is
> > >> >not
> > >> >> > > > re-evaluated
> > >> >> > > > > > > until the next DAG run is ready to get created.
> > >> >> > > > > > >
> > >> >> > > > > > > *Visualizing change in the tree view:* I think this is
> > >> >very
> > >> >> > complex
> > >> >> > > > and
> > >> >> > > > > > > many things can make this view impossible to render
> > >(task
> > >> >> > > dependency
> > >> >> > > > > > > reversal, cycles across versions, ...). Maybe a better
> > >> >visual
> > >> >> > > > approach
> > >> >> > > > > > > would be to render independent, individual tree views
> > >for
> > >> >each
> > >> >> > DAG
> > >> >> > > > > > version
> > >> >> > > > > > > (side by side), and doing best effort aligning the
> > >tasks
> > >> >across
> > >> >> > > > blocks
> > >> >> > > > > > and
> > >> >> > > > > > > "linking" tasks with lines across blocks when
> > >necessary.
> > >> >> > > > > > >
> > >> >> > > > > > > On Fri, Jul 24, 2020 at 12:46 PM Vikram Koka <
> > >> >> > vikram@astronomer.io
> > >> >> > > >
> > >> >> > > > > > wrote:
> > >> >> > > > > > >
> > >> >> > > > > > > > Team,
> > >> >> > > > > > > >
> > >> >> > > > > > > >
> > >> >> > > > > > > >
> > >> >> > > > > > > > We just created 'AIP-36 DAG Versioning' on
> > >Confluence
> > >> >and would
> > >> >> > > > very
> > >> >> > > > > > much
> > >> >> > > > > > > > appreciate feedback and suggestions from the
> > >community.
> > >> >> > > > > > > >
> > >> >> > > > > > > >
> > >> >> > > > > > > >
> > >> >> > > > > > > >
> > >> >> > > > > > >
> > >> >> > > > > >
> > >> >> > > > >
> > >> >> > > >
> > >> >> > >
> > >> >> >
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-36+DAG+Versioning
> > >> >> > > > > > > >
> > >> >> > > > > > > >
> > >> >> > > > > > > >
> > >> >> > > > > > > > The DAG Versioning concept has been discussed on
> > >> >multiple
> > >> >> > > occasions
> > >> >> > > > > in
> > >> >> > > > > > > the
> > >> >> > > > > > > > past and has been a topic highlighted as part of
> > >> >Airflow 2.0 as
> > >> >> > > > well.
> > >> >> > > > > > We
> > >> >> > > > > > > at
> > >> >> > > > > > > > Astronomer have heard data engineers at several
> > >> >enterprises ask
> > >> >> > > > about
> > >> >> > > > > > > this
> > >> >> > > > > > > > feature as well, for easier debugging when changes
> > >are
> > >> >made to
> > >> >> > > DAGs
> > >> >> > > > > as
> > >> >> > > > > > a
> > >> >> > > > > > > > result of evolving business needs.
> > >> >> > > > > > > >
> > >> >> > > > > > > >
> > >> >> > > > > > > > As described in the AIP, we have a proposal focused
> > >on
> > >> >ensuring
> > >> >> > > > that
> > >> >> > > > > > the
> > >> >> > > > > > > > visibility behaviour of Airflow is correct, without
> > >> >changing
> > >> >> > the
> > >> >> > > > > > > execution
> > >> >> > > > > > > > behaviour. We considered changing the execution
> > >> >behaviour as
> > >> >> > > well,
> > >> >> > > > > but
> > >> >> > > > > > > > decided that the risks in changing execution
> > >behavior
> > >> >were too
> > >> >> > > high
> > >> >> > > > > as
> > >> >> > > > > > > > compared to the benefits and therefore decided to
> > >limit
> > >> >the
> > >> >> > scope
> > >> >> > > > to
> > >> >> > > > > > only
> > >> >> > > > > > > > making sure that the visibility was correct.
> > >> >> > > > > > > >
> > >> >> > > > > > > >
> > >> >> > > > > > > > We would like to attempt this based on our
> > >experience
> > >> >running
> > >> >> > > > Airflow
> > >> >> > > > > > as
> > >> >> > > > > > > a
> > >> >> > > > > > > > service. We believe that this benefits Airflow as a
> > >> >project and
> > >> >> > > the
> > >> >> > > > > > > > development experience of data engineers using
> > >Airflow
> > >> >across
> > >> >> > the
> > >> >> > > > > > world.
> > >> >> > > > > > > >
> > >> >> > > > > > > >
> > >> >> > > > > > > >  Any feedback, suggestions, and comments would be
> > >> >greatly
> > >> >> > > > > appreciated.
> > >> >> > > > > > > >
> > >> >> > > > > > > >
> > >> >> > > > > > > >
> > >> >> > > > > > > > Best Regards,
> > >> >> > > > > > > >
> > >> >> > > > > > > >
> > >> >> > > > > > > > Kaxil Naik, Ryan Hamilton, Ash Berlin-Taylor, and
> > >> >Vikram Koka
> > >> >> > > > > > > >
> > >> >> > > > > > >
> > >> >> > > > > >
> > >> >> > > > >
> > >> >> > > >
> > >> >> > >
> > >> >> > >
> > >> >> > > --
> > >> >> > >
> > >> >> > > Jacob Ward    |    Graduate Data Infrastructure Engineer
> > >> >> > >
> > >> >> > > jward@brandwatch.com
> > >> >> > >
> > >> >> > >
> > >> >> > > NEW YORK   | BOSTON   | BRIGHTON   | LONDON   | BERLIN |
> > >> >STUTTGART |
> > >> >> > > PARIS   | SINGAPORE | SYDNEY
> > >> >> > >
> > >> >> >
> > >> >
> > >> >
> > >> >
> > >> >--
> > >> >
> > >> >Jarek Potiuk
> > >> >Polidea | Principal Software Engineer
> > >> >
> > >> >M: +48 660 796 129
> >
>
>
> --
>
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
>
> M: +48 660 796 129 <+48660796129>
> [image: Polidea] <https://www.polidea.com/>
>

Re: [PROPOSAL][AIP-36 DAG Versioning]

Posted by Jarek Potiuk <Ja...@polidea.com>.
I fully agree with the "user" not having to know any of the "wheel"
details, just as they do not have to know the python interpreter or the
underlying libc library details. This should all be hidden from the users.

I think the wheels API that we might have there does not have to be
user-facing. We could - rather easily - make a client that points to a DAG
file, builds the appropriate wheel package under the hood and submits it. I
really doubt any of the users will use the API directly to submit DAGs -
they will use clients built on top of it.
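
To make it concrete - a rough sketch of what such a client could do under
the hood. Nothing below is an existing Airflow API: the submit endpoint and
helper names are made up for illustration, and only standard packaging
tooling (setuptools + pip) is used.

import pathlib
import shutil
import subprocess
import tempfile

import requests  # only used for the hypothetical submission step


def build_dag_wheel(dag_file, extra_files=(), version="0.1.0"):
    """Package a DAG file (plus optional helper modules) into a .whl."""
    dag_path = pathlib.Path(dag_file).resolve()
    build_dir = pathlib.Path(tempfile.mkdtemp(prefix="dag_wheel_"))
    pkg_dir = build_dir / "dag_bundle"
    pkg_dir.mkdir()
    (pkg_dir / "__init__.py").touch()
    shutil.copy(dag_path, pkg_dir / dag_path.name)
    for extra in extra_files:  # e.g. "common/my_fantastic_library.py" (flattened here for brevity)
        shutil.copy(extra, pkg_dir / pathlib.Path(extra).name)
    (build_dir / "setup.py").write_text(
        "from setuptools import setup\n"
        f"setup(name='dag-bundle', version='{version}', packages=['dag_bundle'])\n"
    )
    dist_dir = build_dir / "dist"
    subprocess.run(
        ["python", "-m", "pip", "wheel", str(build_dir), "--no-deps", "-w", str(dist_dir)],
        check=True,
    )
    return next(dist_dir.glob("*.whl"))


def submit_dag(dag_file, dag_id, endpoint):
    """Build the wheel and POST it to a (hypothetical) submission endpoint."""
    wheel = build_dag_wheel(dag_file)
    with open(wheel, "rb") as fh:
        resp = requests.post(endpoint, files={"wheel": fh}, data={"dag_id": dag_id})
    resp.raise_for_status()

From the user's point of view this would just look like "submit this DAG
file" (whatever CLI name we pick) - the wheel stays an internal artifact.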

I think we should separate the user side from the implementation - just as
we do not expect the users to know any details of how a "DAG Fetcher"
should work. In any case, with the DAG fetcher we need to define how the
fetcher guarantees "atomicity" anyway - how do we make sure that you get a
"consistent" version of all the dependent python files when you fetch them?
This is the part of the DAG fetcher that I do not like, because it assumes
that "someone else" maintains the consistency and provides the "consistent
view" somewhere on the "DAG Server" side (whatever that server side is).

There were many ideas about some kind of manifest describing the files
etc., but I think all of that depends on the ability to provide a
"snapshot" of files that forms a consistent set to execute. With a "DAG
Fetcher" this is something the "DAG Fetching server" has to provide. It's
super easy if that "server" is git - we already use it for git sync. But it
is rather difficult to provide a good abstraction for it for a "generic"
DAG fetcher.
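
Just to illustrate where the "consistency" burden lands with the fetcher
approach - a very rough sketch (the interface below does not exist in
Airflow, it is made up for illustration) of the contract a generic fetcher
would have to satisfy, and why git makes it trivial:

import abc
import pathlib
import subprocess
import tempfile


class DagFetcher(abc.ABC):
    """Hypothetical contract: return ONE internally consistent set of files per version."""

    @abc.abstractmethod
    def fetch_snapshot(self, dag_id: str, version: str) -> pathlib.Path:
        """Return a local directory holding a consistent snapshot for this DAG version."""


class GitDagFetcher(DagFetcher):
    """Git makes the guarantee trivial: a commit SHA (or tag) *is* the snapshot."""

    def __init__(self, repo_url: str):
        self.repo_url = repo_url

    def fetch_snapshot(self, dag_id: str, version: str) -> pathlib.Path:
        target = pathlib.Path(tempfile.mkdtemp(prefix=f"{dag_id}_"))
        subprocess.run(["git", "clone", self.repo_url, str(target)], check=True)
        subprocess.run(["git", "-C", str(target), "checkout", version], check=True)
        return target

For anything that is not git (an NFS mount, an S3 bucket, a plain
directory) there is no natural equivalent of "check out exactly this
revision", and that is exactly the part that is hard to abstract.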

IMHO it is far easier to provide such a consistent set at "submission
time". In pretty much all cases, the user submitting the job already has a
consistent set of python files that the DAG uses. This is pretty much a
given. I think the job of the "submission" mechanism is to make a
"snapshot" out of that consistent set and submit this snapshot, rather than
individual files. Git provides it out of the box, but if we want to be
generic - I see no other way than to build such a "snapshot" locally. And
wheels seem like a very good candidate - as long as they are an
implementation detail hidden from the users.
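
And the worker-side half (installing the submitted wheel into a throwaway
virtualenv for a single task, as in the proposal quoted below) stays
equally small - again just a sketch using stdlib venv + pip, not an
existing Airflow mechanism:

import pathlib
import shutil
import subprocess
import tempfile
import venv


def run_task_in_isolated_env(wheel_path, task_command):
    """Install the DAG-version wheel into a fresh virtualenv, run one task, wipe it."""
    env_dir = pathlib.Path(tempfile.mkdtemp(prefix="dag_env_"))
    try:
        venv.create(env_dir, with_pip=True)
        python = env_dir / "bin" / "python"  # "Scripts/python.exe" on Windows
        subprocess.run([str(python), "-m", "pip", "install", str(wheel_path)], check=True)
        subprocess.run([str(python), *task_command], check=True)
    finally:
        shutil.rmtree(env_dir, ignore_errors=True)  # the env lives only for this task

Caching the environments per wheel hash would be the obvious optimisation,
but even the naive version keeps tasks of different DAG versions on the
same worker from stepping on each other.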

J.




On Tue, Aug 11, 2020 at 8:33 PM Ash Berlin-Taylor <as...@apache.org> wrote:

> Anything to doing with the process of building wheels should be a "power
> user" only feature, and should not be required for many users - many many
> users of airflow are not primarily Python developers, but data scientists,
> and needing them to understand anything about the python build toolchain is
> too much of a learning curve for the benefit.
>
> After all it is very rare that people hit the multiple concurrent versions
> of a dag.
>
> -ash
>
> On 10 August 2020 17:37:32 BST, Tomasz Urbaszek <tu...@apache.org>
> wrote:
> >I like the idea of wheels as this is probably the "most pythonic"
> >solution. And "DAG version" is not only defined by DAG code but also
> >by all dependencies the DAG uses (custom functions, libraries etc) and
> >it seems that wheels can address that.
> >
> >However, I second Ash - keeping wheels in db doesn't sound good. In my
> >opinion, DAG fetcher is the right solution and the idea surfaces every
> >time we talk about serialization. This abstraction has a lot of pros
> >as it allows a lot of customization (wheels, local fs, remote fs,
> >wheels etc).
> >
> >Apart from that, if we decided to use wheels we should provide a CLI
> >command to ease the process of building them. Also, I'm wondering
> >about developers' workflow. Moving between code of different DAG
> >version sounds easy if you use git but... what if someone doesn't use
> >it?
> >
> >Tomek
> >
> >
> >On Sat, Aug 8, 2020 at 9:49 AM Ash Berlin-Taylor <as...@apache.org>
> >wrote:
> >>
> >> Quick comment (as I'm still mostly on paternity leave):
> >>
> >> Storing wheels in the db sounds like a bad Idea to me, especially if
> >we need to store deps in there too (and if we don't store deps, then
> >they are incomplete) - they could get very large, and I've stored blobs
> >of ~10mb in postgres before: I don't recommend it. It "works" but
> >operating it is tricky.
> >>
> >>
> >>
> >> > the API could simply accept "Wheel file + the Dag id"
> >>
> >> This sounds like a huge security risk.
> >>
> >>
> >> My main concern with this idea is that it seems a lot of complexity
> >we are putting on users. Doubly so if they are already using docker
> >where there already exists an Ideal packaging and distribution that
> >could contain dag + needed code.
> >>
> >> (Sorry for the brevity)
> >>
> >> -ash
> >>
> >>
> >> On 2 August 2020 08:47:39 BST, Jarek Potiuk
> ><Ja...@polidea.com> wrote:
> >> >Few points from my sid (and proposal!):
> >> >
> >> >1) Agree with Max -  with a rather strong NO for pickles (however,
> >> >indeed cloudpickle solves some of the problems). Pickles came up in
> >> >our discussion in Polidea recently and the overall message was "no".
> >I
> >> >agree with Max here - if we can ship python code, turning that into
> >> >pickle for transit makes little sense to me and brings a plethora of
> >> >problems.
> >> >
> >> >2) I think indeed the versioning solution should treat the "DagRun"
> >> >structure atomically. While I see why we would like to go with the
> >> >UI/Scheduler only first rather than implementing them in the
> >workers,
> >> >adding the "mixed version" is where it breaks down IMHO. Reasoning
> >> >about such "mixed version" dag is next to impossible. The current
> >> >behavior is not well defined and non-deterministic (depends on
> >> >scheduler delays, syncing, type of deployment, restarts of the works
> >> >etc.) we are moving it up to UI (thus users) rather than solving the
> >> >problem. So I am not a big fan of this and would rather solve it
> >> >"well" with atomicity.
> >> >
> >> >3) I see the point of Dan as well - we had many discussions and many
> >> >times the idea about "submitting" the DAG for execution via the API
> >> >came up - and it makes sense IMHO.
> >> >
> >> >Proposal: Implement full versioning with code shipping via DB wheels
> >> >BLOB (akin to serialized DAGs).
> >> >
> >> >I understand that the big issue is how to actually "ship" the code
> >to
> >> >the worker. And - maybe a wild idea - we can kill several birds with
> >> >the same stone.
> >> >
> >> >There were plenty of discussions on how we could do that but one was
> >> >never truly explored - using wheel packages.
> >> >
> >> >For those who do not know them, there is the PEP:
> >> >https://www.python.org/dev/peps/pep-0427/
> >> >
> >> >Wheels allow to "package" python code in a standard way. They are
> >> >portable ("purelib" + contain .py rather than .pyc code), they have
> >> >metadata, versioning information, they can be signed for security,
> >> >They can contain other packages or python code, Why don't we let
> >> >scheduler to pack the fingerprinted version of the DAG in a .whl and
> >> >store it as a blob in a DB next to the serialized form?
> >> >
> >> >There were concerns about the size of the code to keep in the DB -
> >but
> >> >we already use the DB for serialized DAGs and it works fine (I
> >believe
> >> >we only need to add compressing of the JSon serialized form - as
> >we've
> >> >learned from AirBnb during their talk at the Airflow Summit - wheels
> >> >are already compressed). Also - each task will only need the
> >> >particular "version" of one DAG so even if we keep many of them in
> >the
> >> >DB, the old version will pretty soon go "cold" and will never be
> >> >retrieved (and most DBs will handle it well with caching/indexes).
> >> >
> >> >And if we want to add "callables" from other files - there is
> >nothing
> >> >to stop the person who defines dag to add list of files that should
> >be
> >> >packaged together with the main DAG file (additional_python_files =
> >> >["common/my_fantastic_library.py"] in DAG constructor). Or we could
> >> >auto-add all files after the DAG gets imported (i.e. package
> >> >automatically all files that are imported for that particular DAG
> >from
> >> >the "dags" folder"). That should be rather easy.
> >> >
> >> >This way we could ship the code to workers for the exact version
> >that
> >> >the DagRun uses. And they can be cached and unpacked/installed to a
> >> >virtualenv for the execution of that single task. That should be
> >super
> >> >quick. Such virtualenv can be wiped out after execution.
> >> >
> >> >Then we got what Max wants (atomicity of DagRuns) and what Dan wants
> >> >(the API could simply accept "Wheel file + the Dag id". We have the
> >> >isolation between tasks running on the same worker (based on
> >> >virtualenv) so that each process in the same worker can run a
> >> >different version of the same Dag. We have much less confusion for
> >the
> >> >UI.
> >> >
> >> >Extra bonus 1: we can expand it to package different dependencies in
> >> >the wheels as well - so that if an operator requires a different
> >> >(newer) version of a python library, it could be packaged together
> >> >with the DAG in the same .whl file. This is also a highly requested
> >> >feature.
> >> >Extra bonus 2: workers will stop depending on the DAG file mount (!)
> >> >which was our long term goal and indeed as Dan mentioned - a great
> >> >step towards multi-tenancy.
> >> >
> >> >J.
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >


-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129 <+48660796129>
[image: Polidea] <https://www.polidea.com/>

Re: [PROPOSAL][AIP-36 DAG Versioning]

Posted by Ash Berlin-Taylor <as...@apache.org>.
Anything to do with the process of building wheels should be a "power user" only feature, and should not be required for most users - many, many users of airflow are not primarily Python developers, but data scientists, and needing them to understand anything about the python build toolchain is too much of a learning curve for the benefit.

After all, it is very rare that people hit multiple concurrent versions of a dag.

-ash

On 10 August 2020 17:37:32 BST, Tomasz Urbaszek <tu...@apache.org> wrote:
>I like the idea of wheels as this is probably the "most pythonic"
>solution. And "DAG version" is not only defined by DAG code but also
>by all dependencies the DAG uses (custom functions, libraries etc) and
>it seems that wheels can address that.
>
>However, I second Ash - keeping wheels in db doesn't sound good. In my
>opinion, DAG fetcher is the right solution and the idea surfaces every
>time we talk about serialization. This abstraction has a lot of pros
>as it allows a lot of customization (wheels, local fs, remote fs,
>wheels etc).
>
>Apart from that, if we decided to use wheels we should provide a CLI
>command to ease the process of building them. Also, I'm wondering
>about developers' workflow. Moving between code of different DAG
>version sounds easy if you use git but... what if someone doesn't use
>it?
>
>Tomek
>
>
>On Sat, Aug 8, 2020 at 9:49 AM Ash Berlin-Taylor <as...@apache.org>
>wrote:
>>
>> Quick comment (as I'm still mostly on paternity leave):
>>
>> Storing wheels in the db sounds like a bad Idea to me, especially if
>we need to store deps in there too (and if we don't store deps, then
>they are incomplete) - they could get very large, and I've stored blobs
>of ~10mb in postgres before: I don't recommend it. It "works" but
>operating it is tricky.
>>
>>
>>
>> > the API could simply accept "Wheel file + the Dag id"
>>
>> This sounds like a huge security risk.
>>
>>
>> My main concern with this idea is that it seems a lot of complexity
>we are putting on users. Doubly so if they are already using docker
>where there already exists an Ideal packaging and distribution that
>could contain dag + needed code.
>>
>> (Sorry for the brevity)
>>
>> -ash
>>
>>
>> On 2 August 2020 08:47:39 BST, Jarek Potiuk
><Ja...@polidea.com> wrote:
>> >Few points from my sid (and proposal!):
>> >
>> >1) Agree with Max -  with a rather strong NO for pickles (however,
>> >indeed cloudpickle solves some of the problems). Pickles came up in
>> >our discussion in Polidea recently and the overall message was "no".
>I
>> >agree with Max here - if we can ship python code, turning that into
>> >pickle for transit makes little sense to me and brings a plethora of
>> >problems.
>> >
>> >2) I think indeed the versioning solution should treat the "DagRun"
>> >structure atomically. While I see why we would like to go with the
>> >UI/Scheduler only first rather than implementing them in the
>workers,
>> >adding the "mixed version" is where it breaks down IMHO. Reasoning
>> >about such "mixed version" dag is next to impossible. The current
>> >behavior is not well defined and non-deterministic (depends on
>> >scheduler delays, syncing, type of deployment, restarts of the works
>> >etc.) we are moving it up to UI (thus users) rather than solving the
>> >problem. So I am not a big fan of this and would rather solve it
>> >"well" with atomicity.
>> >
>> >3) I see the point of Dan as well - we had many discussions and many
>> >times the idea about "submitting" the DAG for execution via the API
>> >came up - and it makes sense IMHO.
>> >
>> >Proposal: Implement full versioning with code shipping via DB wheels
>> >BLOB (akin to serialized DAGs).
>> >
>> >I understand that the big issue is how to actually "ship" the code
>to
>> >the worker. And - maybe a wild idea - we can kill several birds with
>> >the same stone.
>> >
>> >There were plenty of discussions on how we could do that but one was
>> >never truly explored - using wheel packages.
>> >
>> >For those who do not know them, there is the PEP:
>> >https://www.python.org/dev/peps/pep-0427/
>> >
>> >Wheels allow to "package" python code in a standard way. They are
>> >portable ("purelib" + contain .py rather than .pyc code), they have
>> >metadata, versioning information, they can be signed for security,
>> >They can contain other packages or python code, Why don't we let
>> >scheduler to pack the fingerprinted version of the DAG in a .whl and
>> >store it as a blob in a DB next to the serialized form?
>> >
>> >There were concerns about the size of the code to keep in the DB -
>but
>> >we already use the DB for serialized DAGs and it works fine (I
>believe
>> >we only need to add compressing of the JSon serialized form - as
>we've
>> >learned from AirBnb during their talk at the Airflow Summit - wheels
>> >are already compressed). Also - each task will only need the
>> >particular "version" of one DAG so even if we keep many of them in
>the
>> >DB, the old version will pretty soon go "cold" and will never be
>> >retrieved (and most DBs will handle it well with caching/indexes).
>> >
>> >And if we want to add "callables" from other files - there is
>nothing
>> >to stop the person who defines dag to add list of files that should
>be
>> >packaged together with the main DAG file (additional_python_files =
>> >["common/my_fantastic_library.py"] in DAG constructor). Or we could
>> >auto-add all files after the DAG gets imported (i.e. package
>> >automatically all files that are imported for that particular DAG
>from
>> >the "dags" folder"). That should be rather easy.
>> >
>> >This way we could ship the code to workers for the exact version
>that
>> >the DagRun uses. And they can be cached and unpacked/installed to a
>> >virtualenv for the execution of that single task. That should be
>super
>> >quick. Such virtualenv can be wiped out after execution.
>> >
>> >Then we got what Max wants (atomicity of DagRuns) and what Dan wants
>> >(the API could simply accept "Wheel file + the Dag id". We have the
>> >isolation between tasks running on the same worker (based on
>> >virtualenv) so that each process in the same worker can run a
>> >different version of the same Dag. We have much less confusion for
>the
>> >UI.
>> >
>> >Extra bonus 1: we can expand it to package different dependencies in
>> >the wheels as well - so that if an operator requires a different
>> >(newer) version of a python library, it could be packaged together
>> >with the DAG in the same .whl file. This is also a highly requested
>> >feature.
>> >Extra bonus 2: workers will stop depending on the DAG file mount (!)
>> >which was our long term goal and indeed as Dan mentioned - a great
>> >step towards multi-tenancy.
>> >
>> >J.
>> >
>> >
>> >
>> >
>> >
>> >
>> >On Fri, Jul 31, 2020 at 6:41 AM Maxime Beauchemin
>> ><ma...@gmail.com> wrote:
>> >>
>> >> Having tried it early on, I'd advocate pretty strongly against
>> >pickles and
>> >> would rather not get too deep into the why here. Short story is
>they
>> >can
>> >> pull the entire memory space or much more than you want, and it's
>> >> impossible to reason about where they end. For that reason and
>other
>> >> reasons, they're a security issue. Oh and some objects are not
>> >picklable
>> >> (Jinja templates! to name a problematic one...). I've also seen
>> >> secret-related classes that raise when pickled (thank god!).
>> >>
>> >> About callback and other things like that, it's quite a puzzle in
>> >python.
>> >> One solution would be to point to a python namespace
>> >> callback="preset.airflow_utils.slack_callback" and assume the
>> >function has
>> >> to exist in the remote interpreter. Personally I like the
>DagFetcher
>> >idea
>> >> (it could be great to get a pointer to that mailing list thread
>> >here),
>> >> specifically the GitDagFetcher. I don't know how [un]reasonable it
>> >is, but
>> >> I hate pickles so much that shipping source code around seems much
>> >more
>> >> reasonable to me. I think out there there's a talk from Mike Star
>> >about
>> >> Dataswarm at FB and he may mention how their workers may git
>shallow
>> >clone
>> >> the pipeline repo. Or maybe they use that "beautifully ugly" hack
>to
>> >use
>> >> a gitfs fuse [file system in user space] on the worker [could get
>> >deeper
>> >> into that, not sure how reasonable that is either].
>> >>
>> >> About fingerprints, a simple `start_date = datetime.now() -
>> >timedelta(1)`
>> >> may lead to a never-repeating fingerprint. From memory the spec
>> >doesn't
>> >> list out the properties considered to build the hash. It be
>helpful
>> >to
>> >> specify and review that list.
>> >>
>> >> Max
>> >>
>> >> On Wed, Jul 29, 2020 at 5:20 AM Kaxil Naik <ka...@gmail.com>
>> >wrote:
>> >>
>> >> > Thanks, both Max and Dan for your comments, please check my
>reply
>> >below:
>> >> >
>> >> >
>> >> > >  Personally I vote for a DAG version to be pinned and
>consistent
>> >for the
>> >> > > duration of the DAG run. Some of the reasons why:
>> >> > > - it's easier to reason about, and therefore visualize and
>> >troubleshoot
>> >> > > - it prevents some cases where dependencies are never met
>> >> > > - it prevents the explosion of artifact/metadata (one
>> >serialization per
>> >> > > dagrun as opposed to one per scheduler cycle) in the case of a
>> >dynamic
>> >> > DAG
>> >> > > whose fingerprint is never the same.
>> >> >
>> >> >
>> >> > In this AIP, we were only looking to fix the current "Viewing
>> >behaviour"
>> >> > and
>> >> > we were intentionally not changing the execution behaviour.
>> >> > The change you are suggesting means we need to introduce DAG
>> >Versioning for
>> >> > the
>> >> > workers too. This will need more work as can't use the
>Serialised
>> >> > Representation
>> >> > to run the task since users could use custom modules in a
>different
>> >part of
>> >> > code,
>> >> > example the PythonOperator has python_callable that allows
>running
>> >any
>> >> > arbitrary code.
>> >> > A similar case is with the *on_*_callbacks* defined on DAG.
>> >> >
>> >> > Based on the current scope of the AIP, we still plan to use the
>> >actual DAG
>> >> > files for the
>> >> > execution and not use Serialized DAGs for the workers.
>> >> >
>> >> > To account for all the custom modules we will have to start
>looking
>> >at
>> >> > pickle (cloudpickle).
>> >> >
>> >> > I'm certain that there are lots of
>> >> > > those DAGs out there, and that it will overwhelm the metadata
>> >database,
>> >> > and
>> >> > > confuse the users. For an hourly DAG is would mean 24 artifact
>> >per day
>> >> > > instead of 1000+
>> >> >
>> >> >
>> >> > What kind of dynamic DAGs are we talking about here, I would
>think
>> >the DAG
>> >> > signature won't change
>> >> > but I might be wrong, can you give an example, please.
>> >> >
>> >> > If backwards compatibility in behavior is a concern, I'd
>recommend
>> >adding a
>> >> > > flag to the DAG class and/or config and make sure we're doing
>the
>> >right
>> >> > > thing by default. People who want backward compatibility would
>> >have to
>> >> > > change that default. But again, that's a lot of extra and
>> >confusing
>> >> > > complexity that will likely be the source of bugs and user
>> >confusion.
>> >> > > Having a clear, easy to reason about execution model is super
>> >important.
>> >> >
>> >> > Think about visualizing a DAG that shapeshifted 5 times during
>its
>> >> > > execution, how does anyone make sense of that?
>> >> >
>> >> >
>> >> > Wouldn't that be an edge case? How often would someone change
>the
>> >DAG
>> >> > structure in the middle of
>> >> > a DAG execution. And since if they do change, the Graph View
>should
>> >show
>> >> > all the tasks that were
>> >> > run, if it just shows based on the latest version, the behaviour
>> >would be
>> >> > the same as now.
>> >> >
>> >> > --------
>> >> >
>> >> > Strongly agree with Max's points, also I feel the right way to
>go
>> >about
>> >> > > this is instead of Airflow schedulers/webservers/workers
>reading
>> >DAG
>> >> > Python
>> >> > > files, they would instead read from serialized representations
>of
>> >the
>> >> > DAGs
>> >> > > (e.g. json representation in the Airflow DB). Instead of DAG
>> >owners
>> >> > pushing
>> >> > > their DAG files to the Airflow components via varying
>mechanisms
>> >(e.g.
>> >> > > git), they would instead call an Airflow CLI to push the
>> >serialized DAG
>> >> > > representations to the DB, and for things like dynamic DAGs
>you
>> >could
>> >> > > populate them from a DAG or another service.
>> >> >
>> >> >
>> >> > Airflow Webserver and the Scheduler will definitely read from
>the
>> >> > Serialized representation as
>> >> > they don't need all the code from the DAG files.
>> >> >
>> >> > While the workers definitely need access to DAG files as the
>> >> > tasks/operators would be using
>> >> > code form custom modules and classes which are required to run
>the
>> >tasks.
>> >> >
>> >> > If we do want to go down that route we will have to use
>something
>> >like
>> >> > cloudpickle that serializes
>> >> > entire DAG file and their dependencies. And also ensure that
>> >someone is not
>> >> > able to change the pickled
>> >> > source when sending from executor to the worker as that poses a
>big
>> >> > security risk.
>> >> >
>> >> > - Kaxil
>> >> >
>> >> > On Wed, Jul 29, 2020 at 12:43 PM Jacob Ward
><jw...@brandwatch.com>
>> >wrote:
>> >> >
>> >> > > I came here to say what Max has said, only less eloquently.
>> >> > >
>> >> > > I do have one concern with locking the version for a single
>run.
>> >> > Currently
>> >> > > it is possible for a user to create a dag which intentionally
>> >changes as
>> >> > a
>> >> > > dag executes, i.e. dynamically creating a task for the dag
>during
>> >a run
>> >> > by
>> >> > > modifying external data, but this change would prevent that.
>I'm
>> >of the
>> >> > > opinion that this situation is bad practice anyway so it
>doesn't
>> >matter
>> >> > if
>> >> > > we make it impossible to do, but others may disagree.
>> >> > >
>> >> > > On Tue, 28 Jul 2020 at 17:08, Dan Davydov
>> ><dd...@twitter.com.invalid>
>> >> > > wrote:
>> >> > >
>> >> > > > Strongly agree with Max's points, also I feel the right way
>to
>> >go about
>> >> > > > this is instead of Airflow schedulers/webservers/workers
>> >reading DAG
>> >> > > Python
>> >> > > > files, they would instead read from serialized
>representations
>> >of the
>> >> > > DAGs
>> >> > > > (e.g. json representation in the Airflow DB). Instead of DAG
>> >owners
>> >> > > pushing
>> >> > > > their DAG files to the Airflow components via varying
>> >mechanisms (e.g.
>> >> > > > git), they would instead call an Airflow CLI to push the
>> >serialized DAG
>> >> > > > representations to the DB, and for things like dynamic DAGs
>you
>> >could
>> >> > > > populate them from a DAG or another service.
>> >> > > >
>> >> > > > This would also enable other features like stronger
>> >> > > security/multi-tenancy.
>> >> > > >
>> >> > > > On Tue, Jul 28, 2020 at 6:44 PM Maxime Beauchemin <
>> >> > > > maximebeauchemin@gmail.com> wrote:
>> >> > > >
>> >> > > > > > "mixed version"
>> >> > > > >
>> >> > > > > Personally I vote for a DAG version to be pinned and
>> >consistent for
>> >> > the
>> >> > > > > duration of the DAG run. Some of the reasons why:
>> >> > > > > - it's easier to reason about, and therefore visualize and
>> >> > troubleshoot
>> >> > > > > - it prevents some cases where dependencies are never met
>> >> > > > > - it prevents the explosion of artifact/metadata (one
>> >serialization
>> >> > per
>> >> > > > > dagrun as opposed to one per scheduler cycle) in the case
>of
>> >a
>> >> > dynamic
>> >> > > > DAG
>> >> > > > > whose fingerprint is never the same. I'm certain that
>there
>> >are lots
>> >> > of
>> >> > > > > those DAGs out there, and that it will overwhelm the
>metadata
>> >> > database,
>> >> > > > and
>> >> > > > > confuse the users. For an hourly DAG is would mean 24
>> >artifact per
>> >> > day
>> >> > > > > instead of 1000+
>> >> > > > >
>> >> > > > > If backwards compatibility in behavior is a concern, I'd
>> >recommend
>> >> > > > adding a
>> >> > > > > flag to the DAG class and/or config and make sure we're
>doing
>> >the
>> >> > right
>> >> > > > > thing by default. People who want backward compatibility
>> >would have
>> >> > to
>> >> > > > > change that default. But again, that's a lot of extra and
>> >confusing
>> >> > > > > complexity that will likely be the source of bugs and user
>> >confusion.
>> >> > > > > Having a clear, easy to reason about execution model is
>super
>> >> > > important.
>> >> > > > >
>> >> > > > > Think about visualizing a DAG that shapeshifted 5 times
>> >during its
>> >> > > > > execution, how does anyone make sense of that?
>> >> > > > >
>> >> > > > > Max
>> >> > > > >
>> >> > > > > On Tue, Jul 28, 2020 at 3:14 AM Kaxil Naik
>> ><ka...@gmail.com>
>> >> > > wrote:
>> >> > > > >
>> >> > > > > > Thanks Max for your comments.
>> >> > > > > >
>> >> > > > > >
>> >> > > > > > *DAG Fingerprinting: *this can be tricky, especially in
>> >regards to
>> >> > > > > dynamic
>> >> > > > > > > DAGs, where in some cases each parsing of the DAG can
>> >result in a
>> >> > > > > > different
>> >> > > > > > > fingerprint. I think DAG and tasks attributes are left
>> >out from
>> >> > the
>> >> > > > > > > proposal that should be considered as part of the
>> >fingerprint,
>> >> > like
>> >> > > > > > trigger
>> >> > > > > > > rules or task start/end datetime. We should do a full
>> >pass of all
>> >> > > DAG
>> >> > > > > > > arguments and make sure we're not forgetting anything
>> >that can
>> >> > > change
>> >> > > > > > > scheduling logic. Also, let's be careful that
>something
>> >as simple
>> >> > > as
>> >> > > > a
>> >> > > > > > > dynamic start or end date on a task could lead to a
>> >different
>> >> > > version
>> >> > > > > > each
>> >> > > > > > > time you parse.
>> >> > > > > >
>> >> > > > > >
>> >> > > > > >
>> >> > > > > > The short version of Dag Fingerprinting would be
>> >> > > > > > just a hash of the Serialized DAG.
>> >> > > > > >
>> >> > > > > > *Example DAG*: https://imgur.com/TVuoN3p
>> >> > > > > > *Example Serialized DAG*: https://imgur.com/LmA2Bpr
>> >> > > > > >
>> >> > > > > > It contains all the task & DAG parameters. When they
>> >change,
>> >> > > Scheduler
>> >> > > > > > writes
>> >> > > > > > a new version of Serialized DAGs to the DB. The
>Webserver
>> >then
>> >> > reads
>> >> > > > the
>> >> > > > > > DAGs from the DB.
>> >> > > > > >
>> >> > > > > > I'd recommend limiting serialization/storage of one
>version
>> >> > > > > > > per DAG Run, as opposed to potentially everytime the
>DAG
>> >is
>> >> > parsed
>> >> > > -
>> >> > > > > once
>> >> > > > > > > the version for a DAG run is pinned, fingerprinting is
>> >not
>> >> > > > re-evaluated
>> >> > > > > > > until the next DAG run is ready to get created.
>> >> > > > > >
>> >> > > > > >
>> >> > > > > > This is to handle Scenario 3 where a DAG structure is
>> >changed
>> >> > > mid-way.
>> >> > > > > > Since we don't intend to
>> >> > > > > > change the execution behaviour, if we limit Storage of 1
>> >version
>> >> > per
>> >> > > > DAG,
>> >> > > > > > it won't actually show what
>> >> > > > > > was run.
>> >> > > > > >
>> >> > > > > > Example Dag v1: Task A -> Task B -> Task C
>> >> > > > > > The worker has completed the execution of Task B and is
>> >just about
>> >> > to
>> >> > > > > > complete the execution of Task B.
>> >> > > > > >
>> >> > > > > > The 2nd version of DAG is deployed: Task A -> Task D
>> >> > > > > > Now Scheduler queued Task D and it will run to
>completion.
>> >(Task C
>> >> > > > won't
>> >> > > > > > run)
>> >> > > > > >
>> >> > > > > > In this case, "the actual representation of the DAG"
>that
>> >run is
>> >> > > > neither
>> >> > > > > v1
>> >> > > > > > nor v2 but a "mixed version"
>> >> > > > > >  (Task A -> Task B -> Task D). The plan is that the
>> >Scheduler will
>> >> > > > create
>> >> > > > > > this "mixed version" based on what ran
>> >> > > > > > and the Graph View would show this "mixed version".
>> >> > > > > >
>> >> > > > > > There would also be a toggle button on the Graph View to
>> >select v1
>> >> > or
>> >> > > > v2
>> >> > > > > > where the tasks will be highlighted to show
>> >> > > > > > that a particular task was in v1 or v2 as shown in
>> >> > > > > >
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>>
>>https://cwiki.apache.org/confluence/download/attachments/158868919/Picture%201.png?version=2&modificationDate=1595612863000&api=v2
>> >> > > > > >
>> >> > > > > >
>> >> > > > > >
>> >> > > > > > *Visualizing change in the tree view:* I think this is
>very
>> >complex
>> >> > > and
>> >> > > > > > > many things can make this view impossible to render
>(task
>> >> > > dependency
>> >> > > > > > > reversal, cycles across versions, ...). Maybe a better
>> >visual
>> >> > > > approach
>> >> > > > > > > would be to render independent, individual tree views
>for
>> >each
>> >> > DAG
>> >> > > > > > version
>> >> > > > > > > (side by side), and doing best effort aligning the
>tasks
>> >across
>> >> > > > blocks
>> >> > > > > > and
>> >> > > > > > > "linking" tasks with lines across blocks when
>necessary.
>> >> > > > > >
>> >> > > > > >
>> >> > > > > > Agreed, the plan is to do the best effort aligning.
>> >> > > > > > At this point in time, task additions to the end of the
>DAG
>> >are
>> >> > > > expected
>> >> > > > > to
>> >> > > > > > be compatible,
>> >> > > > > > but changes to task structure within the DAG may cause
>the
>> >tree
>> >> > view
>> >> > > > not
>> >> > > > > to
>> >> > > > > > incorporate “old” and “new” in the same view, hence that
>> >won't be
>> >> > > > shown.
>> >> > > > > >
>> >> > > > > > Regards,
>> >> > > > > > Kaxil
>> >> > > > > >
>> >> > > > > > On Mon, Jul 27, 2020 at 6:02 PM Maxime Beauchemin <
>> >> > > > > > maximebeauchemin@gmail.com> wrote:
>> >> > > > > >
>> >> > > > > > > Some notes and ideas:
>> >> > > > > > >
>> >> > > > > > > *DAG Fingerprinting: *this can be tricky, especially
>in
>> >regards
>> >> > to
>> >> > > > > > dynamic
>> >> > > > > > > DAGs, where in some cases each parsing of the DAG can
>> >result in a
>> >> > > > > > different
>> >> > > > > > > fingerprint. I think DAG and tasks attributes are left
>> >out from
>> >> > the
>> >> > > > > > > proposal that should be considered as part of the
>> >fingerprint,
>> >> > like
>> >> > > > > > trigger
>> >> > > > > > > rules or task start/end datetime. We should do a full
>> >pass of all
>> >> > > DAG
>> >> > > > > > > arguments and make sure we're not forgetting anything
>> >that can
>> >> > > change
>> >> > > > > > > scheduling logic. Also, let's be careful that
>something
>> >as simple
>> >> > > as
>> >> > > > a
>> >> > > > > > > dynamic start or end date on a task could lead to a
>> >different
>> >> > > version
>> >> > > > > > each
>> >> > > > > > > time you parse. I'd recommend limiting
>> >serialization/storage of
>> >> > one
>> >> > > > > > version
>> >> > > > > > > per DAG Run, as opposed to potentially everytime the
>DAG
>> >is
>> >> > parsed
>> >> > > -
>> >> > > > > once
>> >> > > > > > > the version for a DAG run is pinned, fingerprinting is
>> >not
>> >> > > > re-evaluated
>> >> > > > > > > until the next DAG run is ready to get created.
>> >> > > > > > >
>> >> > > > > > > *Visualizing change in the tree view:* I think this is
>> >very
>> >> > complex
>> >> > > > and
>> >> > > > > > > many things can make this view impossible to render
>(task
>> >> > > dependency
>> >> > > > > > > reversal, cycles across versions, ...). Maybe a better
>> >visual
>> >> > > > approach
>> >> > > > > > > would be to render independent, individual tree views
>for
>> >each
>> >> > DAG
>> >> > > > > > version
>> >> > > > > > > (side by side), and doing best effort aligning the
>tasks
>> >across
>> >> > > > blocks
>> >> > > > > > and
>> >> > > > > > > "linking" tasks with lines across blocks when
>necessary.
>> >> > > > > > >
>> >> > > > > > > On Fri, Jul 24, 2020 at 12:46 PM Vikram Koka <
>> >> > vikram@astronomer.io
>> >> > > >
>> >> > > > > > wrote:
>> >> > > > > > >
>> >> > > > > > > > Team,
>> >> > > > > > > >
>> >> > > > > > > >
>> >> > > > > > > >
>> >> > > > > > > > We just created 'AIP-36 DAG Versioning' on
>Confluence
>> >and would
>> >> > > > very
>> >> > > > > > much
>> >> > > > > > > > appreciate feedback and suggestions from the
>community.
>> >> > > > > > > >
>> >> > > > > > > >
>> >> > > > > > > >
>> >> > > > > > > >
>> >> > > > > > >
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>>
>>https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-36+DAG+Versioning
>> >> > > > > > > >
>> >> > > > > > > >
>> >> > > > > > > >
>> >> > > > > > > > The DAG Versioning concept has been discussed on
>> >multiple
>> >> > > occasions
>> >> > > > > in
>> >> > > > > > > the
>> >> > > > > > > > past and has been a topic highlighted as part of
>> >Airflow 2.0 as
>> >> > > > well.
>> >> > > > > > We
>> >> > > > > > > at
>> >> > > > > > > > Astronomer have heard data engineers at several
>> >enterprises ask
>> >> > > > about
>> >> > > > > > > this
>> >> > > > > > > > feature as well, for easier debugging when changes
>are
>> >made to
>> >> > > DAGs
>> >> > > > > as
>> >> > > > > > a
>> >> > > > > > > > result of evolving business needs.
>> >> > > > > > > >
>> >> > > > > > > >
>> >> > > > > > > > As described in the AIP, we have a proposal focused
>on
>> >ensuring
>> >> > > > that
>> >> > > > > > the
>> >> > > > > > > > visibility behaviour of Airflow is correct, without
>> >changing
>> >> > the
>> >> > > > > > > execution
>> >> > > > > > > > behaviour. We considered changing the execution
>> >behaviour as
>> >> > > well,
>> >> > > > > but
>> >> > > > > > > > decided that the risks in changing execution
>behavior
>> >were too
>> >> > > high
>> >> > > > > as
>> >> > > > > > > > compared to the benefits and therefore decided to
>limit
>> >the
>> >> > scope
>> >> > > > to
>> >> > > > > > only
>> >> > > > > > > > making sure that the visibility was correct.
>> >> > > > > > > >
>> >> > > > > > > >
>> >> > > > > > > > We would like to attempt this based on our
>experience
>> >running
>> >> > > > Airflow
>> >> > > > > > as
>> >> > > > > > > a
>> >> > > > > > > > service. We believe that this benefits Airflow as a
>> >project and
>> >> > > the
>> >> > > > > > > > development experience of data engineers using
>Airflow
>> >across
>> >> > the
>> >> > > > > > world.
>> >> > > > > > > >
>> >> > > > > > > >
>> >> > > > > > > >  Any feedback, suggestions, and comments would be
>> >greatly
>> >> > > > > appreciated.
>> >> > > > > > > >
>> >> > > > > > > >
>> >> > > > > > > >
>> >> > > > > > > > Best Regards,
>> >> > > > > > > >
>> >> > > > > > > >
>> >> > > > > > > > Kaxil Naik, Ryan Hamilton, Ash Berlin-Taylor, and
>> >Vikram Koka
>> >> > > > > > > >
>> >> > > > > > >
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> > >
>> >> > > --
>> >> > >
>> >> > > Jacob Ward    |    Graduate Data Infrastructure Engineer
>> >> > >
>> >> > > jward@brandwatch.com
>> >> > >
>> >> > >
>> >> > > NEW YORK   | BOSTON   | BRIGHTON   | LONDON   | BERLIN |
>> >STUTTGART |
>> >> > > PARIS   | SINGAPORE | SYDNEY
>> >> > >
>> >> >
>> >
>> >
>> >
>> >--
>> >
>> >Jarek Potiuk
>> >Polidea | Principal Software Engineer
>> >
>> >M: +48 660 796 129

Re: [PROPOSAL][AIP-36 DAG Versioning]

Posted by Tomasz Urbaszek <tu...@apache.org>.
I like the idea of wheels as this is probably the "most pythonic"
solution. And a "DAG version" is defined not only by the DAG code but
also by all the dependencies the DAG uses (custom functions, libraries,
etc.), and it seems that wheels can address that.

However, I second Ash - keeping wheels in the DB doesn't sound good. In
my opinion, a DAG fetcher is the right solution, and the idea surfaces
every time we talk about serialization. This abstraction has a lot of
pros as it allows a lot of customization (wheels, local FS, remote FS,
etc.).
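
To make that a bit more concrete, here is a rough sketch of what such
an abstraction could look like (the class and method names are only my
illustration, not an agreed API):

    # Rough, illustrative sketch only - no such interface exists in Airflow today.
    from abc import ABC, abstractmethod
    from pathlib import Path


    class DagFetcher(ABC):
        """Fetches the code of a specific DAG version into a local directory."""

        @abstractmethod
        def fetch(self, dag_id: str, version: str, target_dir: Path) -> Path:
            """Materialize the given DAG version under target_dir and return its path."""


    class LocalFsDagFetcher(DagFetcher):
        """Degenerate case: the classic shared dags folder, which only has 'latest'."""

        def __init__(self, dags_folder: Path):
            self.dags_folder = dags_folder

        def fetch(self, dag_id: str, version: str, target_dir: Path) -> Path:
            # A plain folder cannot resolve historical versions, so they are ignored.
            return self.dags_folder

A wheel-based or git-based fetcher would then differ only in how
fetch() materializes the code (unpack a .whl into target_dir, or check
out a given commit) - which is exactly the kind of customization point
I mean.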

Apart from that, if we decided to use wheels we should provide a CLI
command to ease the process of building them (sketched just below).
Also, I'm wondering about developers' workflow. Moving between the code
of different DAG versions sounds easy if you use git but... what if
someone doesn't use it?
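
Purely as illustration - neither an "airflow" subcommand for this nor
the helper below exists today - such a command might not need to be
much more than:

    # Illustrative only: build a wheel from a dags project that already carries
    # packaging metadata (setup.py / pyproject.toml), using standard pip tooling.
    import subprocess
    from pathlib import Path


    def build_dags_wheel(dags_project: Path, output_dir: Path) -> None:
        output_dir.mkdir(parents=True, exist_ok=True)
        subprocess.run(
            ["pip", "wheel", "--no-deps", "--wheel-dir", str(output_dir), str(dags_project)],
            check=True,
        )

e.g. build_dags_wheel(Path("my_dags_project"), Path("dist")). The
mechanics are easy; it is the developer workflow around it that worries
me.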

Tomek


On Sat, Aug 8, 2020 at 9:49 AM Ash Berlin-Taylor <as...@apache.org> wrote:
>
> Quick comment (as I'm still mostly on paternity leave):
>
> Storing wheels in the db sounds like a bad Idea to me, especially if we need to store deps in there too (and if we don't store deps, then they are incomplete) - they could get very large, and I've stored blobs of ~10mb in postgres before: I don't recommend it. It "works" but operating it is tricky.
>
>
>
> > the API could simply accept "Wheel file + the Dag id"
>
> This sounds like a huge security risk.
>
>
> My main concern with this idea is that it seems a lot of complexity we are putting on users. Doubly so if they are already using docker where there already exists an Ideal packaging and distribution that could contain dag + needed code.
>
> (Sorry for the brevity)
>
> -ash
>
>
> On 2 August 2020 08:47:39 BST, Jarek Potiuk <Ja...@polidea.com> wrote:
> >Few points from my sid (and proposal!):
> >
> >1) Agree with Max -  with a rather strong NO for pickles (however,
> >indeed cloudpickle solves some of the problems). Pickles came up in
> >our discussion in Polidea recently and the overall message was "no". I
> >agree with Max here - if we can ship python code, turning that into
> >pickle for transit makes little sense to me and brings a plethora of
> >problems.
> >
> >2) I think indeed the versioning solution should treat the "DagRun"
> >structure atomically. While I see why we would like to go with the
> >UI/Scheduler only first rather than implementing them in the workers,
> >adding the "mixed version" is where it breaks down IMHO. Reasoning
> >about such "mixed version" dag is next to impossible. The current
> >behavior is not well defined and non-deterministic (depends on
> >scheduler delays, syncing, type of deployment, restarts of the works
> >etc.) we are moving it up to UI (thus users) rather than solving the
> >problem. So I am not a big fan of this and would rather solve it
> >"well" with atomicity.
> >
> >3) I see the point of Dan as well - we had many discussions and many
> >times the idea about "submitting" the DAG for execution via the API
> >came up - and it makes sense IMHO.
> >
> >Proposal: Implement full versioning with code shipping via DB wheels
> >BLOB (akin to serialized DAGs).
> >
> >I understand that the big issue is how to actually "ship" the code to
> >the worker. And - maybe a wild idea - we can kill several birds with
> >the same stone.
> >
> >There were plenty of discussions on how we could do that but one was
> >never truly explored - using wheel packages.
> >
> >For those who do not know them, there is the PEP:
> >https://www.python.org/dev/peps/pep-0427/
> >
> >Wheels allow to "package" python code in a standard way. They are
> >portable ("purelib" + contain .py rather than .pyc code), they have
> >metadata, versioning information, they can be signed for security,
> >They can contain other packages or python code, Why don't we let
> >scheduler to pack the fingerprinted version of the DAG in a .whl and
> >store it as a blob in a DB next to the serialized form?
> >
> >There were concerns about the size of the code to keep in the DB - but
> >we already use the DB for serialized DAGs and it works fine (I believe
> >we only need to add compressing of the JSon serialized form - as we've
> >learned from AirBnb during their talk at the Airflow Summit - wheels
> >are already compressed). Also - each task will only need the
> >particular "version" of one DAG so even if we keep many of them in the
> >DB, the old version will pretty soon go "cold" and will never be
> >retrieved (and most DBs will handle it well with caching/indexes).
> >
> >And if we want to add "callables" from other files - there is nothing
> >to stop the person who defines dag to add list of files that should be
> >packaged together with the main DAG file (additional_python_files =
> >["common/my_fantastic_library.py"] in DAG constructor). Or we could
> >auto-add all files after the DAG gets imported (i.e. package
> >automatically all files that are imported for that particular DAG from
> >the "dags" folder"). That should be rather easy.
> >
> >This way we could ship the code to workers for the exact version that
> >the DagRun uses. And they can be cached and unpacked/installed to a
> >virtualenv for the execution of that single task. That should be super
> >quick. Such virtualenv can be wiped out after execution.
> >
> >Then we got what Max wants (atomicity of DagRuns) and what Dan wants
> >(the API could simply accept "Wheel file + the Dag id". We have the
> >isolation between tasks running on the same worker (based on
> >virtualenv) so that each process in the same worker can run a
> >different version of the same Dag. We have much less confusion for the
> >UI.
> >
> >Extra bonus 1: we can expand it to package different dependencies in
> >the wheels as well - so that if an operator requires a different
> >(newer) version of a python library, it could be packaged together
> >with the DAG in the same .whl file. This is also a highly requested
> >feature.
> >Extra bonus 2: workers will stop depending on the DAG file mount (!)
> >which was our long term goal and indeed as Dan mentioned - a great
> >step towards multi-tenancy.
> >
> >J.
> >
> >
> >
> >
> >
> >
> >On Fri, Jul 31, 2020 at 6:41 AM Maxime Beauchemin
> ><ma...@gmail.com> wrote:
> >>
> >> Having tried it early on, I'd advocate pretty strongly against
> >pickles and
> >> would rather not get too deep into the why here. Short story is they
> >can
> >> pull the entire memory space or much more than you want, and it's
> >> impossible to reason about where they end. For that reason and other
> >> reasons, they're a security issue. Oh and some objects are not
> >picklable
> >> (Jinja templates! to name a problematic one...). I've also seen
> >> secret-related classes that raise when pickled (thank god!).
> >>
> >> About callback and other things like that, it's quite a puzzle in
> >python.
> >> One solution would be to point to a python namespace
> >> callback="preset.airflow_utils.slack_callback" and assume the
> >function has
> >> to exist in the remote interpreter. Personally I like the DagFetcher
> >idea
> >> (it could be great to get a pointer to that mailing list thread
> >here),
> >> specifically the GitDagFetcher. I don't know how [un]reasonable it
> >is, but
> >> I hate pickles so much that shipping source code around seems much
> >more
> >> reasonable to me. I think out there there's a talk from Mike Star
> >about
> >> Dataswarm at FB and he may mention how their workers may git shallow
> >clone
> >> the pipeline repo. Or maybe they use that "beautifully ugly" hack to
> >use
> >> a gitfs fuse [file system in user space] on the worker [could get
> >deeper
> >> into that, not sure how reasonable that is either].
> >>
> >> About fingerprints, a simple `start_date = datetime.now() -
> >timedelta(1)`
> >> may lead to a never-repeating fingerprint. From memory the spec
> >doesn't
> >> list out the properties considered to build the hash. It be helpful
> >to
> >> specify and review that list.
> >>
> >> Max
> >>
> >> On Wed, Jul 29, 2020 at 5:20 AM Kaxil Naik <ka...@gmail.com>
> >wrote:
> >>
> >> > Thanks, both Max and Dan for your comments, please check my reply
> >below:
> >> >
> >> >
> >> > >  Personally I vote for a DAG version to be pinned and consistent
> >for the
> >> > > duration of the DAG run. Some of the reasons why:
> >> > > - it's easier to reason about, and therefore visualize and
> >troubleshoot
> >> > > - it prevents some cases where dependencies are never met
> >> > > - it prevents the explosion of artifact/metadata (one
> >serialization per
> >> > > dagrun as opposed to one per scheduler cycle) in the case of a
> >dynamic
> >> > DAG
> >> > > whose fingerprint is never the same.
> >> >
> >> >
> >> > In this AIP, we were only looking to fix the current "Viewing
> >behaviour"
> >> > and
> >> > we were intentionally not changing the execution behaviour.
> >> > The change you are suggesting means we need to introduce DAG
> >Versioning for
> >> > the
> >> > workers too. This will need more work as can't use the Serialised
> >> > Representation
> >> > to run the task since users could use custom modules in a different
> >part of
> >> > code,
> >> > example the PythonOperator has python_callable that allows running
> >any
> >> > arbitrary code.
> >> > A similar case is with the *on_*_callbacks* defined on DAG.
> >> >
> >> > Based on the current scope of the AIP, we still plan to use the
> >actual DAG
> >> > files for the
> >> > execution and not use Serialized DAGs for the workers.
> >> >
> >> > To account for all the custom modules we will have to start looking
> >at
> >> > pickle (cloudpickle).
> >> >
> >> > I'm certain that there are lots of
> >> > > those DAGs out there, and that it will overwhelm the metadata
> >database,
> >> > and
> >> > > confuse the users. For an hourly DAG is would mean 24 artifact
> >per day
> >> > > instead of 1000+
> >> >
> >> >
> >> > What kind of dynamic DAGs are we talking about here, I would think
> >the DAG
> >> > signature won't change
> >> > but I might be wrong, can you give an example, please.
> >> >
> >> > If backwards compatibility in behavior is a concern, I'd recommend
> >adding a
> >> > > flag to the DAG class and/or config and make sure we're doing the
> >right
> >> > > thing by default. People who want backward compatibility would
> >have to
> >> > > change that default. But again, that's a lot of extra and
> >confusing
> >> > > complexity that will likely be the source of bugs and user
> >confusion.
> >> > > Having a clear, easy to reason about execution model is super
> >important.
> >> >
> >> > Think about visualizing a DAG that shapeshifted 5 times during its
> >> > > execution, how does anyone make sense of that?
> >> >
> >> >
> >> > Wouldn't that be an edge case? How often would someone change the
> >DAG
> >> > structure in the middle of
> >> > a DAG execution. And since if they do change, the Graph View should
> >show
> >> > all the tasks that were
> >> > run, if it just shows based on the latest version, the behaviour
> >would be
> >> > the same as now.
> >> >
> >> > --------
> >> >
> >> > Strongly agree with Max's points, also I feel the right way to go
> >about
> >> > > this is instead of Airflow schedulers/webservers/workers reading
> >DAG
> >> > Python
> >> > > files, they would instead read from serialized representations of
> >the
> >> > DAGs
> >> > > (e.g. json representation in the Airflow DB). Instead of DAG
> >owners
> >> > pushing
> >> > > their DAG files to the Airflow components via varying mechanisms
> >(e.g.
> >> > > git), they would instead call an Airflow CLI to push the
> >serialized DAG
> >> > > representations to the DB, and for things like dynamic DAGs you
> >could
> >> > > populate them from a DAG or another service.
> >> >
> >> >
> >> > Airflow Webserver and the Scheduler will definitely read from the
> >> > Serialized representation as
> >> > they don't need all the code from the DAG files.
> >> >
> >> > While the workers definitely need access to DAG files as the
> >> > tasks/operators would be using
> >> > code form custom modules and classes which are required to run the
> >tasks.
> >> >
> >> > If we do want to go down that route we will have to use something
> >like
> >> > cloudpickle that serializes
> >> > entire DAG file and their dependencies. And also ensure that
> >someone is not
> >> > able to change the pickled
> >> > source when sending from executor to the worker as that poses a big
> >> > security risk.
> >> >
> >> > - Kaxil
> >> >
> >> > On Wed, Jul 29, 2020 at 12:43 PM Jacob Ward <jw...@brandwatch.com>
> >wrote:
> >> >
> >> > > I came here to say what Max has said, only less eloquently.
> >> > >
> >> > > I do have one concern with locking the version for a single run.
> >> > Currently
> >> > > it is possible for a user to create a dag which intentionally
> >changes as
> >> > a
> >> > > dag executes, i.e. dynamically creating a task for the dag during
> >a run
> >> > by
> >> > > modifying external data, but this change would prevent that. I'm
> >of the
> >> > > opinion that this situation is bad practice anyway so it doesn't
> >matter
> >> > if
> >> > > we make it impossible to do, but others may disagree.
> >> > >
> >> > > On Tue, 28 Jul 2020 at 17:08, Dan Davydov
> ><dd...@twitter.com.invalid>
> >> > > wrote:
> >> > >
> >> > > > Strongly agree with Max's points, also I feel the right way to
> >go about
> >> > > > this is instead of Airflow schedulers/webservers/workers
> >reading DAG
> >> > > Python
> >> > > > files, they would instead read from serialized representations
> >of the
> >> > > DAGs
> >> > > > (e.g. json representation in the Airflow DB). Instead of DAG
> >owners
> >> > > pushing
> >> > > > their DAG files to the Airflow components via varying
> >mechanisms (e.g.
> >> > > > git), they would instead call an Airflow CLI to push the
> >serialized DAG
> >> > > > representations to the DB, and for things like dynamic DAGs you
> >could
> >> > > > populate them from a DAG or another service.
> >> > > >
> >> > > > This would also enable other features like stronger
> >> > > security/multi-tenancy.
> >> > > >
> >> > > > On Tue, Jul 28, 2020 at 6:44 PM Maxime Beauchemin <
> >> > > > maximebeauchemin@gmail.com> wrote:
> >> > > >
> >> > > > > > "mixed version"
> >> > > > >
> >> > > > > Personally I vote for a DAG version to be pinned and
> >consistent for
> >> > the
> >> > > > > duration of the DAG run. Some of the reasons why:
> >> > > > > - it's easier to reason about, and therefore visualize and
> >> > troubleshoot
> >> > > > > - it prevents some cases where dependencies are never met
> >> > > > > - it prevents the explosion of artifact/metadata (one
> >serialization
> >> > per
> >> > > > > dagrun as opposed to one per scheduler cycle) in the case of
> >a
> >> > dynamic
> >> > > > DAG
> >> > > > > whose fingerprint is never the same. I'm certain that there
> >are lots
> >> > of
> >> > > > > those DAGs out there, and that it will overwhelm the metadata
> >> > database,
> >> > > > and
> >> > > > > confuse the users. For an hourly DAG is would mean 24
> >artifact per
> >> > day
> >> > > > > instead of 1000+
> >> > > > >
> >> > > > > If backwards compatibility in behavior is a concern, I'd
> >recommend
> >> > > > adding a
> >> > > > > flag to the DAG class and/or config and make sure we're doing
> >the
> >> > right
> >> > > > > thing by default. People who want backward compatibility
> >would have
> >> > to
> >> > > > > change that default. But again, that's a lot of extra and
> >confusing
> >> > > > > complexity that will likely be the source of bugs and user
> >confusion.
> >> > > > > Having a clear, easy to reason about execution model is super
> >> > > important.
> >> > > > >
> >> > > > > Think about visualizing a DAG that shapeshifted 5 times
> >during its
> >> > > > > execution, how does anyone make sense of that?
> >> > > > >
> >> > > > > Max
> >> > > > >
> >> > > > > On Tue, Jul 28, 2020 at 3:14 AM Kaxil Naik
> ><ka...@gmail.com>
> >> > > wrote:
> >> > > > >
> >> > > > > > Thanks Max for your comments.
> >> > > > > >
> >> > > > > >
> >> > > > > > *DAG Fingerprinting: *this can be tricky, especially in
> >regards to
> >> > > > > dynamic
> >> > > > > > > DAGs, where in some cases each parsing of the DAG can
> >result in a
> >> > > > > > different
> >> > > > > > > fingerprint. I think DAG and tasks attributes are left
> >out from
> >> > the
> >> > > > > > > proposal that should be considered as part of the
> >fingerprint,
> >> > like
> >> > > > > > trigger
> >> > > > > > > rules or task start/end datetime. We should do a full
> >pass of all
> >> > > DAG
> >> > > > > > > arguments and make sure we're not forgetting anything
> >that can
> >> > > change
> >> > > > > > > scheduling logic. Also, let's be careful that something
> >as simple
> >> > > as
> >> > > > a
> >> > > > > > > dynamic start or end date on a task could lead to a
> >different
> >> > > version
> >> > > > > > each
> >> > > > > > > time you parse.
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > The short version of Dag Fingerprinting would be
> >> > > > > > just a hash of the Serialized DAG.
> >> > > > > >
> >> > > > > > *Example DAG*: https://imgur.com/TVuoN3p
> >> > > > > > *Example Serialized DAG*: https://imgur.com/LmA2Bpr
> >> > > > > >
> >> > > > > > It contains all the task & DAG parameters. When they
> >change,
> >> > > Scheduler
> >> > > > > > writes
> >> > > > > > a new version of Serialized DAGs to the DB. The Webserver
> >then
> >> > reads
> >> > > > the
> >> > > > > > DAGs from the DB.
> >> > > > > >
> >> > > > > > I'd recommend limiting serialization/storage of one version
> >> > > > > > > per DAG Run, as opposed to potentially everytime the DAG
> >is
> >> > parsed
> >> > > -
> >> > > > > once
> >> > > > > > > the version for a DAG run is pinned, fingerprinting is
> >not
> >> > > > re-evaluated
> >> > > > > > > until the next DAG run is ready to get created.
> >> > > > > >
> >> > > > > >
> >> > > > > > This is to handle Scenario 3 where a DAG structure is
> >changed
> >> > > mid-way.
> >> > > > > > Since we don't intend to
> >> > > > > > change the execution behaviour, if we limit Storage of 1
> >version
> >> > per
> >> > > > DAG,
> >> > > > > > it won't actually show what
> >> > > > > > was run.
> >> > > > > >
> >> > > > > > Example Dag v1: Task A -> Task B -> Task C
> >> > > > > > The worker has completed the execution of Task B and is
> >just about
> >> > to
> >> > > > > > complete the execution of Task B.
> >> > > > > >
> >> > > > > > The 2nd version of DAG is deployed: Task A -> Task D
> >> > > > > > Now Scheduler queued Task D and it will run to completion.
> >(Task C
> >> > > > won't
> >> > > > > > run)
> >> > > > > >
> >> > > > > > In this case, "the actual representation of the DAG" that
> >run is
> >> > > > neither
> >> > > > > v1
> >> > > > > > nor v2 but a "mixed version"
> >> > > > > >  (Task A -> Task B -> Task D). The plan is that the
> >Scheduler will
> >> > > > create
> >> > > > > > this "mixed version" based on what ran
> >> > > > > > and the Graph View would show this "mixed version".
> >> > > > > >
> >> > > > > > There would also be a toggle button on the Graph View to
> >select v1
> >> > or
> >> > > > v2
> >> > > > > > where the tasks will be highlighted to show
> >> > > > > > that a particular task was in v1 or v2 as shown in
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >https://cwiki.apache.org/confluence/download/attachments/158868919/Picture%201.png?version=2&modificationDate=1595612863000&api=v2
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > *Visualizing change in the tree view:* I think this is very
> >complex
> >> > > and
> >> > > > > > > many things can make this view impossible to render (task
> >> > > dependency
> >> > > > > > > reversal, cycles across versions, ...). Maybe a better
> >visual
> >> > > > approach
> >> > > > > > > would be to render independent, individual tree views for
> >each
> >> > DAG
> >> > > > > > version
> >> > > > > > > (side by side), and doing best effort aligning the tasks
> >across
> >> > > > blocks
> >> > > > > > and
> >> > > > > > > "linking" tasks with lines across blocks when necessary.
> >> > > > > >
> >> > > > > >
> >> > > > > > Agreed, the plan is to do the best effort aligning.
> >> > > > > > At this point in time, task additions to the end of the DAG
> >are
> >> > > > expected
> >> > > > > to
> >> > > > > > be compatible,
> >> > > > > > but changes to task structure within the DAG may cause the
> >tree
> >> > view
> >> > > > not
> >> > > > > to
> >> > > > > > incorporate “old” and “new” in the same view, hence that
> >won't be
> >> > > > shown.
> >> > > > > >
> >> > > > > > Regards,
> >> > > > > > Kaxil
> >> > > > > >
> >> > > > > > On Mon, Jul 27, 2020 at 6:02 PM Maxime Beauchemin <
> >> > > > > > maximebeauchemin@gmail.com> wrote:
> >> > > > > >
> >> > > > > > > Some notes and ideas:
> >> > > > > > >
> >> > > > > > > *DAG Fingerprinting: *this can be tricky, especially in
> >regards
> >> > to
> >> > > > > > dynamic
> >> > > > > > > DAGs, where in some cases each parsing of the DAG can
> >result in a
> >> > > > > > different
> >> > > > > > > fingerprint. I think DAG and tasks attributes are left
> >out from
> >> > the
> >> > > > > > > proposal that should be considered as part of the
> >fingerprint,
> >> > like
> >> > > > > > trigger
> >> > > > > > > rules or task start/end datetime. We should do a full
> >pass of all
> >> > > DAG
> >> > > > > > > arguments and make sure we're not forgetting anything
> >that can
> >> > > change
> >> > > > > > > scheduling logic. Also, let's be careful that something
> >as simple
> >> > > as
> >> > > > a
> >> > > > > > > dynamic start or end date on a task could lead to a
> >different
> >> > > version
> >> > > > > > each
> >> > > > > > > time you parse. I'd recommend limiting
> >serialization/storage of
> >> > one
> >> > > > > > version
> >> > > > > > > per DAG Run, as opposed to potentially everytime the DAG
> >is
> >> > parsed
> >> > > -
> >> > > > > once
> >> > > > > > > the version for a DAG run is pinned, fingerprinting is
> >not
> >> > > > re-evaluated
> >> > > > > > > until the next DAG run is ready to get created.
> >> > > > > > >
> >> > > > > > > *Visualizing change in the tree view:* I think this is
> >very
> >> > complex
> >> > > > and
> >> > > > > > > many things can make this view impossible to render (task
> >> > > dependency
> >> > > > > > > reversal, cycles across versions, ...). Maybe a better
> >visual
> >> > > > approach
> >> > > > > > > would be to render independent, individual tree views for
> >each
> >> > DAG
> >> > > > > > version
> >> > > > > > > (side by side), and doing best effort aligning the tasks
> >across
> >> > > > blocks
> >> > > > > > and
> >> > > > > > > "linking" tasks with lines across blocks when necessary.
> >> > > > > > >
> >> > > > > > > On Fri, Jul 24, 2020 at 12:46 PM Vikram Koka <
> >> > vikram@astronomer.io
> >> > > >
> >> > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Team,
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > We just created 'AIP-36 DAG Versioning' on Confluence
> >and would
> >> > > > very
> >> > > > > > much
> >> > > > > > > > appreciate feedback and suggestions from the community.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-36+DAG+Versioning
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > The DAG Versioning concept has been discussed on
> >multiple
> >> > > occasions
> >> > > > > in
> >> > > > > > > the
> >> > > > > > > > past and has been a topic highlighted as part of
> >Airflow 2.0 as
> >> > > > well.
> >> > > > > > We
> >> > > > > > > at
> >> > > > > > > > Astronomer have heard data engineers at several
> >enterprises ask
> >> > > > about
> >> > > > > > > this
> >> > > > > > > > feature as well, for easier debugging when changes are
> >made to
> >> > > DAGs
> >> > > > > as
> >> > > > > > a
> >> > > > > > > > result of evolving business needs.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > As described in the AIP, we have a proposal focused on
> >ensuring
> >> > > > that
> >> > > > > > the
> >> > > > > > > > visibility behaviour of Airflow is correct, without
> >changing
> >> > the
> >> > > > > > > execution
> >> > > > > > > > behaviour. We considered changing the execution
> >behaviour as
> >> > > well,
> >> > > > > but
> >> > > > > > > > decided that the risks in changing execution behavior
> >were too
> >> > > high
> >> > > > > as
> >> > > > > > > > compared to the benefits and therefore decided to limit
> >the
> >> > scope
> >> > > > to
> >> > > > > > only
> >> > > > > > > > making sure that the visibility was correct.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > We would like to attempt this based on our experience
> >running
> >> > > > Airflow
> >> > > > > > as
> >> > > > > > > a
> >> > > > > > > > service. We believe that this benefits Airflow as a
> >project and
> >> > > the
> >> > > > > > > > development experience of data engineers using Airflow
> >across
> >> > the
> >> > > > > > world.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > >  Any feedback, suggestions, and comments would be
> >greatly
> >> > > > > appreciated.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Best Regards,
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Kaxil Naik, Ryan Hamilton, Ash Berlin-Taylor, and
> >Vikram Koka
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> > >
> >> > > --
> >> > >
> >> > > Jacob Ward    |    Graduate Data Infrastructure Engineer
> >> > >
> >> > > jward@brandwatch.com
> >> > >
> >> > >
> >> > > NEW YORK   | BOSTON   | BRIGHTON   | LONDON   | BERLIN |
> >STUTTGART |
> >> > > PARIS   | SINGAPORE | SYDNEY
> >> > >
> >> >
> >
> >
> >
> >--
> >
> >Jarek Potiuk
> >Polidea | Principal Software Engineer
> >
> >M: +48 660 796 129

Re: [PROPOSAL][AIP-36 DAG Versioning]

Posted by Ash Berlin-Taylor <as...@apache.org>.
Quick comment (as I'm still mostly on paternity leave):

Storing wheels in the DB sounds like a bad idea to me, especially if we need to store deps in there too (and if we don't store deps, then they are incomplete) - they could get very large, and I've stored blobs of ~10 MB in Postgres before: I don't recommend it. It "works", but operating it is tricky.



> the API could simply accept "Wheel file + the Dag id"

This sounds like a huge security risk.


My main concern with this idea is that it seems like a lot of complexity we are putting on users. Doubly so if they are already using Docker, where there already exists an ideal packaging and distribution mechanism that could contain the DAG + needed code.

(Sorry for the brevity)

-ash


On 2 August 2020 08:47:39 BST, Jarek Potiuk <Ja...@polidea.com> wrote:
>Few points from my sid (and proposal!):
>
>1) Agree with Max -  with a rather strong NO for pickles (however,
>indeed cloudpickle solves some of the problems). Pickles came up in
>our discussion in Polidea recently and the overall message was "no". I
>agree with Max here - if we can ship python code, turning that into
>pickle for transit makes little sense to me and brings a plethora of
>problems.
>
>2) I think indeed the versioning solution should treat the "DagRun"
>structure atomically. While I see why we would like to go with the
>UI/Scheduler only first rather than implementing them in the workers,
>adding the "mixed version" is where it breaks down IMHO. Reasoning
>about such "mixed version" dag is next to impossible. The current
>behavior is not well defined and non-deterministic (depends on
>scheduler delays, syncing, type of deployment, restarts of the works
>etc.) we are moving it up to UI (thus users) rather than solving the
>problem. So I am not a big fan of this and would rather solve it
>"well" with atomicity.
>
>3) I see the point of Dan as well - we had many discussions and many
>times the idea about "submitting" the DAG for execution via the API
>came up - and it makes sense IMHO.
>
>Proposal: Implement full versioning with code shipping via DB wheels
>BLOB (akin to serialized DAGs).
>
>I understand that the big issue is how to actually "ship" the code to
>the worker. And - maybe a wild idea - we can kill several birds with
>the same stone.
>
>There were plenty of discussions on how we could do that but one was
>never truly explored - using wheel packages.
>
>For those who do not know them, there is the PEP:
>https://www.python.org/dev/peps/pep-0427/
>
>Wheels allow to "package" python code in a standard way. They are
>portable ("purelib" + contain .py rather than .pyc code), they have
>metadata, versioning information, they can be signed for security,
>They can contain other packages or python code, Why don't we let
>scheduler to pack the fingerprinted version of the DAG in a .whl and
>store it as a blob in a DB next to the serialized form?
>
>There were concerns about the size of the code to keep in the DB - but
>we already use the DB for serialized DAGs and it works fine (I believe
>we only need to add compressing of the JSon serialized form - as we've
>learned from AirBnb during their talk at the Airflow Summit - wheels
>are already compressed). Also - each task will only need the
>particular "version" of one DAG so even if we keep many of them in the
>DB, the old version will pretty soon go "cold" and will never be
>retrieved (and most DBs will handle it well with caching/indexes).
>
>And if we want to add "callables" from other files - there is nothing
>to stop the person who defines dag to add list of files that should be
>packaged together with the main DAG file (additional_python_files =
>["common/my_fantastic_library.py"] in DAG constructor). Or we could
>auto-add all files after the DAG gets imported (i.e. package
>automatically all files that are imported for that particular DAG from
>the "dags" folder"). That should be rather easy.
>
>This way we could ship the code to workers for the exact version that
>the DagRun uses. And they can be cached and unpacked/installed to a
>virtualenv for the execution of that single task. That should be super
>quick. Such virtualenv can be wiped out after execution.
>
>Then we got what Max wants (atomicity of DagRuns) and what Dan wants
>(the API could simply accept "Wheel file + the Dag id". We have the
>isolation between tasks running on the same worker (based on
>virtualenv) so that each process in the same worker can run a
>different version of the same Dag. We have much less confusion for the
>UI.
>
>Extra bonus 1: we can expand it to package different dependencies in
>the wheels as well - so that if an operator requires a different
>(newer) version of a python library, it could be packaged together
>with the DAG in the same .whl file. This is also a highly requested
>feature.
>Extra bonus 2: workers will stop depending on the DAG file mount (!)
>which was our long term goal and indeed as Dan mentioned - a great
>step towards multi-tenancy.
>
>J.
>
>
>
>
>
>
>On Fri, Jul 31, 2020 at 6:41 AM Maxime Beauchemin
><ma...@gmail.com> wrote:
>>
>> Having tried it early on, I'd advocate pretty strongly against
>pickles and
>> would rather not get too deep into the why here. Short story is they
>can
>> pull the entire memory space or much more than you want, and it's
>> impossible to reason about where they end. For that reason and other
>> reasons, they're a security issue. Oh and some objects are not
>picklable
>> (Jinja templates! to name a problematic one...). I've also seen
>> secret-related classes that raise when pickled (thank god!).
>>
>> About callback and other things like that, it's quite a puzzle in
>python.
>> One solution would be to point to a python namespace
>> callback="preset.airflow_utils.slack_callback" and assume the
>function has
>> to exist in the remote interpreter. Personally I like the DagFetcher
>idea
>> (it could be great to get a pointer to that mailing list thread
>here),
>> specifically the GitDagFetcher. I don't know how [un]reasonable it
>is, but
>> I hate pickles so much that shipping source code around seems much
>more
>> reasonable to me. I think out there there's a talk from Mike Star
>about
>> Dataswarm at FB and he may mention how their workers may git shallow
>clone
>> the pipeline repo. Or maybe they use that "beautifully ugly" hack to
>use
>> a gitfs fuse [file system in user space] on the worker [could get
>deeper
>> into that, not sure how reasonable that is either].
>>
>> About fingerprints, a simple `start_date = datetime.now() -
>> timedelta(1)` may lead to a never-repeating fingerprint. From memory
>> the spec doesn't list out the properties considered to build the
>> hash. It would be helpful to specify and review that list.
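>>
>> To make that point concrete, a rough sketch of what an explicit,
>> reviewed property list could look like when hashing the serialized
>> form (the field names below are illustrative assumptions, not the
>> AIP's actual list):
>>
>>     import hashlib
>>     import json
>>
>>     # Illustrative allow-list only - a dynamic start_date would have to be
>>     # excluded or normalized here, otherwise every parse yields a new hash.
>>     FINGERPRINT_TASK_FIELDS = ["task_id", "upstream_task_ids", "trigger_rule",
>>                                "retries", "pool", "queue"]
>>
>>     def dag_fingerprint(serialized_dag: dict) -> str:
>>         tasks = [{f: task.get(f) for f in FINGERPRINT_TASK_FIELDS}
>>                  for task in serialized_dag.get("tasks", [])]
>>         payload = {
>>             "dag_id": serialized_dag["dag_id"],
>>             "schedule_interval": str(serialized_dag.get("schedule_interval")),
>>             "tasks": sorted(tasks, key=lambda t: str(t["task_id"])),
>>         }
>>         return hashlib.sha256(
>>             json.dumps(payload, sort_keys=True, default=str).encode()
>>         ).hexdigest()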
>>
>> Max
>>
>> On Wed, Jul 29, 2020 at 5:20 AM Kaxil Naik <ka...@gmail.com> wrote:
>>
>> > Thanks, both Max and Dan for your comments, please check my reply
>> > below:
>> >
>> >
>> > > Personally I vote for a DAG version to be pinned and consistent
>> > > for the duration of the DAG run. Some of the reasons why:
>> > > - it's easier to reason about, and therefore visualize and
>> > > troubleshoot
>> > > - it prevents some cases where dependencies are never met
>> > > - it prevents the explosion of artifact/metadata (one
>> > > serialization per dagrun as opposed to one per scheduler cycle)
>> > > in the case of a dynamic DAG whose fingerprint is never the same.
>> >
>> >
>> > In this AIP, we were only looking to fix the current "Viewing
>> > behaviour" and we were intentionally not changing the execution
>> > behaviour. The change you are suggesting means we need to introduce
>> > DAG Versioning for the workers too. This will need more work as we
>> > can't use the Serialised Representation to run the task, since
>> > users could use custom modules in a different part of the code;
>> > for example, the PythonOperator has a python_callable that allows
>> > running arbitrary code. A similar case is with the *on_*_callbacks*
>> > defined on DAG.
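>> >
>> > Just to illustrate the gap - a trivial DAG of that kind (the helper
>> > module name is made up):
>> >
>> >     from datetime import datetime
>> >
>> >     from airflow import DAG
>> >     from airflow.operators.python import PythonOperator
>> >
>> >     # the JSON-serialized DAG records that "load" is a PythonOperator,
>> >     # but not the body of load_partition, which lives in a custom module
>> >     from my_company.etl_helpers import load_partition
>> >
>> >     with DAG(dag_id="etl", start_date=datetime(2020, 7, 1),
>> >              schedule_interval="@daily") as dag:
>> >         PythonOperator(task_id="load", python_callable=load_partition)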
>> >
>> > Based on the current scope of the AIP, we still plan to use the
>> > actual DAG files for the execution and not use Serialized DAGs for
>> > the workers.
>> >
>> > To account for all the custom modules we will have to start looking
>> > at pickle (cloudpickle).
>> >
>> > > I'm certain that there are lots of those DAGs out there, and
>> > > that it will overwhelm the metadata database, and confuse the
>> > > users. For an hourly DAG it would mean 24 artifacts per day
>> > > instead of 1000+
>> >
>> >
>> > What kind of dynamic DAGs are we talking about here? I would think
>> > the DAG signature won't change, but I might be wrong - can you give
>> > an example, please?
>> >
>> > > If backwards compatibility in behavior is a concern, I'd
>> > > recommend adding a flag to the DAG class and/or config and make
>> > > sure we're doing the right thing by default. People who want
>> > > backward compatibility would have to change that default. But
>> > > again, that's a lot of extra and confusing complexity that will
>> > > likely be the source of bugs and user confusion. Having a clear,
>> > > easy to reason about execution model is super important.
>> > >
>> > > Think about visualizing a DAG that shapeshifted 5 times during
>> > > its execution, how does anyone make sense of that?
>> >
>> >
>> > Wouldn't that be an edge case? How often would someone change the
>> > DAG structure in the middle of a DAG execution? And if they do
>> > change it, the Graph View should show all the tasks that were run;
>> > if it just showed the latest version, the behaviour would be the
>> > same as now.
>> >
>> > --------
>> >
>> > > Strongly agree with Max's points, also I feel the right way to
>> > > go about this is instead of Airflow schedulers/webservers/workers
>> > > reading DAG Python files, they would instead read from serialized
>> > > representations of the DAGs (e.g. json representation in the
>> > > Airflow DB). Instead of DAG owners pushing their DAG files to the
>> > > Airflow components via varying mechanisms (e.g. git), they would
>> > > instead call an Airflow CLI to push the serialized DAG
>> > > representations to the DB, and for things like dynamic DAGs you
>> > > could populate them from a DAG or another service.
>> >
>> >
>> > The Airflow Webserver and the Scheduler will definitely read from
>> > the Serialized representation as they don't need all the code from
>> > the DAG files.
>> >
>> > The workers, on the other hand, definitely need access to the DAG
>> > files, as the tasks/operators would be using code from custom
>> > modules and classes which are required to run the tasks.
>> >
>> > If we do want to go down that route, we will have to use something
>> > like cloudpickle that serializes the entire DAG file and its
>> > dependencies. We would also have to ensure that no one is able to
>> > change the pickled source when it is sent from the executor to the
>> > worker, as that poses a big security risk.
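>> >
>> > If we did go that way, tampering could at least be detected by
>> > signing the payload - a rough sketch only (key handling hand-waved;
>> > it does not remove the inherent risk of unpickling code):
>> >
>> >     import hashlib
>> >     import hmac
>> >
>> >     import cloudpickle
>> >
>> >     SECRET_KEY = b"rotate-me"  # would come from a secrets backend, not hard-coded
>> >
>> >     def pack(python_callable):
>> >         blob = cloudpickle.dumps(python_callable)
>> >         signature = hmac.new(SECRET_KEY, blob, hashlib.sha256).hexdigest()
>> >         return blob, signature
>> >
>> >     def unpack(blob, signature):
>> >         expected = hmac.new(SECRET_KEY, blob, hashlib.sha256).hexdigest()
>> >         if not hmac.compare_digest(expected, signature):
>> >             raise ValueError("pickled payload was modified in transit")
>> >         return cloudpickle.loads(blob)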
>> >
>> > - Kaxil
>> >
>> > On Wed, Jul 29, 2020 at 12:43 PM Jacob Ward <jw...@brandwatch.com> wrote:
>> >
>> > > I came here to say what Max has said, only less eloquently.
>> > >
>> > > I do have one concern with locking the version for a single run.
>> > > Currently it is possible for a user to create a dag which
>> > > intentionally changes as a dag executes, i.e. dynamically
>> > > creating a task for the dag during a run by modifying external
>> > > data, but this change would prevent that. I'm of the opinion that
>> > > this situation is bad practice anyway so it doesn't matter if we
>> > > make it impossible to do, but others may disagree.
>> > >
>> > > On Tue, 28 Jul 2020 at 17:08, Dan Davydov <dd...@twitter.com.invalid> wrote:
>> > >
>> > > > Strongly agree with Max's points, also I feel the right way to
>> > > > go about this is instead of Airflow schedulers/webservers/workers
>> > > > reading DAG Python files, they would instead read from
>> > > > serialized representations of the DAGs (e.g. json representation
>> > > > in the Airflow DB). Instead of DAG owners pushing their DAG
>> > > > files to the Airflow components via varying mechanisms (e.g.
>> > > > git), they would instead call an Airflow CLI to push the
>> > > > serialized DAG representations to the DB, and for things like
>> > > > dynamic DAGs you could populate them from a DAG or another
>> > > > service.
>> > > >
>> > > > This would also enable other features like stronger
>> > > > security/multi-tenancy.
>> > > >
>> > > > On Tue, Jul 28, 2020 at 6:44 PM Maxime Beauchemin <
>> > > > maximebeauchemin@gmail.com> wrote:
>> > > >
>> > > > > > "mixed version"
>> > > > >
>> > > > > Personally I vote for a DAG version to be pinned and
>> > > > > consistent for the duration of the DAG run. Some of the
>> > > > > reasons why:
>> > > > > - it's easier to reason about, and therefore visualize and
>> > > > > troubleshoot
>> > > > > - it prevents some cases where dependencies are never met
>> > > > > - it prevents the explosion of artifact/metadata (one
>> > > > > serialization per dagrun as opposed to one per scheduler
>> > > > > cycle) in the case of a dynamic DAG whose fingerprint is
>> > > > > never the same. I'm certain that there are lots of those DAGs
>> > > > > out there, and that it will overwhelm the metadata database,
>> > > > > and confuse the users. For an hourly DAG it would mean 24
>> > > > > artifacts per day instead of 1000+
>> > > > >
>> > > > > If backwards compatibility in behavior is a concern, I'd
>> > > > > recommend adding a flag to the DAG class and/or config and
>> > > > > make sure we're doing the right thing by default. People who
>> > > > > want backward compatibility would have to change that
>> > > > > default. But again, that's a lot of extra and confusing
>> > > > > complexity that will likely be the source of bugs and user
>> > > > > confusion. Having a clear, easy to reason about execution
>> > > > > model is super important.
>> > > > >
>> > > > > Think about visualizing a DAG that shapeshifted 5 times
>> > > > > during its execution, how does anyone make sense of that?
>> > > > >
>> > > > > Max
>> > > > >
>> > > > > On Tue, Jul 28, 2020 at 3:14 AM Kaxil Naik
><ka...@gmail.com>
>> > > wrote:
>> > > > >
>> > > > > > Thanks Max for your comments.
>> > > > > >
>> > > > > >
>> > > > > > *DAG Fingerprinting: *this can be tricky, especially in
>regards to
>> > > > > dynamic
>> > > > > > > DAGs, where in some cases each parsing of the DAG can
>result in a
>> > > > > > different
>> > > > > > > fingerprint. I think DAG and tasks attributes are left
>out from
>> > the
>> > > > > > > proposal that should be considered as part of the
>fingerprint,
>> > like
>> > > > > > trigger
>> > > > > > > rules or task start/end datetime. We should do a full
>pass of all
>> > > DAG
>> > > > > > > arguments and make sure we're not forgetting anything
>that can
>> > > change
>> > > > > > > scheduling logic. Also, let's be careful that something
>as simple
>> > > as
>> > > > a
>> > > > > > > dynamic start or end date on a task could lead to a
>different
>> > > version
>> > > > > > each
>> > > > > > > time you parse.
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > The short version of Dag Fingerprinting would be
>> > > > > > just a hash of the Serialized DAG.
>> > > > > >
>> > > > > > *Example DAG*: https://imgur.com/TVuoN3p
>> > > > > > *Example Serialized DAG*: https://imgur.com/LmA2Bpr
>> > > > > >
>> > > > > > It contains all the task & DAG parameters. When they
>> > > > > > change, the Scheduler writes a new version of the
>> > > > > > Serialized DAG to the DB. The Webserver then reads the
>> > > > > > DAGs from the DB.
>> > > > > >
>> > > > > > I'd recommend limiting serialization/storage of one version
>> > > > > > > per DAG Run, as opposed to potentially everytime the DAG
>is
>> > parsed
>> > > -
>> > > > > once
>> > > > > > > the version for a DAG run is pinned, fingerprinting is
>not
>> > > > re-evaluated
>> > > > > > > until the next DAG run is ready to get created.
>> > > > > >
>> > > > > >
>> > > > > > This is to handle Scenario 3 where a DAG structure is
>> > > > > > changed mid-way. Since we don't intend to change the
>> > > > > > execution behaviour, if we limit storage to 1 version per
>> > > > > > DAG Run, it won't actually show what was run.
>> > > > > >
>> > > > > > Example Dag v1: Task A -> Task B -> Task C
>> > > > > > The worker has completed the execution of Task A and is
>> > > > > > just about to complete the execution of Task B.
>> > > > > >
>> > > > > > The 2nd version of the DAG is deployed: Task A -> Task D
>> > > > > > Now the Scheduler queues Task D and it will run to
>> > > > > > completion. (Task C won't run)
>> > > > > >
>> > > > > > In this case, "the actual representation of the DAG" that
>> > > > > > ran is neither v1 nor v2 but a "mixed version"
>> > > > > > (Task A -> Task B -> Task D). The plan is that the
>> > > > > > Scheduler will create this "mixed version" based on what
>> > > > > > ran and the Graph View would show this "mixed version".
>> > > > > >
>> > > > > > There would also be a toggle button on the Graph View to
>> > > > > > select v1 or v2 where the tasks will be highlighted to
>> > > > > > show that a particular task was in v1 or v2, as shown in
>> > > > > > https://cwiki.apache.org/confluence/download/attachments/158868919/Picture%201.png?version=2&modificationDate=1595612863000&api=v2
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > *Visualizing change in the tree view:* I think this is very
>complex
>> > > and
>> > > > > > > many things can make this view impossible to render (task
>> > > dependency
>> > > > > > > reversal, cycles across versions, ...). Maybe a better
>visual
>> > > > approach
>> > > > > > > would be to render independent, individual tree views for
>each
>> > DAG
>> > > > > > version
>> > > > > > > (side by side), and doing best effort aligning the tasks
>across
>> > > > blocks
>> > > > > > and
>> > > > > > > "linking" tasks with lines across blocks when necessary.
>> > > > > >
>> > > > > >
>> > > > > > Agreed, the plan is to do the best effort aligning.
>> > > > > > At this point in time, task additions to the end of the
>> > > > > > DAG are expected to be compatible, but changes to task
>> > > > > > structure within the DAG may cause the tree view not to
>> > > > > > incorporate “old” and “new” in the same view, hence that
>> > > > > > won't be shown.
>> > > > > >
>> > > > > > Regards,
>> > > > > > Kaxil
>> > > > > >
>> > > > > > On Mon, Jul 27, 2020 at 6:02 PM Maxime Beauchemin <
>> > > > > > maximebeauchemin@gmail.com> wrote:
>> > > > > >
>> > > > > > > Some notes and ideas:
>> > > > > > >
>> > > > > > > *DAG Fingerprinting: *this can be tricky, especially in
>regards
>> > to
>> > > > > > dynamic
>> > > > > > > DAGs, where in some cases each parsing of the DAG can
>result in a
>> > > > > > different
>> > > > > > > fingerprint. I think DAG and tasks attributes are left
>out from
>> > the
>> > > > > > > proposal that should be considered as part of the
>fingerprint,
>> > like
>> > > > > > trigger
>> > > > > > > rules or task start/end datetime. We should do a full
>pass of all
>> > > DAG
>> > > > > > > arguments and make sure we're not forgetting anything
>that can
>> > > change
>> > > > > > > scheduling logic. Also, let's be careful that something
>as simple
>> > > as
>> > > > a
>> > > > > > > dynamic start or end date on a task could lead to a
>different
>> > > version
>> > > > > > each
>> > > > > > > time you parse. I'd recommend limiting
>serialization/storage of
>> > one
>> > > > > > version
>> > > > > > > per DAG Run, as opposed to potentially everytime the DAG
>is
>> > parsed
>> > > -
>> > > > > once
>> > > > > > > the version for a DAG run is pinned, fingerprinting is
>not
>> > > > re-evaluated
>> > > > > > > until the next DAG run is ready to get created.
>> > > > > > >
>> > > > > > > *Visualizing change in the tree view:* I think this is
>very
>> > complex
>> > > > and
>> > > > > > > many things can make this view impossible to render (task
>> > > dependency
>> > > > > > > reversal, cycles across versions, ...). Maybe a better
>visual
>> > > > approach
>> > > > > > > would be to render independent, individual tree views for
>each
>> > DAG
>> > > > > > version
>> > > > > > > (side by side), and doing best effort aligning the tasks
>across
>> > > > blocks
>> > > > > > and
>> > > > > > > "linking" tasks with lines across blocks when necessary.
>> > > > > > >
>> > > > > > > On Fri, Jul 24, 2020 at 12:46 PM Vikram Koka <
>> > vikram@astronomer.io
>> > > >
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > Team,
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > We just created 'AIP-36 DAG Versioning' on Confluence
>and would
>> > > > very
>> > > > > > much
>> > > > > > > > appreciate feedback and suggestions from the community.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-36+DAG+Versioning
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > The DAG Versioning concept has been discussed on
>multiple
>> > > occasions
>> > > > > in
>> > > > > > > the
>> > > > > > > > past and has been a topic highlighted as part of
>Airflow 2.0 as
>> > > > well.
>> > > > > > We
>> > > > > > > at
>> > > > > > > > Astronomer have heard data engineers at several
>enterprises ask
>> > > > about
>> > > > > > > this
>> > > > > > > > feature as well, for easier debugging when changes are
>made to
>> > > DAGs
>> > > > > as
>> > > > > > a
>> > > > > > > > result of evolving business needs.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > As described in the AIP, we have a proposal focused on
>ensuring
>> > > > that
>> > > > > > the
>> > > > > > > > visibility behaviour of Airflow is correct, without
>changing
>> > the
>> > > > > > > execution
>> > > > > > > > behaviour. We considered changing the execution
>behaviour as
>> > > well,
>> > > > > but
>> > > > > > > > decided that the risks in changing execution behavior
>were too
>> > > high
>> > > > > as
>> > > > > > > > compared to the benefits and therefore decided to limit
>the
>> > scope
>> > > > to
>> > > > > > only
>> > > > > > > > making sure that the visibility was correct.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > We would like to attempt this based on our experience
>running
>> > > > Airflow
>> > > > > > as
>> > > > > > > a
>> > > > > > > > service. We believe that this benefits Airflow as a
>project and
>> > > the
>> > > > > > > > development experience of data engineers using Airflow
>across
>> > the
>> > > > > > world.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >  Any feedback, suggestions, and comments would be
>greatly
>> > > > > appreciated.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Best Regards,
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Kaxil Naik, Ryan Hamilton, Ash Berlin-Taylor, and
>Vikram Koka
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> > >
>> > > --
>> > >
>> > > Jacob Ward    |    Graduate Data Infrastructure Engineer
>> > >
>> > > jward@brandwatch.com
>> > >
>> > >
>> > > NEW YORK   | BOSTON   | BRIGHTON   | LONDON   | BERLIN |  
>STUTTGART |
>> > > PARIS   | SINGAPORE | SYDNEY
>> > >
>> >
>
>
>
>--
>
>Jarek Potiuk
>Polidea | Principal Software Engineer
>
>M: +48 660 796 129