Posted to dev@airflow.apache.org by Ash Berlin-Taylor <as...@apache.org> on 2022/06/09 13:32:20 UTC

[RESULT] AIP-48 Data Driven Scheduling

This vote has now passed with:

7 x +1 binding votes (Jed, Jarek, Vikram, Ephraim, Kaxil, Elad, Brent)
5 x +1 non-binding votes (Drew, Pankaj, Dennis, Phani, Pierre)
0 x -1 votes

We'll start work on this shortly (the first step is to create a 
milestone and the initial tickets).

Cheers,
Ash

On Wed, Jun 1 2022 at 17:34:13 +0100, Ash Berlin-Taylor 
<as...@apache.org> wrote:
> Hi All,
> 
> Now that Summit is over (well done all the speakers! The talks I've 
> caught so far have been great) I'm ready to push forward with Data 
> Driven Scheduling, and I would like to call for a vote on 
> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling>
> 
> The vote will last for 7 days, until 2022/06/07 at 16:30 UTC.
> 
> (This is my +1 vote)
> 
> I have just published updates to the AIP, hopefully to make the AIP 
> tighter in scope (and easier to implement too). The tl;dr of this AIP:
> 
> - Add a concept of Dataset (which is a URI-parsable str. Airflow 
> places no meaning on what the URI contains/means/is; the "airflow:" 
> scheme is reserved)
> - A task "produces" a dataset by a) having it in its outlets 
> attribute, and b) finishing with SUCCESS. (That is, Airflow doesn't 
> know/care about data transfer/SQL tables etc. The link is purely 
> conceptual.)
> - A DAG says that it wants to be triggered when its dataset (or any 
> of its datasets) changes. When this happens the scheduler will create 
> the DAG run.
> 
> This is just a high level summary, please read the confluence page 
> for full details.
> 
> We have already thought about lots of ways we can (and will) extend 
> this over time, detailed in the "Future work" section. Our goal with 
> this AIP is to build the kernel of Data-aware Scheduling that we can 
> build on over time.
> 
> A teaser/example DAG that hopefully gives a clue as to what we are 
> talking about here:
> 
> ```
> import pandas as pd
> 
> from airflow import dag, Dataset
> 
> 
> dataset = Dataset("s3://s3_default@some_bucket/order_data")
> 
> 
> @dag
> def my_dag():
> 
>     @dag.task(outlets=[dataset])
>     def producer():
>         # What this task actually does doesn't matter to Airflow: the
>         # simple act of running to SUCCESS means the dataset is updated,
>         # and downstream dags will get triggered
>         ...
> 
>     producer()
> 
> 
> dataset = Dataset("s3://s3_default@some_bucket/order_data")
> 
> 
> @dag(schedule_on=dataset)
> def consuming_dag():
> 
>     @dag.task
>     def consumer(uri):
>         # Illustrative only: read_from_s3 is not a real pandas function
>         df = pd.read_from_s3(uri)
>         print(f"Dataset had {len(df)} rows")
> 
>     consumer(uri=dataset.uri)
> ```
> 
> If anyone has any changes you think are fundamental/foundational to 
> the core idea, you have 1 week to raise them :) (Names of parameters 
> we can easily change as we implement this.) Our desire is to get this 
> written and released in Airflow 2.4.
> 
> Thanks,
> Ash
>
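
Editorial sketch (not part of the original thread): the three tl;dr points in the quoted message can be modelled in a few lines of plain Python, with no Airflow installed. This is only a toy under stated assumptions, not how the AIP is implemented. ToyScheduler, task_finished and dag_runs are hypothetical names invented here, and the trigger-on-any-dataset behaviour simply follows the wording of the summary above.

```python
from dataclasses import dataclass, field
from urllib.parse import urlsplit


@dataclass(frozen=True)
class Dataset:
    """A dataset is only a URI string; no meaning is attached to its contents."""

    uri: str

    def __post_init__(self):
        # The summary says the "airflow:" scheme is reserved, so reject it here.
        if urlsplit(self.uri).scheme.lower() == "airflow":
            raise ValueError("the 'airflow' scheme is reserved")


@dataclass
class ToyScheduler:
    """Hypothetical stand-in for the scheduler; not Airflow's real class."""

    # Maps a dataset URI to the ids of DAGs that asked to be scheduled on it.
    consumers: dict = field(default_factory=dict)
    # DAG runs created so far, as (dag_id, triggering_uri) pairs.
    dag_runs: list = field(default_factory=list)

    def schedule_on(self, dag_id, dataset):
        """Register a DAG that wants a run whenever `dataset` changes."""
        self.consumers.setdefault(dataset.uri, []).append(dag_id)

    def task_finished(self, outlets, state):
        """A task 'produces' its outlets only when it finishes with success."""
        if state != "success":
            return
        for dataset in outlets:
            for dag_id in self.consumers.get(dataset.uri, []):
                self.dag_runs.append((dag_id, dataset.uri))


orders = Dataset("s3://s3_default@some_bucket/order_data")
scheduler = ToyScheduler()
scheduler.schedule_on("consuming_dag", orders)

scheduler.task_finished(outlets=[orders], state="failed")   # no run created
scheduler.task_finished(outlets=[orders], state="success")  # one run created
print(scheduler.dag_runs)
```

Note that the sketch never inspects the data behind the URI: a run is created purely because a task listing the dataset in its outlets reached success, which is the point the summary makes.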