Posted to dev@airflow.apache.org by Yulei Li <yu...@pinterest.com.INVALID> on 2020/09/02 20:02:57 UTC

Re: [PROPOSAL][AIP-36 DAG Versioning]

Hi Team,

My name is Yulei and I'm from the workflow team at Pinterest. I'd like to join the discussion on [AIP-36 DAG Versioning], since at Pinterest we have implemented something similar to what has been proposed. We would like to gather feedback from the community and discuss the possibility of contributing it back or helping to build this DAG versioning feature in Airflow.

At Pinterest, we have a dynamic DAG use case in which the same workflow definition can generate different DAG layouts across DagRuns. We want users to be able to view the DAG layout that belongs to each DagRun, and when they perform an operation on a DagRun (clear/mark_success), it should apply to the tasks from that particular "DagRun Snapshot" rather than the latest DAG layout. We therefore created a new type of DAG called DynamicDAG and implemented DAG versioning support for it across the webserver, scheduler, and worker modules (without interfering with regular DAGs). In short, DAG versioning is implemented at the DagRun level, and we use the execution_date as the version information.

On the scheduler side, when the scheduling process schedules a DagRun, or a DagRun is triggered from the UI/CLI for a DynamicDAG, the system serializes the DAG layout and uploads it to a data store (S3 + MySQL in our case). When the scheduler later schedules DagRuns and their tasks, it fetches and deserializes the serialized DAG layout. On the webserver side, we updated the UI views to render different DAG layouts based on the DagRun version. The same applies on the worker side: when a worker executes a task, it first retrieves the versioned DAG layout and then retrieves the task from it.
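
To make the DagRun-level versioning concrete, here is a minimal sketch of the idea, not our actual implementation. The class and function names (DagRunSnapshotStore, get_task) and the layout format are made up for illustration, and an in-memory dict stands in for the S3 + MySQL data store:

```python
import json
from datetime import datetime


class DagRunSnapshotStore:
    """Hypothetical store keeping one serialized DAG layout per DagRun.

    An in-memory dict stands in for the real data store (assumption).
    """

    def __init__(self):
        self._blobs = {}

    def save(self, dag_id, execution_date, layout):
        # The execution_date acts as the version of the layout.
        key = (dag_id, execution_date.isoformat())
        self._blobs[key] = json.dumps(layout, sort_keys=True)

    def load(self, dag_id, execution_date):
        key = (dag_id, execution_date.isoformat())
        return json.loads(self._blobs[key])


def get_task(store, dag_id, execution_date, task_id):
    """Fetch the versioned layout first, then pick the task out of it."""
    layout = store.load(dag_id, execution_date)
    return layout["tasks"][task_id]


store = DagRunSnapshotStore()
run_1 = datetime(2020, 9, 1)
run_2 = datetime(2020, 9, 2)

# The same workflow definition produced two different layouts.
store.save("example_dynamic_dag", run_1, {
    "tasks": {
        "extract": {"downstream": ["load"]},
        "load": {"downstream": []},
    },
})
store.save("example_dynamic_dag", run_2, {
    "tasks": {
        "extract": {"downstream": ["transform"]},
        "transform": {"downstream": ["load"]},
        "load": {"downstream": []},
    },
})

# Operating on the first DagRun still sees its own snapshot.
old_extract = get_task(store, "example_dynamic_dag", run_1, "extract")
new_extract = get_task(store, "example_dynamic_dag", run_2, "extract")
```

With this shape, the scheduler, webserver, and worker would all resolve tasks through the same lookup keyed by (dag_id, execution_date), so a clear/mark_success on an old DagRun never touches the latest layout.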

As for DAG layout serialization, we have two approaches based on our use cases:

1) We built a serialization module based on the Python dill <https://pypi.org/project/dill/> package, which is a pickling solution. This approach is used for our auto-migrated workflows, where we use Airflow to directly schedule DAGs that were migrated from our legacy workflow system. These migrated workflows do not have DAG files, and we translate them into Airflow DAGs on the fly. Each workflow job is translated into what is essentially a "CommandOperator", which has no complicated data structures and is therefore safe to serialize/deserialize. We have productionized this approach.
2) (WIP) We also want to build a customized serialization module (potentially based on the Airflow serialization module). The idea is that when serializing a task, we only save the necessary metadata, such as the classpath, operator type, etc., and when running the task, the actual operator is loaded based on this metadata. Once this is ready, we plan to use it for the auto-migration use case as well, to get rid of pickling entirely.
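
As a rough illustration of the second (metadata-only) approach, the sketch below stores just a classpath plus constructor arguments and re-imports the class at load time. Everything here is an assumption for illustration: the CommandOperator class, the demo_operators module name, and the JSON format are hypothetical and are not the Airflow serialization module or our production code.

```python
import importlib
import json
import sys
import types


class CommandOperator:
    """Hypothetical stand-in for a simple operator that runs a command."""

    def __init__(self, task_id, command):
        self.task_id = task_id
        self.command = command


# Register the class under a made-up module name so that it can be
# re-imported by classpath in this single-file example. Real operators
# already live in importable modules, so this step would not be needed.
_demo = types.ModuleType("demo_operators")
CommandOperator.__module__ = "demo_operators"
_demo.CommandOperator = CommandOperator
sys.modules["demo_operators"] = _demo


def serialize_task(task):
    """Save only the metadata needed to rebuild the task, no pickling."""
    cls = type(task)
    return json.dumps({
        "classpath": f"{cls.__module__}.{cls.__qualname__}",
        # Assumed: these two arguments fully describe this operator.
        "kwargs": {"task_id": task.task_id, "command": task.command},
    })


def load_task(payload):
    """Import the operator class by classpath and rebuild the task."""
    meta = json.loads(payload)
    module_name, _, class_name = meta["classpath"].rpartition(".")
    cls = getattr(importlib.import_module(module_name), class_name)
    return cls(**meta["kwargs"])


payload = serialize_task(CommandOperator("say_hi", "echo hi"))
restored = load_task(payload)
```

The trade-off compared to pickling is that the worker must be able to import the operator class itself, but the stored payload is small, readable, and cannot pull in arbitrary objects.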

In addition, since we are using the K8SExecutor, each task execution runs inside its own container, and we record the DAG code version/image version when a DagRun is serialized. So we don't need to worry too much about shipping code to the worker, as that is taken care of by our CI/CD pipeline. But we do agree that a more generic way to address this will be needed.
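
A small sketch of what recording the code/image version per DagRun could look like. The field names, the image name, and the pod description format are all hypothetical; this is not the Kubernetes API or our internal schema.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class DagRunSnapshotMeta:
    """Hypothetical metadata recorded when a DagRun is serialized."""
    dag_id: str
    execution_date: datetime
    code_version: str  # e.g. a commit hash (assumed format)
    image: str         # container image the tasks of this run should use


def pod_description_for_task(meta, task_id):
    """Build a minimal, made-up pod description for one task."""
    return {
        "name": f"{meta.dag_id}-{task_id}",
        "image": meta.image,
        "labels": {"dag_code_version": meta.code_version},
    }


meta = DagRunSnapshotMeta(
    dag_id="example-dynamic-dag",
    execution_date=datetime(2020, 9, 1),
    code_version="abc123",
    image="registry.example.com/workflows:abc123",
)

# Even if a newer image is deployed later, tasks of this DagRun keep
# using the image that was recorded at serialization time.
pod = pod_description_for_task(meta, "extract")
```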

So to sum up, the differences between this proposal and what we implemented at Pinterest are:

- The DAG versioning we implemented only applies to the DynamicDAG object, to fulfill that particular use case. We did it this way to limit the infra changes needed, but it can potentially be applied to all DAGs.
- We support DAG versioning at the DagRun level.
- We didn't provide a generic solution to the code shipping concerns; instead, we built on our own infra setup.

Looking forward to hearing feedback from the community; we are happy to share a more detailed design doc. Also, let us know whether this thread is the right place to discuss this or whether I should create a new thread. Thanks.

Best,

Yulei

Pinterest | Software Engineer

On 2020/08/13 20:31:55, Jarek Potiuk <J....@polidea.com> wrote: 
> I fully agree with the "user" not having to know any of the "wheel"> 
> details. Similarly as they do not have to know python interpreter or the> 
> underlying libc library details. This  all should be hidden from the users.> 
> 
> I think the wheels API that we might have there, does not have to be> 
> user-facing. We could - rather easily - make a client that points to a DAG> 
> file and builds appropriate wheel package under-the-hood and submits it. I> 
> really doubt any of the users will directly use the API to submit DAGs -> 
> they will use some clients built on top of it.> 
> 
> I think we should separate the user side from the implementation -> 
> similarly as we do not expect the users to know any details on how "DAG> 
> Fetcher" should work - in any case with the DAG fetcher, we need to define> 
> how DAG fetcher will make sure about "atomicity" anyway - how to make sure> 
> that you get a "consistent" version of all the dependent python files when> 
> you fetch them? This is the part of DAG fetcher that i do not like because> 
> it assumes that "someone else" maintains the consistency and provides the> 
> "consistent view" somewhere on the "DAG Server" side (whatever the server> 
> side is).> 
> 
> There were many ideas about some kind of manifest describing the files etc,> 
> but I think all of that depends on some kind of ability of providing a> 
> "snapshot" of files that will be consistent set to execute. With "DAG> 
> Fetcher" this is something that "DAG Fetching server" has to provide. It's> 
> super easy if that "server" is GIT - we already use it for GIT sync. But> 
> it's rather difficult to provide a good abstraction for it for "generic"> 
> DAG fetcher.> 
> 
> IMHO this is far easier to provide such consistent set at a "submission> 
> time". In pretty-much all cases, the user submitting the job already has> 
> consistent set of python files that the DAG uses. This is pretty much> 
> given. I think the job of the "submission" mechanism is to make a> 
> "snapshot" out of that consistent set and submit this snapshot, rather than> 
> individual files. Git provides it out of the box, but if we want to be> 
> generic - I see no other way than to build such "snapshot" locally. And> 
> Wheels seems like a very good candidate - if only it's an implementation> 
> detail and will be hidden from the users.> 
> 
> J.> 
> 
> 
> 
> 
> On Tue, Aug 11, 2020 at 8:33 PM Ash Berlin-Taylor <as...@apache.org> wrote:> 
> 
> > Anything to do with the process of building wheels should be a "power> 
> > user" only feature, and should not be required for many users - many many> 
> > users of airflow are not primarily Python developers, but data scientists,> 
> > and needing them to understand anything about the python build toolchain is> 
> > too much of a learning curve for the benefit.> 
> >> 
> > After all it is very rare that people hit the multiple concurrent versions> 
> > of a dag.> 
> >> 
> > -ash> 
> >> 
> > On 10 August 2020 17:37:32 BST, Tomasz Urbaszek <tu...@apache.org>> 
> > wrote:> 
> > >I like the idea of wheels as this is probably the "most pythonic"> 
> > >solution. And "DAG version" is not only defined by DAG code but also> 
> > >by all dependencies the DAG uses (custom functions, libraries etc) and> 
> > >it seems that wheels can address that.> 
> > >> 
> > >However, I second Ash - keeping wheels in db doesn't sound good. In my> 
> > >opinion, DAG fetcher is the right solution and the idea surfaces every> 
> > >time we talk about serialization. This abstraction has a lot of pros> 
> > >as it allows a lot of customization (wheels, local fs, remote fs,> 
> > >wheels etc).> 
> > >> 
> > >Apart from that, if we decided to use wheels we should provide a CLI> 
> > >command to ease the process of building them. Also, I'm wondering> 
> > >about developers' workflow. Moving between code of different DAG> 
> > >version sounds easy if you use git but... what if someone doesn't use> 
> > >it?> 
> > >> 
> > >Tomek> 
> > >> 
> > >> 
> > >On Sat, Aug 8, 2020 at 9:49 AM Ash Berlin-Taylor <as...@apache.org>> 
> > >wrote:> 
> > >>> 
> > >> Quick comment (as I'm still mostly on paternity leave):> 
> > >>> 
> > >> Storing wheels in the db sounds like a bad Idea to me, especially if> 
> > >we need to store deps in there too (and if we don't store deps, then> 
> > >they are incomplete) - they could get very large, and I've stored blobs> 
> > >of ~10mb in postgres before: I don't recommend it. It "works" but> 
> > >operating it is tricky.> 
> > >>> 
> > >>> 
> > >>> 
> > >> > the API could simply accept "Wheel file + the Dag id"> 
> > >>> 
> > >> This sounds like a huge security risk.> 
> > >>> 
> > >>> 
> > >> My main concern with this idea is that it seems a lot of complexity> 
> > >we are putting on users. Doubly so if they are already using docker> 
> > >where there already exists an Ideal packaging and distribution that> 
> > >could contain dag + needed code.> 
> > >>> 
> > >> (Sorry for the brevity)> 
> > >>> 
> > >> -ash> 
> > >>> 
> > >>> 
> > >> On 2 August 2020 08:47:39 BST, Jarek Potiuk> 
> > ><Ja...@polidea.com> wrote:> 
> > >> >Few points from my side (and proposal!):> 
> > >> >> 
> > >> >1) Agree with Max -  with a rather strong NO for pickles (however,> 
> > >> >indeed cloudpickle solves some of the problems). Pickles came up in> 
> > >> >our discussion in Polidea recently and the overall message was "no".> 
> > >I> 
> > >> >agree with Max here - if we can ship python code, turning that into> 
> > >> >pickle for transit makes little sense to me and brings a plethora of> 
> > >> >problems.> 
> > >> >> 
> > >> >2) I think indeed the versioning solution should treat the "DagRun"> 
> > >> >structure atomically. While I see why we would like to go with the> 
> > >> >UI/Scheduler only first rather than implementing them in the> 
> > >workers,> 
> > >> >adding the "mixed version" is where it breaks down IMHO. Reasoning> 
> > >> >about such "mixed version" dag is next to impossible. The current> 
> > >> >behavior is not well defined and non-deterministic (depends on> 
> > >> >scheduler delays, syncing, type of deployment, restarts of the workers> 
> > >> >etc.) we are moving it up to UI (thus users) rather than solving the> 
> > >> >problem. So I am not a big fan of this and would rather solve it> 
> > >> >"well" with atomicity.> 
> > >> >> 
> > >> >3) I see the point of Dan as well - we had many discussions and many> 
> > >> >times the idea about "submitting" the DAG for execution via the API> 
> > >> >came up - and it makes sense IMHO.> 
> > >> >> 
> > >> >Proposal: Implement full versioning with code shipping via DB wheels> 
> > >> >BLOB (akin to serialized DAGs).> 
> > >> >> 
> > >> >I understand that the big issue is how to actually "ship" the code> 
> > >to> 
> > >> >the worker. And - maybe a wild idea - we can kill several birds with> 
> > >> >the same stone.> 
> > >> >> 
> > >> >There were plenty of discussions on how we could do that but one was> 
> > >> >never truly explored - using wheel packages.> 
> > >> >> 
> > >> >For those who do not know them, there is the PEP:> 
> > >> >https://www.python.org/dev/peps/pep-0427/> 
> > >> >> 
> > >> >Wheels allow to "package" python code in a standard way. They are> 
> > >> >portable ("purelib" + contain .py rather than .pyc code), they have> 
> > >> >metadata, versioning information, they can be signed for security,> 
> > >> >They can contain other packages or python code, Why don't we let> 
> > >> >scheduler to pack the fingerprinted version of the DAG in a .whl and> 
> > >> >store it as a blob in a DB next to the serialized form?> 
> > >> >> 
> > >> >There were concerns about the size of the code to keep in the DB -> 
> > >but> 
> > >> >we already use the DB for serialized DAGs and it works fine (I> 
> > >believe> 
> > >> >we only need to add compressing of the JSon serialized form - as> 
> > >we've> 
> > >> >learned from AirBnb during their talk at the Airflow Summit - wheels> 
> > >> >are already compressed). Also - each task will only need the> 
> > >> >particular "version" of one DAG so even if we keep many of them in> 
> > >the> 
> > >> >DB, the old version will pretty soon go "cold" and will never be> 
> > >> >retrieved (and most DBs will handle it well with caching/indexes).> 
> > >> >> 
> > >> >And if we want to add "callables" from other files - there is> 
> > >nothing> 
> > >> >to stop the person who defines dag to add list of files that should> 
> > >be> 
> > >> >packaged together with the main DAG file (additional_python_files => 
> > >> >["common/my_fantastic_library.py"] in DAG constructor). Or we could> 
> > >> >auto-add all files after the DAG gets imported (i.e. package> 
> > >> >automatically all files that are imported for that particular DAG> 
> > >from> 
> > >> >the "dags" folder"). That should be rather easy.> 
> > >> >> 
> > >> >This way we could ship the code to workers for the exact version> 
> > >that> 
> > >> >the DagRun uses. And they can be cached and unpacked/installed to a> 
> > >> >virtualenv for the execution of that single task. That should be> 
> > >super> 
> > >> >quick. Such virtualenv can be wiped out after execution.> 
> > >> >> 
> > >> >Then we got what Max wants (atomicity of DagRuns) and what Dan wants> 
> > >> >(the API could simply accept "Wheel file + the Dag id". We have the> 
> > >> >isolation between tasks running on the same worker (based on> 
> > >> >virtualenv) so that each process in the same worker can run a> 
> > >> >different version of the same Dag. We have much less confusion for> 
> > >the> 
> > >> >UI.> 
> > >> >> 
> > >> >Extra bonus 1: we can expand it to package different dependencies in> 
> > >> >the wheels as well - so that if an operator requires a different> 
> > >> >(newer) version of a python library, it could be packaged together> 
> > >> >with the DAG in the same .whl file. This is also a highly requested> 
> > >> >feature.> 
> > >> >Extra bonus 2: workers will stop depending on the DAG file mount (!)> 
> > >> >which was our long term goal and indeed as Dan mentioned - a great> 
> > >> >step towards multi-tenancy.> 
> > >> >> 
> > >> >J.> 
> > >> >> 
> > >> >> 
> > >> >> 
> > >> >> 
> > >> >> 
> > >> >> 
> > >> >On Fri, Jul 31, 2020 at 6:41 AM Maxime Beauchemin> 
> > >> ><ma...@gmail.com> wrote:> 
> > >> >>> 
> > >> >> Having tried it early on, I'd advocate pretty strongly against> 
> > >> >pickles and> 
> > >> >> would rather not get too deep into the why here. Short story is> 
> > >they> 
> > >> >can> 
> > >> >> pull the entire memory space or much more than you want, and it's> 
> > >> >> impossible to reason about where they end. For that reason and> 
> > >other> 
> > >> >> reasons, they're a security issue. Oh and some objects are not> 
> > >> >picklable> 
> > >> >> (Jinja templates! to name a problematic one...). I've also seen> 
> > >> >> secret-related classes that raise when pickled (thank god!).> 
> > >> >>> 
> > >> >> About callback and other things like that, it's quite a puzzle in> 
> > >> >python.> 
> > >> >> One solution would be to point to a python namespace> 
> > >> >> callback="preset.airflow_utils.slack_callback" and assume the> 
> > >> >function has> 
> > >> >> to exist in the remote interpreter. Personally I like the> 
> > >DagFetcher> 
> > >> >idea> 
> > >> >> (it could be great to get a pointer to that mailing list thread> 
> > >> >here),> 
> > >> >> specifically the GitDagFetcher. I don't know how [un]reasonable it> 
> > >> >is, but> 
> > >> >> I hate pickles so much that shipping source code around seems much> 
> > >> >more> 
> > >> >> reasonable to me. I think out there there's a talk from Mike Star> 
> > >> >about> 
> > >> >> Dataswarm at FB and he may mention how their workers may git> 
> > >shallow> 
> > >> >clone> 
> > >> >> the pipeline repo. Or maybe they use that "beautifully ugly" hack> 
> > >to> 
> > >> >use> 
> > >> >> a gitfs fuse [file system in user space] on the worker [could get> 
> > >> >deeper> 
> > >> >> into that, not sure how reasonable that is either].> 
> > >> >>> 
> > >> >> About fingerprints, a simple `start_date = datetime.now() -> 
> > >> >timedelta(1)`> 
> > >> >> may lead to a never-repeating fingerprint. From memory the spec> 
> > >> >doesn't> 
> > >> >> list out the properties considered to build the hash. It would be> 
> > >helpful> 
> > >> >to> 
> > >> >> specify and review that list.> 
> > >> >>> 
> > >> >> Max> 
> > >> >>> 
> > >> >> On Wed, Jul 29, 2020 at 5:20 AM Kaxil Naik <ka...@gmail.com>> 
> > >> >wrote:> 
> > >> >>> 
> > >> >> > Thanks, both Max and Dan for your comments, please check my> 
> > >reply> 
> > >> >below:> 
> > >> >> >> 
> > >> >> >> 
> > >> >> > >  Personally I vote for a DAG version to be pinned and> 
> > >consistent> 
> > >> >for the> 
> > >> >> > > duration of the DAG run. Some of the reasons why:> 
> > >> >> > > - it's easier to reason about, and therefore visualize and> 
> > >> >troubleshoot> 
> > >> >> > > - it prevents some cases where dependencies are never met> 
> > >> >> > > - it prevents the explosion of artifact/metadata (one> 
> > >> >serialization per> 
> > >> >> > > dagrun as opposed to one per scheduler cycle) in the case of a> 
> > >> >dynamic> 
> > >> >> > DAG> 
> > >> >> > > whose fingerprint is never the same.> 
> > >> >> >> 
> > >> >> >> 
> > >> >> > In this AIP, we were only looking to fix the current "Viewing> 
> > >> >behaviour"> 
> > >> >> > and> 
> > >> >> > we were intentionally not changing the execution behaviour.> 
> > >> >> > The change you are suggesting means we need to introduce DAG> 
> > >> >Versioning for> 
> > >> >> > the> 
> > >> >> > workers too. This will need more work as can't use the> 
> > >Serialised> 
> > >> >> > Representation> 
> > >> >> > to run the task since users could use custom modules in a> 
> > >different> 
> > >> >part of> 
> > >> >> > code,> 
> > >> >> > example the PythonOperator has python_callable that allows> 
> > >running> 
> > >> >any> 
> > >> >> > arbitrary code.> 
> > >> >> > A similar case is with the *on_*_callbacks* defined on DAG.> 
> > >> >> >> 
> > >> >> > Based on the current scope of the AIP, we still plan to use the> 
> > >> >actual DAG> 
> > >> >> > files for the> 
> > >> >> > execution and not use Serialized DAGs for the workers.> 
> > >> >> >> 
> > >> >> > To account for all the custom modules we will have to start> 
> > >looking> 
> > >> >at> 
> > >> >> > pickle (cloudpickle).> 
> > >> >> >> 
> > >> >> > I'm certain that there are lots of> 
> > >> >> > > those DAGs out there, and that it will overwhelm the metadata> 
> > >> >database,> 
> > >> >> > and> 
> > >> >> > > confuse the users. For an hourly DAG it would mean 24 artifacts> 
> > >> >per day> 
> > >> >> > > instead of 1000+> 
> > >> >> >> 
> > >> >> >> 
> > >> >> > What kind of dynamic DAGs are we talking about here, I would> 
> > >think> 
> > >> >the DAG> 
> > >> >> > signature won't change> 
> > >> >> > but I might be wrong, can you give an example, please.> 
> > >> >> >> 
> > >> >> > If backwards compatibility in behavior is a concern, I'd> 
> > >recommend> 
> > >> >adding a> 
> > >> >> > > flag to the DAG class and/or config and make sure we're doing> 
> > >the> 
> > >> >right> 
> > >> >> > > thing by default. People who want backward compatibility would> 
> > >> >have to> 
> > >> >> > > change that default. But again, that's a lot of extra and> 
> > >> >confusing> 
> > >> >> > > complexity that will likely be the source of bugs and user> 
> > >> >confusion.> 
> > >> >> > > Having a clear, easy to reason about execution model is super> 
> > >> >important.> 
> > >> >> >> 
> > >> >> > Think about visualizing a DAG that shapeshifted 5 times during> 
> > >its> 
> > >> >> > > execution, how does anyone make sense of that?> 
> > >> >> >> 
> > >> >> >> 
> > >> >> > Wouldn't that be an edge case? How often would someone change> 
> > >the> 
> > >> >DAG> 
> > >> >> > structure in the middle of> 
> > >> >>
[message truncated...]