Posted to dev@airflow.apache.org by Ash Berlin-Taylor <as...@apache.org> on 2022/06/09 13:32:20 UTC
[RESULT] AIP-48 Data Driven Scheduling
This vote is now passed with:
7 x +1 binding votes (Jed, Jarek, Vikram, Ephraim, Kaxil, Elad, Brent)
and 5 non-binding (Drew, Pankaj, Dennis, Phani, Pierre)
0 x -1 votes
We'll start work on this shortly (the first step is to create a
milestone and initial tickets).
Cheers,
Ash
On Wed, Jun 1 2022 at 17:34:13 +0100, Ash Berlin-Taylor
<as...@apache.org> wrote:
> Hi All,
>
> Now that Summit is over (well done all the speakers! The talks I've
> caught so far have been great) I'm ready to push forward with Data
> Driven Scheduling, and I would like to call for a vote on
> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling>
>
> The vote will last for 7 days, until 2022/06/07 at 16:30 UTC.
>
> (This is my +1 vote)
>
> I have just published updates to the AIP, hopefully to make the AIP
> tighter in scope (and easier to implement too). The tl;dr of this AIP:
>
> - Add a concept of Dataset (which is a URI-parsable string. Airflow
> places no meaning on what the URI contains/means/is - the "airflow:"
> scheme is reserved)
> - A task "produces" a dataset by a) having it in its outlets
> attribute, and b) finishing with SUCCESS. (That is, Airflow doesn't
> know/care about data transfer/SQL tables etc. It is purely conceptual)
> - A DAG says that it wants to be triggered when its dataset (or any
> of its datasets) changes. When this happens the scheduler will create
> the dag run.
>
> This is just a high level summary, please read the confluence page
> for full details.
>
> We have already thought about lots of ways we can (and will) extend
> this over time, detailed in the "Future work" section. Our
> goal with this AIP is to build the kernel of Data-aware Scheduling
> that we can build on over time.
>
> A teaser/example DAG that hopefully gives a clue as to what we are
> talking about here:
>
> ```
> import pandas as pd
>
> from airflow import dag, Dataset
>
>
> dataset = Dataset("s3://s3_default@some_bucket/order_data")
>
>
> @dag
> def my_dag():
>
>     @dag.task(outlets=[dataset])
>     def producer():
>         # What this task actually does doesn't matter to Airflow:
>         # the simple act of running to SUCCESS means the dataset
>         # is updated, and downstream dags will get triggered
>         ...
>
>
> @dag(schedule_on=dataset)
> def consuming_dag():
>
>     @dag.task
>     def consumer(uri):
>         df = pd.read_from_s3(uri)  # illustrative, not a real pandas API
>         print(f"Dataset had {len(df)} rows")
>
>     consumer(uri=dataset.uri)
> ```
>
> If anyone has any changes they think are fundamental/foundational to
> the core idea, you have 1 week to raise them :) (Names of parameters
> we can easily change as we implement this.) Our desire is to get this
> written and released in Airflow 2.4.
>
> Thanks,
> Ash
>