Posted to dev@airflow.apache.org by Evgeny Shulman <ev...@databand.ai> on 2020/03/03 22:21:16 UTC

Re: [DISCUSS] AIP-31: Airflow functional DAG API

Hey Everybody

I am happy to contribute to this discussion! It's something everybody
misses in Airflow. Whenever I take part in a discussion comparing
orchestration frameworks, "functional DAGs" are the most frequently used
argument. We are building a similar approach and would be happy to base it
on the improvements in this AIP.

These are my thoughts, concerns, and ideas about the AIP:


   1. XComArgs (great definition; I assume this is what a decorated function
      call returns by default during the DAG-building phase?)
      a. Should it include a reference (task_id) to the upstream task? It
         would be nice if a downstream operator could be linked to its
         upstream automatically.
      b. `xcom_args.set_upstream` and `xcom_args.set_downstream` would be a
         nice addition. An `xcom_args.operator` or `xcom_args.op` property
         would help a lot to get back to the operator.

   2. It would be nice to have a way to provide the Airflow context to the
      function itself, as the PythonOperator execute function does right now
      (I would suggest implementing a new mechanism that passes everything as
      a single `context` keyword argument instead of **context).

   3. Some thoughts around @dag.task:
      a. It definitely helps to assign a function to a specific DAG.
      b. It changes the standard behavior (providing dag= or a “with dag”
         context statement).
      c. It binds a function to a specific DAG, so it cannot be reused.
      d. In case the user decides to use 4.b only, how can they provide the
         DAG to a function? f(dag=dag)? But `dag` could also be a
         user-defined keyword argument of that function.

   4. I assume the same mechanism we use for XComArgs can be used for
      operators as well. If a “templated” parameter has an XComArgs value,
      it should be processed with the Jinja engine, but what if I pass an
      operator into the args/kwargs of another operator? We could
      automatically convert it to the real value from XCom at runtime. That
      would make this code possible:
      `a = AOperator(); b = your_function(some_parameter=a)`

   5. A valuable extension for the AIP could be around XComArgs usage (it’s
      already mentioned in the AIP description). Being able to write:

      @task
      def some_function():
          return <some data structure>

      f_result = some_function()
      some_other_decorated_function(a=f_result["some_key"])

   6. Another option could be:
      `f_result.op.get_xcom("[some_key][0].some_function")`.

   7. Are you going to traverse all fields/kwargs of the operator? (Usually,
      XCom transformation was applied only to templated fields.)

The Jinja template can easily be changed so that the data can be wired. The
drawback is that it will always read the whole result, but since the result
is already stored as a pickle, there is no other option here.
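Points 1 and 5 can be pictured with a small framework-free sketch. `ToyOperator`, `ToyXComArg`, `link_inputs` and the `xcom_store` dict (standing in for the XCom backend) are hypothetical names, not Airflow classes: the reference keeps its producing operator (exposed as `.op`), forwards `set_upstream`/`set_downstream`, and indexing only records a key path that is applied when the value is resolved. As noted above, resolution still loads the whole stored value before narrowing it.

```python
from typing import Any, Dict, Tuple


class ToyOperator:
    """Hypothetical stand-in for an Airflow operator: only a task_id and edges."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream_task_ids: set = set()
        self.downstream_task_ids: set = set()

    def set_downstream(self, other: "ToyOperator") -> None:
        self.downstream_task_ids.add(other.task_id)
        other.upstream_task_ids.add(self.task_id)

    def set_upstream(self, other: "ToyOperator") -> None:
        other.set_downstream(self)


class ToyXComArg:
    """Hypothetical reference to an XCom value the operator has not pushed yet."""

    def __init__(self, operator: ToyOperator, path: Tuple[Any, ...] = ()) -> None:
        self.operator = operator
        self.path = path  # keys applied to the pulled value at runtime

    @property
    def op(self) -> ToyOperator:
        # Point 1: get back to the producing operator.
        return self.operator

    def set_upstream(self, other: ToyOperator) -> None:
        self.operator.set_upstream(other)

    def set_downstream(self, other: ToyOperator) -> None:
        self.operator.set_downstream(other)

    def __getitem__(self, key: Any) -> "ToyXComArg":
        # Point 5: f_result["some_key"] only records the key; nothing is read yet.
        return ToyXComArg(self.operator, self.path + (key,))

    def resolve(self, xcom_store: Dict[str, Any]) -> Any:
        # The whole stored value is loaded first, then narrowed by the key path.
        value = xcom_store[self.operator.task_id]
        for key in self.path:
            value = value[key]
        return value


def link_inputs(consumer: ToyOperator, *args: Any, **kwargs: Any) -> None:
    """Point 1: add an upstream edge for every ToyXComArg passed to `consumer`."""
    for value in list(args) + list(kwargs.values()):
        if isinstance(value, ToyXComArg):
            consumer.set_upstream(value.op)


producer = ToyOperator("some_function")
f_result = ToyXComArg(producer)
consumer = ToyOperator("some_other_decorated_function")
link_inputs(consumer, a=f_result["some_key"][0])
print(consumer.upstream_task_ids)  # {'some_function'}
print(f_result["some_key"][0].resolve({"some_function": {"some_key": [42, 7]}}))  # 42
```

In a real implementation the linking would more likely happen inside the operator's `__call__` rather than in a separate helper; it is split out here only to keep the sketch short.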

(I am still trying to figure out the right place for this kind of
feedback: the mailing list or a JIRA ticket?)

Regards,

Evgeny
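Points 4 and 7 (which operator arguments get resolved) could be handled by letting each operator declare the attributes that may hold XCom references, similar to how templated_fields limits Jinja rendering. The sketch below is hypothetical: `ToyXComArg`, `ToyEmailOperator` and the `xcom_fields` attribute are illustrative names (`xcom_fields` is the name floated later in this thread), not Airflow APIs.

```python
from typing import Any, Dict, Sequence


class ToyXComArg:
    """Hypothetical reference to the XCom value pushed by task `task_id`."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class ToyEmailOperator:
    """Hypothetical operator: only attributes listed in xcom_fields are resolved."""

    xcom_fields: Sequence[str] = ("subject", "html_content")

    def __init__(self, task_id: str, to: str, subject: Any, html_content: Any) -> None:
        self.task_id = task_id
        self.to = to
        self.subject = subject
        self.html_content = html_content
        # Dependencies are derived once, from the XComArgs found in xcom_fields.
        self.upstream_task_ids = {
            getattr(self, name).task_id
            for name in self.xcom_fields
            if isinstance(getattr(self, name), ToyXComArg)
        }

    def resolve_xcom_fields(self, xcom_store: Dict[str, Any]) -> None:
        # Swap each placeholder for the pushed value just before execution;
        # attributes outside xcom_fields (e.g. `to`) are never inspected.
        for name in self.xcom_fields:
            value = getattr(self, name)
            if isinstance(value, ToyXComArg):
                setattr(self, name, xcom_store[value.task_id])


send_email = ToyEmailOperator(
    task_id="send_email",
    to="example@example.com",
    subject=ToyXComArg("email_subject_generator"),
    html_content=ToyXComArg("email_body_generator"),
)
print(sorted(send_email.upstream_task_ids))
# ['email_body_generator', 'email_subject_generator']
send_email.resolve_xcom_fields(
    {"email_subject_generator": "New login", "email_body_generator": "<p>Hi</p>"}
)
print(send_email.subject)  # New login
```

Restricting resolution to declared fields avoids walking every kwarg (point 7), at the cost of each operator having to opt in; traversing all arguments would be the alternative.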






On Wed, Feb 26, 2020 at 12:29 AM Gerard Casas Saez
<gc...@twitter.com.invalid> wrote:

> Short update on status after receiving some feedback on Slack and
> Confluence comments:
>
> • Updated description of PythonFunctionalOperator
> • Added a few clarifying comments on how each component change plays
> along with the others
>
> Reached out to Bolke to see if I can get his thoughts on how Lineage and
> Functional DAG API overlap (not sure what’s the best format for that
> discussion, but will be updating this thread as I go along).
>
> Unless someone is strongly against the current proposal, I plan to submit
> it for a vote by mid-next week and start planning implementation tasks
> after that. I haven’t been able to find a good guideline for this process,
> so I’m mainly looking at other completed AIPs. Let me know if I should wait
> longer.
>
> Best,
>
> Gerard Casas Saez
> Twitter | Cortex | @casassaez
> On Feb 24, 2020, 10:31 AM -0700, Jarek Potiuk <Ja...@polidea.com>,
> wrote:
> > Ah yeah... I totally forgot about that :) (shame on me) ... But it does
> > seem appropriate if I came to the same conclusion again looking from
> > another angle.... :D
> >
> > J.
> >
> >
> > On Mon, Feb 24, 2020 at 6:25 PM Gerard Casas Saez
> > <gc...@twitter.com.invalid> wrote:
> >
> > > Agree, I initially pitched the idea on the lineage thread and was
> > > encouraged to pitch it separately. I would love to help figure out how
> > > to align these two projects better.
> > >
> > > Bolke - do you want to set up a call, or how should we best discuss
> > > this? Would love to hear feedback on my proposal.
> > >
> > > Gerard Casas Saez
> > > Twitter | Cortex | @casassaez
> > > On Feb 23, 2020, 1:44 AM -0700, Jarek Potiuk <Jarek.Potiuk@polidea.com>,
> > > wrote:
> > > > I like the idea a lot. Good direction.
> > > >
> > > > I know we have a few people who are better at functional thinking
> > > > than me, so I would love for those people to work it out. Happy to
> > > > listen to the discussions :)
> > > >
> > > > One thing struck me, however. I am not sure if it falls in the same
> > > > camp, but I have a feeling that there is at least some common part
> > > > with the proposal from Bolke about Lineage.
> > > > Those two things (functional DAG API and Lineage) are not directly
> > > > connected, but I think the designs of both have at least some common
> > > > part, and it would make sense for us to at least talk about this and
> > > > how they play together.
> > > >
> > > > WDYT Bolke?
> > > >
> > > >
> > > > J
> > > >
> > > > On Fri, Feb 21, 2020 at 7:13 PM Dan Davydov
> > > > <ddavydov@twitter.com.invalid> wrote:
> > > >
> > > > > Here is the link to the AIP for folks' convenience:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-31%3A+Airflow+functional+DAG+API
> > > > >
> > > > > The proposal and all of this look really good to me :)! I do want
> > > > > to call out to others that it's important we get the interface 95%+
> > > > > right from the get-go, since it could be hard to change later once
> > > > > users start depending on it.
> > > > >
> > > > > On Fri, Feb 21, 2020 at 12:07 PM Gerard Casas Saez
> > > > > <gc...@twitter.com.invalid> wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > Sending a new message to everyone to gather feedback on AIP-31
> > > > > > about the Airflow functional DAG API. This was initially discussed
> > > > > > and proposed in [DISCUSS] Airflow functional DAGs. After leaving a
> > > > > > small doc open to iterate on the proposal for a couple of weeks, I
> > > > > > decided to move forward and formalize it as an AIP document in
> > > > > > Confluence. I will still be tracking comments on the doc, but would
> > > > > > rather move the conversation here.
> > > > > >
> > > > > > AIP-31 focuses on solving the issue of implicit message passing in
> > > > > > Airflow by extending the DAG/Operator API to include a way to set
> > > > > > up a message-passing dependency in the same DAG file. This is
> > > > > > complementary to task dependency declaration and is intended to be
> > > > > > used as another option to declare dependencies, by declaring
> > > > > > message dependencies.
> > > > > >
> > > > > > In addition, AIP-31 proposes a way to declare PythonOperators from
> > > > > > a function using decorators. This should help embed custom behavior
> > > > > > into DAGs without needing to create custom operators for
> > > > > > everything.
> > > > > >
> > > > > > Changes proposed:
> > > > > >
> > > > > > • Add a __call__ function to BaseOperator: add a functional
> > > > > >   interface to replace class attributes at execution time.
> > > > > > • XComArg class: this object is a reference to an XCom value that
> > > > > >   has not been created yet and will need to be resolved in the
> > > > > >   future.
> > > > > > • PythonFunctionalOperator and Python Function Operator: extend
> > > > > >   PythonOperator to map op_args and op_kwargs from a decorated
> > > > > >   interface for easier setup, and add a decorator to create
> > > > > >   PythonOperators from a function more easily.
> > > > > >
> > > > > > See the AIP document for a DAG example.
> > > > > >
> > > > > > Any help on how to proceed with this will be appreciated (I also
> > > > > > joined Slack and posted in #airflow-creative). My guess is that
> > > > > > once we agree on what the API should look like, the next step is to
> > > > > > hold a vote and, if it is successful, to create JIRA issues and
> > > > > > GitHub PRs with the mentioned changes.
> > > > > >
> > > > > > Please let me know if there’s any aspect that people feel strongly
> > > > > > opinionated against, or aspects that are not clear and that I
> > > > > > should work on expanding further.
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Gerard Casas Saez
> > > > > > Twitter | Cortex | @casassaez
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Jarek Potiuk
> > > > Polidea <https://www.polidea.com/> | Principal Software Engineer
> > > >
> > > > M: +48 660 796 129
> > >
> >
> >
>
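The three changes listed in the proposal quoted above can be pictured with a small framework-free sketch. Everything here (`ToyPythonOperator`, `ToyXComArg`, `task`, and the `xcom_store` dict standing in for the XCom backend) is hypothetical and only illustrates the idea: calling an operator stores op_args/op_kwargs and returns a placeholder, and execution swaps placeholders for the values their producers pushed.

```python
import functools
from typing import Any, Callable, Dict, List


class ToyXComArg:
    """Hypothetical placeholder for the future return value of `operator`."""

    def __init__(self, operator: "ToyPythonOperator") -> None:
        self.operator = operator


class ToyPythonOperator:
    """Hypothetical stand-in for a PythonOperator built from a decorated function."""

    def __init__(self, task_id: str, python_callable: Callable) -> None:
        self.task_id = task_id
        self.python_callable = python_callable
        self.op_args: List[Any] = []
        self.op_kwargs: Dict[str, Any] = {}
        self.upstream_task_ids: set = set()

    def __call__(self, *args: Any, **kwargs: Any) -> ToyXComArg:
        # Functional interface: store the call arguments as op_args/op_kwargs
        # and add a dependency on the producer of every XComArg argument.
        self.op_args = list(args)
        self.op_kwargs = dict(kwargs)
        for value in self.op_args + list(self.op_kwargs.values()):
            if isinstance(value, ToyXComArg):
                self.upstream_task_ids.add(value.operator.task_id)
        return ToyXComArg(self)

    def execute(self, xcom_store: Dict[str, Any]) -> Any:
        # At execution time, replace each XComArg with the value its producer pushed.
        def resolve(value: Any) -> Any:
            if isinstance(value, ToyXComArg):
                return xcom_store[value.operator.task_id]
            return value

        result = self.python_callable(
            *[resolve(a) for a in self.op_args],
            **{k: resolve(v) for k, v in self.op_kwargs.items()},
        )
        xcom_store[self.task_id] = result  # "push" the return value
        return result


def task(fn: Callable) -> Callable[..., ToyXComArg]:
    """Hypothetical decorator: each call creates an operator and returns its XComArg."""

    @functools.wraps(fn)
    def factory(*args: Any, **kwargs: Any) -> ToyXComArg:
        # task_id is taken from the function name; collisions are ignored here.
        return ToyPythonOperator(fn.__name__, fn)(*args, **kwargs)

    return factory


@task
def get_ip() -> str:
    return "1.2.3.4"


@task
def email_subject(ip: str) -> str:
    return f"Server connected from {ip}"


ip_info = get_ip()                 # no work happens yet, only a placeholder
subject = email_subject(ip_info)   # dependency get_ip -> email_subject is recorded
print(subject.operator.upstream_task_ids)  # {'get_ip'}

store: Dict[str, Any] = {}
ip_info.operator.execute(store)
print(subject.operator.execute(store))  # Server connected from 1.2.3.4
```

The function names `get_ip` and `email_subject` are made up for the example; a real scheduler would run the operators in dependency order rather than by hand as done here.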

Re: [DISCUSS] AIP-31: Airflow functional DAG API

Posted by Kaxil Naik <ka...@gmail.com>.
>
> We should maybe move this conversation to slack if there’s anything
> specific in the AIP that I did not cover. Otherwise, I will submit it for
> vote and start breaking down the work in issues (I guess now GitHub
> issues?).


Yes, now that the vote has passed, please feel free to start creating
GitHub issues so that we can get this feature in.

Regards,
Kaxil

On Fri, Mar 20, 2020 at 12:37 AM Gerard Casas Saez
<gc...@twitter.com.invalid> wrote:

> Hi everyone,
>
> Sorry for leaving this unfinished. I was commenting on an extension that
> Tomasz suggested in the AIP doc: adding fields to the operator that serve
> as templated args. This should be pretty easy to do by extending templated
> args. I'm not sure whether to do it by extending templated_fields or by
> adding a new class attribute like `xcom_fields`. For context, Tomasz was
> suggesting allowing message passing like this:
>
> send_email = EmailOperator(
>     task_id='send_email',
>     to='example@example.com',
>     subject=email_subject_generator(ip_info),
>     html_content=email_body_generator(ip_info),
> )
>
>
> My replies to Evgeny's suggestions:
>
>
> • XComArgs does include the _operator instance (see proposed
>   implementation here). I added a clarification to the AIP.
>     • Added op as a key to get the originating operator.
>     • The dependency link between tasks is added automatically in the
>       __call__ function.
> • Regarding context, you can set PythonOperator args/kwargs in the
>   decorator.
>     • Added that the context will be passed as a dictionary in the first
>       argument of the function.
>     • Example: @task(with_context=True) should load the context as the
>       first arg. Updated the example in the doc to include this.
> • Specifying which DAG to use:
>     • Either using dag.task,
>     • or @task(dag=dag).
>     • If the task has no DAG, you can use decorated_function.dag = dag,
>       which should allow for reuse. In addition, I propose a copy method
>       that copies the Python function into a new operator. Not sure if it
>       would also be useful to allow passing the DAG there. Example usage
> • I would keep the options to use either the specified templated_fields or
>   the fields specified in __call__.
> • Not sure I follow what you mean in 5 and 6.
>
> We should maybe move this conversation to Slack if there’s anything
> specific in the AIP that I did not cover. Otherwise, I will submit it for a
> vote and start breaking down the work into issues (I guess GitHub issues
> now?).
>
> Gerard Casas Saez
> Twitter | Cortex | @casassaez
>
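Gerard's reply above describes three mechanisms: passing the context as the first argument with @task(with_context=True), binding a DAG after the fact via decorated_function.dag = dag, and a copy method for reuse. Below is a minimal sketch of those ideas using hypothetical `ToyDAG`, `ToyFunctionTask` and `task` names rather than any real Airflow API; for brevity the decorated name is the task object itself, and the call-returns-XComArg interface is left out.

```python
from typing import Any, Callable, Dict, List, Optional


class ToyDAG:
    """Hypothetical DAG stand-in that only records which tasks belong to it."""

    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id
        self.task_ids: List[str] = []


class ToyFunctionTask:
    """Hypothetical task built from a plain Python function."""

    def __init__(self, fn: Callable, with_context: bool = False,
                 dag: Optional[ToyDAG] = None) -> None:
        self.fn = fn
        self.with_context = with_context
        self.dag = dag  # may stay None and be assigned later

    def copy(self, dag: Optional[ToyDAG] = None) -> "ToyFunctionTask":
        # Reuse the same function elsewhere without mutating the original task.
        return ToyFunctionTask(self.fn, self.with_context, dag)

    def register(self) -> None:
        if self.dag is None:
            raise ValueError(f"{self.fn.__name__} has no DAG assigned")
        self.dag.task_ids.append(self.fn.__name__)

    def run(self, context: Dict[str, Any], *args: Any, **kwargs: Any) -> Any:
        # with_context=True: the context dict becomes the first positional argument.
        if self.with_context:
            return self.fn(context, *args, **kwargs)
        return self.fn(*args, **kwargs)


def task(with_context: bool = False,
         dag: Optional[ToyDAG] = None) -> Callable[[Callable], ToyFunctionTask]:
    """Hypothetical @task(...) decorator factory."""
    def wrap(fn: Callable) -> ToyFunctionTask:
        return ToyFunctionTask(fn, with_context, dag)
    return wrap


@task(with_context=True)
def report(context: Dict[str, Any], name: str) -> str:
    return f"{name} ran on {context['ds']}"


daily, weekly = ToyDAG("daily"), ToyDAG("weekly")
report.dag = daily                        # bind after definition
report.register()
weekly_report = report.copy(dag=weekly)   # same function, another DAG
weekly_report.register()
print(daily.task_ids, weekly.task_ids)    # ['report'] ['report']
print(report.run({"ds": "2020-03-03"}, "report"))  # report ran on 2020-03-03
```

Returning a fresh object from `copy` keeps the original binding intact, which is what makes the same function reusable across DAGs in this sketch.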

Re: [DISCUSS] AIP-31: Airflow functional DAG API

Posted by Tomasz Urbaszek <tu...@apache.org>.
Hi all,

Should we proceed with the vote?

T.

On Fri, Mar 20, 2020 at 1:37 AM Gerard Casas Saez
<gc...@twitter.com.invalid> wrote:

> Hi everyone,
>
> Sorry for leaving this unfinished. I was commenting an extension that
> Tomasz suggested in the AIP doc, to add fields in the operator that serve
> as templated args. This should be pretty easy to do if extending templated
> args. Not sure if doing it extending templated_fields or add a new class
> attribute like `xcom_fields`. For context, Tomasz was suggesting allowing
> message passing like this:
>
> send_email = EmailOperator(
>       task_id=‘send_email’,
>       to="example@example.com”,
>       subject=email_subject_generator(ip_info),
>       html_content=email_body_generator(ip_info)
>   )
>
>
> My replies to Evgeny suggestions:
>
>
> • XComArgs does include the _operator instance (see proposed
> implementation here). I added a clarification on the AIP.
>     • Added op as a key to get the origin operator
>     • Dependency link between tasks is added automatically in the __call__
> function
> • Regarding context, you can set PythonOperator args/kwargs in the
> decorator.
>     • Added that context will be added as a dictionary in the first
> argument of the function
>     • Example: @task(with_context=True) should load the context as the
> first arg. Updated the example in the doc to include this.
> •  Specifying which DAG to use
>     • Either using dag.task
>     • @task(dag=dag)
>     • If task has no DAG then you can use decorated_function.dag = dag
> which should allow for reuse. In addition, I propose a copy method that
> copies the python function into a new operator. Not sure if it may be
> useful to also allow to pass the DAG there. Example usage
> • I would keep the options to either specified templated_fields or fields
> specified in __call__
> • Not sure if I follow what you mean in 5, 6
> •
>
> We should maybe move this conversation to slack if there’s anything
> specific in the AIP that I did not cover. Otherwise, I will submit it for
> vote and start breaking down the work in issues (I guess now GitHub
> issues?).
>
> Gerard Casas Saez
> Twitter | Cortex | @casassaez
> On Mar 3, 2020, 3:21 PM -0700, Evgeny Shulman <ev...@databand.ai>,
> wrote:
> > Hey Everybody
> >
> > I am happy to contribute to this discussion! It's something that
> everybody
> > is missing at Airflow. If I am part of a discussion around orchestration
> > framework comparison, "functional DAGs” arguments are the most used ones.
> > We are building a similar approach and would be happy to base it on this
> > AIP improvements.
> >
> > These are my thoughts, concerns, and ideas around the AIP
> >
> >
> > 1.
> >
> > XComArgs ( great definition, I assume this is what decorated function
> > call return by default at DAGs building phase?)
> > 1.
> >
> > Should it include reference (task_id) to the former task? It will be
> > nice to have a downstream operator being able to be linked to upstream
> > automatically?
> > 2.
> >
> > `xcom_args.set_upstream`, `xcom_args_set_downstream` should be a nice
> > addition. `xcom_args.operator` or `xcom_args.op` property can
> > help a lot to
> > get back to the operator.
> > 2.
> >
> > It will be nice to have a way to provide airflow_context to the function
> > itself, as it’s done in execute PythonOperator function right now ( I
> would
> > suggest implementing the new mechanism, and passing everything as context
> > keyword, instead of **context)
> > 3.
> >
> > Some thoughts around @dag.task:
> > 1.
> >
> > It definitely helps to assign a function to a specific dag.
> > 2.
> >
> > it changes the standard behavior ( provide dag or “with dag” context
> > statement).
> > 3.
> >
> > It binds a function to a specific dag so it can not be reused.
> > 4.
> >
> > In case the user decides to use 4.b only, how can he provide dag into
> > a function? f(dag=dag)? Dag can be a user-space keyword at the function
> > 4.
> >
> > I assume the same mechanism that we use for XComArgs can be used for
> > Operators as well. If “templated” parameter has value XComArgs, it should
> > be processed with the Jinja engine, but what if I push the operator into
> > args/kwargs of another operator? We can automatically convert it to a
> real
> > value from XCom on runtime. That will make this code possible. ` a =
> > AOpearator(); b=your_function(some_parameter=a)`
> > 5.
> >
> > Some valuable extension for the AIP can be around XComArgs usage (it’s
> > already mentioned in AIP description). Being able to write :
> >
> > @task
> >
> > def some_function():
> >
> > return <some data structure>
> >
> > f_result = some_function()
> >
> > some_other_decorated_function( a= f_result[“some_key”])
> >
> > 6. Another option can be:
> > `f_result.op.get_xcom(“[some_key][0].some_function”)`.
> >
> >
> > 7. Are you going to traverse all fields/kwargs of the operator? (
> usually,
> > XCom transformation was applied only on templated fields)
> >
> > The jinja template can be easily changed and the data can be wired. It
> has
> > a problem that it will always read the whole result, but it’s already
> > stored as pickle, so there are no other options here.
> >
> > ( I am still trying to get what is the right place for this kind of
> > feedback, mailing list or JIRA ticket?)
> >
> > Regards,
> >
> > Evgeny
> >
> >
> >
> >
> >
> >
> > On Wed, Feb 26, 2020 at 12:29 AM Gerard Casas Saez
> > <gc...@twitter.com.invalid> wrote:
> >
> > > Short update on status after receiving some feedback on Slack and
> > > Confluence comments:
> > >
> > > • Updated description of PythonFunctionalOperator
> > > • Added a few clarifying comments on how each component change plays
> along
> > > each other
> > >
> > > Reached out to Bolke to see if I can get his thoughts on how Lineage
> and
> > > Functional DAG API overlap (not sure what’s the best format for that
> > > discussion, but will be updating this thread as I go along).
> > >
> > > Unless there’s someone strongly against with the current proposal, I
> plan
> > > on submitting the proposal for vote by mid-next week and start planning
> > > implementation tasks after that. Haven’t been able to find a good
> guideline
> > > for this process, mainly looking at other completed AIP. Let me know
> if I
> > > should wait more.
> > >
> > > Best,
> > >
> > > Gerard Casas Saez
> > > Twitter | Cortex | @casassaez
> > > On Feb 24, 2020, 10:31 AM -0700, Jarek Potiuk <
> Jarek.Potiuk@polidea.com>,
> > > wrote:
> > > > Ah yeah... I totally forgot about that :) (shame on me) ... But it
> does
> > > > seem appropriate if I came to the same conclusion again looking from
> > > > another angle.... :D
> > > >
> > > > J.
> > > >
> > > >
> > > > On Mon, Feb 24, 2020 at 6:25 PM Gerard Casas Saez
> > > > <gc...@twitter.com.invalid> wrote:
> > > >
> > > > > Agree, I initially pitched the idea on the lineage thread and was
> > > > > encouraged to pitch it separately. I would love to help figure out
> how
> > > to
> > > > > align this 2 projects better.
> > > > >
> > > > > Bolke - want to set up a call or how should we discuss this better?
> > > Would
> > > > > love to hear feedback on my proposal.
> > > > >
> > > > > Gerard Casas Saez
> > > > > Twitter | Cortex | @casassaez
> > > > > On Feb 23, 2020, 1:44 AM -0700, Jarek Potiuk <
> Jarek.Potiuk@polidea.com
> > > > ,
> > > > > wrote:
> > > > > > I like the idea a lot. Good direction.
> > > > > >
> > > > > > I know we have a few people who are better in functional thinking
> > > than me
> > > > > > so I think I would love those people to work it out. Happy to
> listen
> > > to
> > > > > the
> > > > > > discussions :)
> > > > > >
> > > > > > One thing that struck me however. I am not sure if that falls in
> the
> > > same
> > > > > > camp, but I have a feeling that there is at least some common
> part
> > > with
> > > > > the
> > > > > > proposal from Bolke about the Lineage.
> > > > > > Those two things (functional DAG API and Lineage) are not
> directly
> > > > > > connected but I think the design of both has at least some common
> > > part
> > > > > and
> > > > > > it would make sense that at least we talk about this and how they
> > > play
> > > > > > together.
> > > > > >
> > > > > > WDYT Bolke?
> > > > > >
> > > > > >
> > > > > > J
> > > > > >
> > > > > > On Fri, Feb 21, 2020 at 7:13 PM Dan Davydov
> > > <ddavydov@twitter.com.invalid
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Here is the link to the AIP for folk's convenience:
> > > > > > >
> > > > > > >
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-31%3A+Airflow+functional+DAG+API
> > > > > > >
> > > > > > > The proposal and this all looks really good to me :)! I do
> want to
> > > > > call out
> > > > > > > to others that it's important we get the interface 95%+ right
> from
> > > the
> > > > > > > get-go since it could be hard to change later once users start
> > > > > depending on
> > > > > > > it.
> > > > > > >
> > > > > > > On Fri, Feb 21, 2020 at 12:07 PM Gerard Casas Saez
> > > > > > > <gc...@twitter.com.invalid> wrote:
> > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > Sending a new message to everyone to gather feedback on AIP-31
> > > > > > > > about the Airflow functional DAG API. This was initially discussed
> > > > > > > > and proposed in [DISCUSS] Airflow functional DAGs. After leaving a
> > > > > > > > small doc open to iterate on the proposal for a couple of weeks, I
> > > > > > > > decided to move forward and formalize it as an AIP document in
> > > > > > > > Confluence. I will still be tracking comments on the doc, but would
> > > > > > > > rather move the conversation here.
> > > > > > > >
> > > > > > > > AIP-31 focuses on solving the issue of implicit message passing in
> > > > > > > > Airflow by extending the DAG/Operator API to include a way to set
> > > > > > > > up a message-passing dependency in the same DAG file. This is
> > > > > > > > complementary to task dependency declaration and is intended to be
> > > > > > > > used as another option to declare dependencies, by declaring
> > > > > > > > message dependencies.
> > > > > > > >
> > > > > > > > In addition, AIP-31 proposes a way to declare PythonOperators from
> > > > > > > > a function using decorators. This should help embed custom
> > > > > > > > behavior into DAGs without needing to create custom operators for
> > > > > > > > everything.
> > > > > > > >
> > > > > > > > Changes proposed:
> > > > > > > >
> > > > > > > > • Add a __call__ function to BaseOperator: add a functional
> > > > > > > >   interface to replace class attributes at execution time.
> > > > > > > > • XComArg class: this object is a reference to an XCom value that
> > > > > > > >   has not been created yet and will need to be resolved in the
> > > > > > > >   future.
> > > > > > > > • PythonFunctionalOperator and Python Function Operator: extend
> > > > > > > >   PythonOperator to map op_args and op_kwargs from a decorated
> > > > > > > >   interface for easier setup, and add a decorator to create
> > > > > > > >   PythonOperators from a function in an easier way.
> > > > > > > >
> > > > > > > > See the AIP document for a DAG example.
> > > > > > > >
> > > > > > > > Any help on how to proceed with this will be appreciated (I also
> > > > > > > > joined Slack and posted in #airflow-creative). My guess is that once
> > > > > > > > we agree on what the API should look like, the next step is to hold
> > > > > > > > a vote and, if successful, to create JIRA issues and GitHub PRs with
> > > > > > > > the mentioned changes.
> > > > > > > >
> > > > > > > > Please let me know if there's any aspect that people feel strongly
> > > > > > > > against, or aspects that are not clear and that I should work on
> > > > > > > > expanding further.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > >
> > > > > > > > Gerard Casas Saez
> > > > > > > > Twitter | Cortex | @casassaez
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Jarek Potiuk
> > > > > > Polidea <https://www.polidea.com/> | Principal Software Engineer
> > > > > >
> > > > > > M: +48 660 796 129 <+48660796129>
> > > > > > [image: Polidea] <https://www.polidea.com/>
> > > > >
> > > >
> > > >
> > >
>
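The three changes proposed in the AIP-31 announcement quoted above (a __call__ on operators, an XComArg placeholder, and a decorator that builds PythonOperators) can be sketched as a small pure-Python toy model. This is a hedged illustration, not Airflow's API: XCOM_STORE (a dict standing in for the XCom backend), ToyOperator, task, resolve and the two example functions are all hypothetical names. Calling a decorated function while the DAG is being built returns an XComArg and records the upstream dependency; executing the tasks in order then swaps each placeholder for the value its upstream task pushed.

```python
from typing import Any, Callable, Dict, List

# Hypothetical stand-in for the XCom backend: task_id -> value pushed by the task
XCOM_STORE: Dict[str, Any] = {}


class XComArg:
    """Reference to an XCom value that has not been created yet."""

    def __init__(self, operator: "ToyOperator") -> None:
        self.operator = operator

    def resolve(self) -> Any:
        # Read the value the upstream task pushed when it ran
        return XCOM_STORE[self.operator.task_id]


class ToyOperator:
    """Hypothetical operator wrapping a Python callable (not BaseOperator)."""

    def __init__(self, task_id: str, python_callable: Callable[..., Any]) -> None:
        self.task_id = task_id
        self.python_callable = python_callable
        self.op_args: List[Any] = []
        self.upstream: List["ToyOperator"] = []

    def __call__(self, *args: Any) -> XComArg:
        # Functional interface: keep the arguments for execution time and
        # register an upstream dependency for every XComArg passed in
        self.op_args = list(args)
        for arg in args:
            if isinstance(arg, XComArg):
                self.upstream.append(arg.operator)
        return XComArg(self)

    def execute(self) -> Any:
        # Swap placeholders for real values, run the callable, "push" the result
        args = [a.resolve() if isinstance(a, XComArg) else a for a in self.op_args]
        XCOM_STORE[self.task_id] = self.python_callable(*args)
        return XCOM_STORE[self.task_id]


def task(fn: Callable[..., Any]) -> Callable[..., XComArg]:
    """Hypothetical decorator: each call creates an operator and returns its XComArg."""
    def build(*args: Any) -> XComArg:
        return ToyOperator(fn.__name__, fn)(*args)
    return build


@task
def get_ip() -> str:
    return "1.2.3.4"


@task
def email_subject_generator(ip: str) -> str:
    return f"Server connected from {ip}"


# DAG-building phase: nothing runs yet, only placeholders and dependencies
ip_info = get_ip()
subject = email_subject_generator(ip_info)

# Execution phase: run the tasks in dependency order
ip_info.operator.execute()
subject.operator.execute()
```

Inferring the dependency inside __call__ means no explicit set_upstream call is needed, which is the point of declaring message dependencies instead of task dependencies.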

Re: [DISCUSS] AIP-31: Airflow functional DAG API

Posted by Gerard Casas Saez <gc...@twitter.com.INVALID>.
Hi everyone,

Sorry for leaving this unfinished. I was commenting on an extension that Tomasz suggested in the AIP doc: adding fields to the operator that serve as templated args. This should be fairly easy to do by extending templated args. I'm not sure whether to do it by extending templated_fields or by adding a new class attribute like `xcom_fields`. For context, Tomasz was suggesting allowing message passing like this:

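from airflow.operators.email_operator import EmailOperator

# ip_info, email_subject_generator and email_body_generator come from the AIP
# example DAG; under the proposal, the generator calls return XComArgs
send_email = EmailOperator(
    task_id='send_email',
    to='example@example.com',
    subject=email_subject_generator(ip_info),
    html_content=email_body_generator(ip_info),
)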
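One way the `xcom_fields` idea could work, sketched with toy classes under stated assumptions. ToyEmailOperator, XComArg, resolve_xcom_fields and XCOM_STORE are hypothetical and only mimic the shape of the EmailOperator example; the assumption is that the listed fields are resolved right before execute, the same point at which Airflow renders an operator's template_fields.

```python
from typing import Any, Dict, Tuple

# Hypothetical XCom backend, pre-filled as if the two generator tasks had run
XCOM_STORE: Dict[str, Any] = {
    "email_subject_generator": "Hello",
    "email_body_generator": "<p>Hi</p>",
}


class XComArg:
    """Placeholder for the return value of an upstream task."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def resolve(self) -> Any:
        return XCOM_STORE[self.task_id]


class ToyEmailOperator:
    # Hypothetical class attribute listing fields that may hold XComArgs
    xcom_fields: Tuple[str, ...] = ("subject", "html_content")

    def __init__(self, task_id: str, to: str, subject: Any, html_content: Any) -> None:
        self.task_id = task_id
        self.to = to
        self.subject = subject
        self.html_content = html_content

    def resolve_xcom_fields(self) -> None:
        # Replace any XComArg in the listed fields with its pushed value
        for name in self.xcom_fields:
            value = getattr(self, name)
            if isinstance(value, XComArg):
                setattr(self, name, value.resolve())

    def execute(self) -> str:
        self.resolve_xcom_fields()
        # A real operator would send the email; the toy only builds the message
        return f"To: {self.to}\nSubject: {self.subject}\n\n{self.html_content}"


send_email = ToyEmailOperator(
    task_id="send_email",
    to="example@example.com",
    subject=XComArg("email_subject_generator"),
    html_content=XComArg("email_body_generator"),
)
message = send_email.execute()
```

A separate `xcom_fields` attribute keeps XCom resolution independent of Jinja rendering; reusing template_fields instead avoids a second attribute but mixes the two mechanisms.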


My replies to Evgeny's suggestions:


• XComArgs does include the _operator instance (see proposed implementation here). I added a clarification to the AIP.
    • Added op as a key to access the originating operator
    • The dependency link between tasks is added automatically in the __call__ function
• Regarding context, you can set PythonOperator args/kwargs in the decorator.
    • Added that the context will be passed as a dictionary in the first argument of the function
    • Example: @task(with_context=True) should load the context as the first arg. I updated the example in the doc to include this.
• Specifying which DAG to use:
    • Either using @dag.task
    • Or @task(dag=dag)
    • If the task has no DAG, you can use decorated_function.dag = dag, which should allow for reuse. In addition, I propose a copy method that copies the Python function into a new operator. Not sure if it would be useful to also allow passing the DAG there. Example usage
• I would keep both options: fields specified in templated_fields or fields passed in __call__
• Not sure if I follow what you mean in 5, 6
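The replies above can be illustrated with another toy model, offered as a sketch rather than the proposed implementation. ToyDAG, ToyPythonOperator and the task(with_context=..., dag=...) decorator are hypothetical stand-ins: the decorated function becomes an operator object, XComArg.op points back to it, with_context=True passes the context dict as the first positional argument, and .dag can be assigned after decoration or on a copy so the same function can be reused in a second DAG.

```python
from copy import copy as shallow_copy
from typing import Any, Callable, Dict, List, Optional, Tuple


class ToyDAG:
    """Hypothetical DAG container that just records its tasks."""

    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id
        self.tasks: List["ToyPythonOperator"] = []


class XComArg:
    def __init__(self, op: "ToyPythonOperator") -> None:
        self.op = op  # the originating operator


class ToyPythonOperator:
    def __init__(self, fn: Callable[..., Any], with_context: bool = False,
                 dag: Optional[ToyDAG] = None) -> None:
        self.fn = fn
        self.task_id = fn.__name__
        self.with_context = with_context
        self.dag = dag  # may be None at decoration time and assigned later
        self.op_args: Tuple[Any, ...] = ()

    def __call__(self, *args: Any) -> XComArg:
        self.op_args = args
        if self.dag is not None:
            self.dag.tasks.append(self)
        return XComArg(self)

    def copy(self) -> "ToyPythonOperator":
        # A new operator object wrapping the same Python function
        return shallow_copy(self)

    def execute(self, context: Dict[str, Any]) -> Any:
        # with_context=True: the context dict becomes the first positional argument
        if self.with_context:
            return self.fn(context, *self.op_args)
        return self.fn(*self.op_args)


def task(with_context: bool = False, dag: Optional[ToyDAG] = None):
    """Hypothetical decorator mirroring @task(with_context=True) / @task(dag=dag)."""
    def wrap(fn: Callable[..., Any]) -> ToyPythonOperator:
        return ToyPythonOperator(fn, with_context=with_context, dag=dag)
    return wrap


@task(with_context=True)
def greet(context: Dict[str, Any], name: str) -> str:
    return f"{context['ds']}: hello {name}"


dag_a = ToyDAG("dag_a")
greet.dag = dag_a              # no DAG at decoration time; assign it afterwards
result = greet("airflow")      # XComArg whose .op is the operator itself

dag_b = ToyDAG("dag_b")
greet_b = greet.copy()         # reuse the same function in a second DAG
greet_b.dag = dag_b
greet_b("world")

out_a = greet.execute({"ds": "2020-03-03"})
out_b = greet_b.execute({"ds": "2020-03-03"})
```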

We should maybe move this conversation to Slack if there's anything specific in the AIP that I did not cover. Otherwise, I will submit it for a vote and start breaking down the work into issues (I guess GitHub issues now?).

Gerard Casas Saez
Twitter | Cortex | @casassaez
On Mar 3, 2020, 3:21 PM -0700, Evgeny Shulman <ev...@databand.ai>, wrote:
> Hey Everybody
>
> I am happy to contribute to this discussion! It's something that everybody
> is missing in Airflow. Whenever I am part of a discussion comparing
> orchestration frameworks, "functional DAGs" are the most used argument.
> We are building a similar approach and would be happy to base it on these
> AIP improvements.
>
> These are my thoughts, concerns, and ideas around the AIP:
>
> 1. XComArgs (great definition; I assume this is what a decorated function
>    call returns by default during the DAG-building phase?)
>    a. Should it include a reference (task_id) to the former task? It would
>       be nice for a downstream operator to be linked to its upstream
>       automatically.
>    b. `xcom_args.set_upstream` and `xcom_args.set_downstream` would be a
>       nice addition. An `xcom_args.operator` or `xcom_args.op` property
>       could help a lot to get back to the operator.
> 2. It would be nice to have a way to provide the Airflow context to the
>    function itself, as is done in the PythonOperator execute function right
>    now (I would suggest implementing the new mechanism and passing
>    everything as a `context` keyword instead of **context).
> 3. Some thoughts around @dag.task:
>    a. It definitely helps to assign a function to a specific DAG.
>    b. It changes the standard behavior (providing dag or a "with dag"
>       context statement).
>    c. It binds a function to a specific DAG, so it cannot be reused.
>    d. In case the user decides to use 4.b only, how can they provide the
>       DAG to a function? f(dag=dag)? dag could also be a user-space keyword
>       of the function.
> 4. I assume the same mechanism that we use for XComArgs can be used for
>    operators as well. If a "templated" parameter has an XComArgs value, it
>    should be processed with the Jinja engine, but what if I pass an operator
>    into the args/kwargs of another operator? We could automatically convert
>    it to the real value from XCom at runtime. That would make this code
>    possible: `a = AOperator(); b = your_function(some_parameter=a)`
> 5. A valuable extension for the AIP could be around XComArgs usage (it's
>    already mentioned in the AIP description). Being able to write:
>
>        @task
>        def some_function():
>            return <some data structure>
>
>        f_result = some_function()
>        some_other_decorated_function(a=f_result["some_key"])
>
> 6. Another option could be:
>    `f_result.op.get_xcom("[some_key][0].some_function")`.
> 7. Are you going to traverse all fields/kwargs of the operator? (Usually,
>    XCom transformation was applied only to templated fields.)
>
> The Jinja template can easily be changed and the data can be wired. It has
> the problem that it will always read the whole result, but it's already
> stored as a pickle, so there are no other options here.
>
> (I am still trying to figure out the right place for this kind of
> feedback: the mailing list or a JIRA ticket?)
>
> Regards,
>
> Evgeny
>
>
>
>
>
>
> On Wed, Feb 26, 2020 at 12:29 AM Gerard Casas Saez
> <gc...@twitter.com.invalid> wrote:
>
> > Short status update after receiving some feedback on Slack and in
> > Confluence comments:
> >
> > • Updated the description of PythonFunctionalOperator
> > • Added a few clarifying comments on how each component change plays
> >   with the others
> >
> > Reached out to Bolke to see if I can get his thoughts on how Lineage and
> > the Functional DAG API overlap (not sure what's the best format for that
> > discussion, but I will be updating this thread as I go along).
> >
> > Unless someone is strongly against the current proposal, I plan on
> > submitting the proposal for a vote by mid-next week and start planning
> > implementation tasks after that. I haven't been able to find a good
> > guideline for this process, so I'm mainly looking at other completed
> > AIPs. Let me know if I should wait longer.
> >
> > Best,
> >
> > Gerard Casas Saez
> > Twitter | Cortex | @casassaez
> > On Feb 24, 2020, 10:31 AM -0700, Jarek Potiuk <Ja...@polidea.com>,
> > wrote:
> > > Ah yeah... I totally forgot about that :) (shame on me) ... But it does
> > > seem appropriate if I came to the same conclusion again looking from
> > > another angle.... :D
> > >
> > > J.
> > >
> > >
> > > On Mon, Feb 24, 2020 at 6:25 PM Gerard Casas Saez
> > > <gc...@twitter.com.invalid> wrote:
> > >
> > > > Agree, I initially pitched the idea on the lineage thread and was
> > > > encouraged to pitch it separately. I would love to help figure out how
> > > > to align these two projects better.
> > > >
> > > > Bolke - want to set up a call, or how should we best discuss this?
> > > > Would love to hear feedback on my proposal.
> > > >
> > > > Gerard Casas Saez
> > > > Twitter | Cortex | @casassaez
> > > > On Feb 23, 2020, 1:44 AM -0700, Jarek Potiuk <Jarek.Potiuk@polidea.com>,
> > > > wrote:
> > > > > I like the idea a lot. Good direction.
> > > > >
> > > > > I know we have a few people who are better at functional thinking
> > > > > than me, so I would love for those people to work it out. Happy to
> > > > > listen to the discussions :)
> > > > >
> > > > > One thing struck me, however. I am not sure if it falls in the same
> > > > > camp, but I have a feeling that there is at least some common part
> > > > > with the proposal from Bolke about Lineage.
> > > > > Those two things (functional DAG API and Lineage) are not directly
> > > > > connected, but I think their designs have at least some common part,
> > > > > and it would make sense for us to at least talk about this and how
> > > > > they play together.
> > > > >
> > > > > WDYT Bolke?
> > > > >
> > > > >
> > > > > J
> > > > >
> > > > > On Fri, Feb 21, 2020 at 7:13 PM Dan Davydov <ddavydov@twitter.com.invalid>
> > > > > wrote:
> > > > >
> > > > > > Here is the link to the AIP for folks' convenience:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-31%3A+Airflow+functional+DAG+API
> > > > > >
> > > > > > The proposal and all of this look really good to me :)! I do want to
> > > > > > call out to others that it's important we get the interface 95%+
> > > > > > right from the get-go, since it could be hard to change later once
> > > > > > users start depending on it.
> > > > > >
> > > > > > On Fri, Feb 21, 2020 at 12:07 PM Gerard Casas Saez
> > > > > > <gc...@twitter.com.invalid> wrote:
> > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > Sending a new message to everyone to gather feedback on AIP-31
> > > > > > > about the Airflow functional DAG API. This was initially discussed
> > > > > > > and proposed in [DISCUSS] Airflow functional DAGs. After leaving a
> > > > > > > small doc open to iterate on the proposal for a couple of weeks, I
> > > > > > > decided to move forward and formalize it as an AIP document in
> > > > > > > Confluence. I will still be tracking comments on the doc, but would
> > > > > > > rather move the conversation here.
> > > > > > >
> > > > > > > AIP-31 focuses on solving the issue of implicit message passing in
> > > > > > > Airflow by extending the DAG/Operator API to include a way to set
> > > > > > > up a message-passing dependency in the same DAG file. This is
> > > > > > > complementary to task dependency declaration and is intended to be
> > > > > > > used as another option to declare dependencies, by declaring
> > > > > > > message dependencies.
> > > > > > >
> > > > > > > In addition, AIP-31 proposes a way to declare PythonOperators from
> > > > > > > a function using decorators. This should help embed custom
> > > > > > > behavior into DAGs without needing to create custom operators for
> > > > > > > everything.
> > > > > > >
> > > > > > > Changes proposed:
> > > > > > >
> > > > > > > • Add a __call__ function to BaseOperator: add a functional
> > > > > > >   interface to replace class attributes at execution time.
> > > > > > > • XComArg class: this object is a reference to an XCom value that
> > > > > > >   has not been created yet and will need to be resolved in the
> > > > > > >   future.
> > > > > > > • PythonFunctionalOperator and Python Function Operator: extend
> > > > > > >   PythonOperator to map op_args and op_kwargs from a decorated
> > > > > > >   interface for easier setup, and add a decorator to create
> > > > > > >   PythonOperators from a function in an easier way.
> > > > > > >
> > > > > > > See the AIP document for a DAG example.
> > > > > > >
> > > > > > > Any help on how to proceed with this will be appreciated (I also
> > > > > > > joined Slack and posted in #airflow-creative). My guess is that once
> > > > > > > we agree on what the API should look like, the next step is to hold
> > > > > > > a vote and, if successful, to create JIRA issues and GitHub PRs with
> > > > > > > the mentioned changes.
> > > > > > >
> > > > > > > Please let me know if there's any aspect that people feel strongly
> > > > > > > against, or aspects that are not clear and that I should work on
> > > > > > > expanding further.
> > > > > > >
> > > > > > > Best,
> > > > > > >
> > > > > > > Gerard Casas Saez
> > > > > > > Twitter | Cortex | @casassaez
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> >
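Items 5 and 7 of Evgeny's list (indexing into a task's result, and traversing all of an operator's kwargs) could be combined roughly as follows. This is a hypothetical sketch, not Airflow code: XCOM_STORE, XComArg.__getitem__ and resolve_all are illustrative names. Indexing only records a key path while the DAG is being built, and resolve_all walks plain dicts, lists and tuples at runtime. As Evgeny notes, the whole stored result is still loaded before the key path is applied.

```python
from typing import Any, Dict, Tuple

# Hypothetical XCom backend: task_id -> value pushed by that task
XCOM_STORE: Dict[str, Any] = {}


class XComArg:
    """Deferred reference to a task's return value, optionally indexed."""

    def __init__(self, task_id: str, path: Tuple[Any, ...] = ()) -> None:
        self.task_id = task_id
        self.path = path

    def __getitem__(self, key: Any) -> "XComArg":
        # f_result["some_key"] only records the key; nothing is read yet
        return XComArg(self.task_id, self.path + (key,))

    def resolve(self) -> Any:
        # The whole stored result is loaded, then the key path is applied
        value = XCOM_STORE[self.task_id]
        for key in self.path:
            value = value[key]
        return value


def resolve_all(value: Any) -> Any:
    """Recursively replace XComArgs inside plain dicts, lists and tuples."""
    if isinstance(value, XComArg):
        return value.resolve()
    if isinstance(value, dict):
        return {k: resolve_all(v) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_all(v) for v in value]
    if isinstance(value, tuple):
        return tuple(resolve_all(v) for v in value)
    return value


# DAG-building phase: build placeholders, including indexed ones
f_result = XComArg("some_function")
kwargs = {"a": f_result["some_key"][0], "b": [f_result["other"], 5]}

# Runtime: pretend some_function has run and pushed a dict, then resolve
XCOM_STORE["some_function"] = {"some_key": [10, 20], "other": 3}
resolved = resolve_all(kwargs)
```

Traversing every kwarg, rather than only templated fields, is what lets an indexed placeholder appear anywhere in an operator's arguments; the cost is one recursive walk per task at runtime.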