You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@airflow.apache.org by Bolke de Bruin <bd...@gmail.com> on 2017/02/08 06:36:25 UTC

Proposal for new task state "WAITING_ON_CALLBACK"

Hi All,

Now that we have an API in place. I would like to propose a new state for tasks named “WAITING_ON_CALLBACK”. Currently, we have tasks that have a kind of polling mechanism (ie. Sensors) that wait for an action to happen and check if that action happened by regularly polling a particular backend. This will always use a slot from one of the workers and could starve an airflow cluster for resources. What if a callback to Airflow could happen that task to change its status by calling a callback mechanism without taking up a worker slot. A timeout could (should) be associated with the required callback so that the task can fail if required. So a bit more visual:


Task X from DAG Z  does some work and sets “WAITING_ON_CALLBACK” -> API post to /dags/Z/dag_runs/20170101T00:00:00/tasks/X with payload “set status to SUCCESS”

DAG Z happily continues.

Or

Task X from DAG Z sets “WAITING_ON_CALLBACK” with timeout of 300s -> time passes -> scheduler sets task to FAILED.


Any thoughts?

- Bolke

Re: Proposal for new task state "WAITING_ON_CALLBACK"

Posted by Maxime Beauchemin <ma...@gmail.com>.
Chiming in here.

* I think the term callback is a bit confusing, it collides with a
different definition in the javascript world
* I like the idea of a status that can only be altered externally (REST /
CLI / sqla / ...) and that the scheduler simply disregards (and probably it
handles the timeout too). Could be simply `ON_HOLD` or more verbose
`WAITING_FOR_EXTERNAL_TRIGGER`

I have a complementary / overlapping proposal in the area of rationalizing
the number of worker slots. My idea was to add an argument to
`BaseSensorOperator` to define whether the scheduler or a worker should
eval the condition. `evaled_by_worker=True`. If set to `False`, the sensor
would be evaluated by the scheduler as it processes the dependencies,
potentially liberating many worker slots. This of course assumes a
lightweight sensor.

Max


On Wed, Feb 8, 2017 at 12:45 PM, Alex Van Boxel <al...@vanboxel.be> wrote:

> Hey,
>
> here is my feedback, because I've been thinking about events as well. I
> would call it it 'WAITING_FOR_EVENT'. Here are the use-cases I would use it
> for:
>
> Have a thread (or process) listen on the Google Audit Log. It contains a
> lot of changes on the Google Project (Google DataProc finished, File in
> Bucket added, ...) and translate it to Events that Airflow understands.
>
>
> *DataProc/BigQuery/Storage*
>
>    1. Start DagProc job through the API (long running) - takes a
>    parallelism slot
>    2. Get JobId
>    3. Register for event: *gc:dataproc:job_id_xxxx* with timeout 300s
>    4. execute stops and parallelism slot is freed
>
> The google audit log is constantly translating log entries to airflow
> events. Airflow event_listener looks for specific events registered (could
> be wildcards). As soon as the event match it call's the callback:
>
>    1. on_event is called with *gc:dataproc:job_id_xxxx* and JSON with event
>    payload
>    2. handle JSON payload depending on payload throw *FAILURE* or *SUCCESS*
>    or register for other event
>
> It could be a timeout
>
>    1. on_event is called with *airflow:timeout:dag_id* timeout event
>    2. handle timeout with
>       1. throw FAILURE
>       2. try to recover by calling DataProc API and re-register new/or same
>       event
>
> *Bolke's callback example (mapped to this)*
>
>
>    1. task x for z does some work
>    2. register for event *api:callback:20170101T000000*
>    3. execute stops and parallelism slot is freed
>
> The API translated http callbacks into events
>
>    1. on_event is called with *api:callback:20170101T000000* and JSON with
>    event payload
>    2. handle JSON payload depending on payload throw *FAILURE* or
> *SUCCESS* or
>    register for other event
>
> It could be a timeout
>
>    1. on_event is called with *airflow:timeout:dag_id* timeout event
>    2. handle timeout with FAILURE
>
>
> So, what do you think?
>
>
> On Wed, Feb 8, 2017 at 2:40 PM Jeremiah Lowin <jl...@apache.org> wrote:
>
> I meant the API -- will check the wiki now. Thanks!
>
> On Wed, Feb 8, 2017 at 8:33 AM Bolke de Bruin <bd...@gmail.com> wrote:
>
> > On this proposal? No, not yet. Just popped in my mind yesterday. API
> there
> > is a bit on the wiki.
> >
> > > On 8 Feb 2017, at 14:31, Jeremiah Lowin <jl...@apache.org> wrote:
> > >
> > > Makes a lot of sense. At the NY meetup there was considerable interest
> in
> > > using the API (and quite a few hacks around exposing the CLI!) -- is
> > there
> > > more complete documentation anywhere?
> > >
> > > Thanks Bolke
> > >
> > > On Wed, Feb 8, 2017 at 1:36 AM Bolke de Bruin <bd...@gmail.com>
> wrote:
> > >
> > >> Hi All,
> > >>
> > >> Now that we have an API in place. I would like to propose a new state
> > for
> > >> tasks named “WAITING_ON_CALLBACK”. Currently, we have tasks that have
> a
> > >> kind of polling mechanism (ie. Sensors) that wait for an action to
> > happen
> > >> and check if that action happened by regularly polling a particular
> > >> backend. This will always use a slot from one of the workers and could
> > >> starve an airflow cluster for resources. What if a callback to Airflow
> > >> could happen that task to change its status by calling a callback
> > mechanism
> > >> without taking up a worker slot. A timeout could (should) be
> associated
> > >> with the required callback so that the task can fail if required. So a
> > bit
> > >> more visual:
> > >>
> > >>
> > >> Task X from DAG Z  does some work and sets “WAITING_ON_CALLBACK” ->
> API
> > >> post to /dags/Z/dag_runs/20170101T00:00:00/tasks/X with payload “set
> > status
> > >> to SUCCESS”
> > >>
> > >> DAG Z happily continues.
> > >>
> > >> Or
> > >>
> > >> Task X from DAG Z sets “WAITING_ON_CALLBACK” with timeout of 300s ->
> > time
> > >> passes -> scheduler sets task to FAILED.
> > >>
> > >>
> > >> Any thoughts?
> > >>
> > >> - Bolke
> >
> >
>
> --
>   _/
> _/ Alex Van Boxel
>

Re: Proposal for new task state "WAITING_ON_CALLBACK"

Posted by Alex Van Boxel <al...@vanboxel.be>.
Hey,

here is my feedback, because I've been thinking about events as well. I
would call it it 'WAITING_FOR_EVENT'. Here are the use-cases I would use it
for:

Have a thread (or process) listen on the Google Audit Log. It contains a
lot of changes on the Google Project (Google DataProc finished, File in
Bucket added, ...) and translate it to Events that Airflow understands.


*DataProc/BigQuery/Storage*

   1. Start DagProc job through the API (long running) - takes a
   parallelism slot
   2. Get JobId
   3. Register for event: *gc:dataproc:job_id_xxxx* with timeout 300s
   4. execute stops and parallelism slot is freed

The google audit log is constantly translating log entries to airflow
events. Airflow event_listener looks for specific events registered (could
be wildcards). As soon as the event match it call's the callback:

   1. on_event is called with *gc:dataproc:job_id_xxxx* and JSON with event
   payload
   2. handle JSON payload depending on payload throw *FAILURE* or *SUCCESS*
   or register for other event

It could be a timeout

   1. on_event is called with *airflow:timeout:dag_id* timeout event
   2. handle timeout with
      1. throw FAILURE
      2. try to recover by calling DataProc API and re-register new/or same
      event

*Bolke's callback example (mapped to this)*


   1. task x for z does some work
   2. register for event *api:callback:20170101T000000*
   3. execute stops and parallelism slot is freed

The API translated http callbacks into events

   1. on_event is called with *api:callback:20170101T000000* and JSON with
   event payload
   2. handle JSON payload depending on payload throw *FAILURE* or *SUCCESS* or
   register for other event

It could be a timeout

   1. on_event is called with *airflow:timeout:dag_id* timeout event
   2. handle timeout with FAILURE


So, what do you think?


On Wed, Feb 8, 2017 at 2:40 PM Jeremiah Lowin <jl...@apache.org> wrote:

I meant the API -- will check the wiki now. Thanks!

On Wed, Feb 8, 2017 at 8:33 AM Bolke de Bruin <bd...@gmail.com> wrote:

> On this proposal? No, not yet. Just popped in my mind yesterday. API there
> is a bit on the wiki.
>
> > On 8 Feb 2017, at 14:31, Jeremiah Lowin <jl...@apache.org> wrote:
> >
> > Makes a lot of sense. At the NY meetup there was considerable interest
in
> > using the API (and quite a few hacks around exposing the CLI!) -- is
> there
> > more complete documentation anywhere?
> >
> > Thanks Bolke
> >
> > On Wed, Feb 8, 2017 at 1:36 AM Bolke de Bruin <bd...@gmail.com> wrote:
> >
> >> Hi All,
> >>
> >> Now that we have an API in place. I would like to propose a new state
> for
> >> tasks named “WAITING_ON_CALLBACK”. Currently, we have tasks that have a
> >> kind of polling mechanism (ie. Sensors) that wait for an action to
> happen
> >> and check if that action happened by regularly polling a particular
> >> backend. This will always use a slot from one of the workers and could
> >> starve an airflow cluster for resources. What if a callback to Airflow
> >> could happen that task to change its status by calling a callback
> mechanism
> >> without taking up a worker slot. A timeout could (should) be associated
> >> with the required callback so that the task can fail if required. So a
> bit
> >> more visual:
> >>
> >>
> >> Task X from DAG Z  does some work and sets “WAITING_ON_CALLBACK” -> API
> >> post to /dags/Z/dag_runs/20170101T00:00:00/tasks/X with payload “set
> status
> >> to SUCCESS”
> >>
> >> DAG Z happily continues.
> >>
> >> Or
> >>
> >> Task X from DAG Z sets “WAITING_ON_CALLBACK” with timeout of 300s ->
> time
> >> passes -> scheduler sets task to FAILED.
> >>
> >>
> >> Any thoughts?
> >>
> >> - Bolke
>
>

-- 
  _/
_/ Alex Van Boxel

Re: Proposal for new task state "WAITING_ON_CALLBACK"

Posted by Jeremiah Lowin <jl...@apache.org>.
I meant the API -- will check the wiki now. Thanks!

On Wed, Feb 8, 2017 at 8:33 AM Bolke de Bruin <bd...@gmail.com> wrote:

> On this proposal? No, not yet. Just popped in my mind yesterday. API there
> is a bit on the wiki.
>
> > On 8 Feb 2017, at 14:31, Jeremiah Lowin <jl...@apache.org> wrote:
> >
> > Makes a lot of sense. At the NY meetup there was considerable interest in
> > using the API (and quite a few hacks around exposing the CLI!) -- is
> there
> > more complete documentation anywhere?
> >
> > Thanks Bolke
> >
> > On Wed, Feb 8, 2017 at 1:36 AM Bolke de Bruin <bd...@gmail.com> wrote:
> >
> >> Hi All,
> >>
> >> Now that we have an API in place. I would like to propose a new state
> for
> >> tasks named “WAITING_ON_CALLBACK”. Currently, we have tasks that have a
> >> kind of polling mechanism (ie. Sensors) that wait for an action to
> happen
> >> and check if that action happened by regularly polling a particular
> >> backend. This will always use a slot from one of the workers and could
> >> starve an airflow cluster for resources. What if a callback to Airflow
> >> could happen that task to change its status by calling a callback
> mechanism
> >> without taking up a worker slot. A timeout could (should) be associated
> >> with the required callback so that the task can fail if required. So a
> bit
> >> more visual:
> >>
> >>
> >> Task X from DAG Z  does some work and sets “WAITING_ON_CALLBACK” -> API
> >> post to /dags/Z/dag_runs/20170101T00:00:00/tasks/X with payload “set
> status
> >> to SUCCESS”
> >>
> >> DAG Z happily continues.
> >>
> >> Or
> >>
> >> Task X from DAG Z sets “WAITING_ON_CALLBACK” with timeout of 300s ->
> time
> >> passes -> scheduler sets task to FAILED.
> >>
> >>
> >> Any thoughts?
> >>
> >> - Bolke
>
>

Re: Proposal for new task state "WAITING_ON_CALLBACK"

Posted by Bolke de Bruin <bd...@gmail.com>.
On this proposal? No, not yet. Just popped in my mind yesterday. API there is a bit on the wiki.

> On 8 Feb 2017, at 14:31, Jeremiah Lowin <jl...@apache.org> wrote:
> 
> Makes a lot of sense. At the NY meetup there was considerable interest in
> using the API (and quite a few hacks around exposing the CLI!) -- is there
> more complete documentation anywhere?
> 
> Thanks Bolke
> 
> On Wed, Feb 8, 2017 at 1:36 AM Bolke de Bruin <bd...@gmail.com> wrote:
> 
>> Hi All,
>> 
>> Now that we have an API in place. I would like to propose a new state for
>> tasks named “WAITING_ON_CALLBACK”. Currently, we have tasks that have a
>> kind of polling mechanism (ie. Sensors) that wait for an action to happen
>> and check if that action happened by regularly polling a particular
>> backend. This will always use a slot from one of the workers and could
>> starve an airflow cluster for resources. What if a callback to Airflow
>> could happen that task to change its status by calling a callback mechanism
>> without taking up a worker slot. A timeout could (should) be associated
>> with the required callback so that the task can fail if required. So a bit
>> more visual:
>> 
>> 
>> Task X from DAG Z  does some work and sets “WAITING_ON_CALLBACK” -> API
>> post to /dags/Z/dag_runs/20170101T00:00:00/tasks/X with payload “set status
>> to SUCCESS”
>> 
>> DAG Z happily continues.
>> 
>> Or
>> 
>> Task X from DAG Z sets “WAITING_ON_CALLBACK” with timeout of 300s -> time
>> passes -> scheduler sets task to FAILED.
>> 
>> 
>> Any thoughts?
>> 
>> - Bolke


Re: Proposal for new task state "WAITING_ON_CALLBACK"

Posted by Jeremiah Lowin <jl...@apache.org>.
Makes a lot of sense. At the NY meetup there was considerable interest in
using the API (and quite a few hacks around exposing the CLI!) -- is there
more complete documentation anywhere?

Thanks Bolke

On Wed, Feb 8, 2017 at 1:36 AM Bolke de Bruin <bd...@gmail.com> wrote:

> Hi All,
>
> Now that we have an API in place. I would like to propose a new state for
> tasks named “WAITING_ON_CALLBACK”. Currently, we have tasks that have a
> kind of polling mechanism (ie. Sensors) that wait for an action to happen
> and check if that action happened by regularly polling a particular
> backend. This will always use a slot from one of the workers and could
> starve an airflow cluster for resources. What if a callback to Airflow
> could happen that task to change its status by calling a callback mechanism
> without taking up a worker slot. A timeout could (should) be associated
> with the required callback so that the task can fail if required. So a bit
> more visual:
>
>
> Task X from DAG Z  does some work and sets “WAITING_ON_CALLBACK” -> API
> post to /dags/Z/dag_runs/20170101T00:00:00/tasks/X with payload “set status
> to SUCCESS”
>
> DAG Z happily continues.
>
> Or
>
> Task X from DAG Z sets “WAITING_ON_CALLBACK” with timeout of 300s -> time
> passes -> scheduler sets task to FAILED.
>
>
> Any thoughts?
>
> - Bolke