Posted to dev@airflow.apache.org by Bolke de Bruin <bd...@gmail.com> on 2017/02/08 06:36:25 UTC
Proposal for new task state "WAITING_ON_CALLBACK"
Hi All,
Now that we have an API in place, I would like to propose a new state for tasks named “WAITING_ON_CALLBACK”. Currently, we have tasks with a kind of polling mechanism (i.e. Sensors) that wait for an action to happen and check whether it happened by regularly polling a particular backend. This always uses a slot from one of the workers and could starve an Airflow cluster of resources. What if an external system could instead call back into Airflow to change the task's status, without the task taking up a worker slot while it waits? A timeout could (should) be associated with the expected callback so that the task fails if the callback never arrives. To make it a bit more visual:
Task X from DAG Z does some work and sets “WAITING_ON_CALLBACK” -> API post to /dags/Z/dag_runs/20170101T00:00:00/tasks/X with payload “set status to SUCCESS”
DAG Z happily continues.
Or
Task X from DAG Z sets “WAITING_ON_CALLBACK” with timeout of 300s -> time passes -> scheduler sets task to FAILED.
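A minimal sketch of the two flows above, in plain Python rather than Airflow's actual models. The names `CallbackTask`, `handle_callback` and `expire_if_timed_out` are hypothetical and only illustrate the proposed state transitions; they are not part of Airflow.

```python
import time
from dataclasses import dataclass
from typing import Optional

WAITING_ON_CALLBACK = "waiting_on_callback"
SUCCESS = "success"
FAILED = "failed"


@dataclass
class CallbackTask:
    """Hypothetical stand-in for a task instance parked in the proposed state."""
    dag_id: str
    task_id: str
    timeout: float            # seconds the task may wait for a callback
    waiting_since: float      # timestamp at which the task started waiting
    state: str = WAITING_ON_CALLBACK


def handle_callback(task: CallbackTask, new_state: str) -> bool:
    """What an API endpoint could do when the external system posts a status."""
    if task.state != WAITING_ON_CALLBACK:
        return False          # late or duplicate callback: ignore it
    if new_state not in (SUCCESS, FAILED):
        raise ValueError(f"unsupported state: {new_state}")
    task.state = new_state
    return True


def expire_if_timed_out(task: CallbackTask, now: Optional[float] = None) -> bool:
    """What the scheduler could do on each loop: fail tasks that waited too long."""
    now = time.time() if now is None else now
    if task.state == WAITING_ON_CALLBACK and now - task.waiting_since >= task.timeout:
        task.state = FAILED
        return True
    return False
```

In this sketch no worker slot is involved while the task waits: the state only changes when the API receives a post or when the scheduler notices that the timeout has passed.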
Any thoughts?
- Bolke
Re: Proposal for new task state "WAITING_ON_CALLBACK"
Posted by Maxime Beauchemin <ma...@gmail.com>.
Chiming in here.
* I think the term callback is a bit confusing; it collides with a
different definition in the JavaScript world
* I like the idea of a status that can only be altered externally (REST /
CLI / sqla / ...) and that the scheduler simply disregards (and probably it
handles the timeout too). Could be simply `ON_HOLD` or more verbose
`WAITING_FOR_EXTERNAL_TRIGGER`
I have a complementary / overlapping proposal in the area of rationalizing
the number of worker slots. My idea was to add an argument to
`BaseSensorOperator` to define whether the scheduler or a worker should
eval the condition. `evaled_by_worker=True`. If set to `False`, the sensor
would be evaluated by the scheduler as it processes the dependencies,
potentially liberating many worker slots. This of course assumes a
lightweight sensor.
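To make the idea concrete, here is a rough sketch in plain Python. `SketchSensor` only mimics the shape of `BaseSensorOperator`; the `evaled_by_worker` argument is the one proposed above and the `scheduler_pass` helper is hypothetical. Neither exists in Airflow.

```python
from typing import Callable, Iterable


class SketchSensor:
    """Simplified stand-in for BaseSensorOperator (not the real class)."""

    def __init__(self, task_id: str, poke: Callable[[], bool],
                 evaled_by_worker: bool = True):
        self.task_id = task_id
        self.poke = poke                      # returns True once the condition holds
        self.evaled_by_worker = evaled_by_worker


def scheduler_pass(sensors: Iterable[SketchSensor]) -> dict:
    """One scheduler loop: poke lightweight sensors inline, queue the rest."""
    result = {"met": [], "still_waiting": [], "sent_to_worker": []}
    for sensor in sensors:
        if sensor.evaled_by_worker:
            # Classic behaviour: the sensor occupies a worker slot while polling.
            result["sent_to_worker"].append(sensor.task_id)
        elif sensor.poke():
            result["met"].append(sensor.task_id)
        else:
            # No slot used; the scheduler simply pokes again on its next pass.
            result["still_waiting"].append(sensor.task_id)
    return result
```

The trade-off is that every `poke` now runs inside the scheduler loop, which is why this only makes sense for lightweight sensors.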
Max
Re: Proposal for new task state "WAITING_ON_CALLBACK"
Posted by Alex Van Boxel <al...@vanboxel.be>.
Hey,
here is my feedback, because I've been thinking about events as well. I
would call it 'WAITING_FOR_EVENT'. Here are the use-cases I would use it
for:
Have a thread (or process) listen to the Google Audit Log, which records a
lot of changes in the Google project (Google DataProc job finished, file
added to bucket, ...), and translate those entries into events that Airflow
understands.
*DataProc/BigQuery/Storage*
1. Start DataProc job through the API (long running) - takes a
parallelism slot
2. Get JobId
3. Register for event: *gc:dataproc:job_id_xxxx* with timeout 300s
4. execute stops and parallelism slot is freed
The Google Audit Log listener constantly translates log entries into Airflow
events. The Airflow event_listener looks for the specific events that were
registered (these could be wildcards). As soon as an event matches, it calls
the callback:
1. on_event is called with *gc:dataproc:job_id_xxxx* and JSON with event
payload
2. handle the JSON payload: depending on the payload, throw *FAILURE* or
*SUCCESS*, or register for another event
It could be a timeout
1. on_event is called with *airflow:timeout:dag_id* timeout event
2. handle timeout with
   1. throw FAILURE
   2. try to recover by calling DataProc API and re-register new/or same
      event
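The flow above could be sketched as follows. This is plain Python, not Airflow code: `EventRegistry`, `register`, `dispatch` and `fire_timeouts` are hypothetical names, wildcard matching uses the standard `fnmatch` module as one possible choice, and the timeout key follows the `airflow:timeout:dag_id` naming from the list.

```python
import fnmatch
from typing import Callable, Dict, List, Tuple

Handler = Callable[[str, dict], None]   # on_event(event_key, payload)


class EventRegistry:
    """Hypothetical listener-side registry of events that tasks wait for."""

    def __init__(self) -> None:
        # pattern -> (handler, deadline, dag_id)
        self._waiting: Dict[str, Tuple[Handler, float, str]] = {}

    def register(self, pattern: str, on_event: Handler,
                 dag_id: str, timeout: float, now: float) -> None:
        """Called by the task just before it stops and frees its slot."""
        self._waiting[pattern] = (on_event, now + timeout, dag_id)

    def dispatch(self, event_key: str, payload: dict) -> int:
        """Call on_event for every registration whose pattern matches."""
        matched = [p for p in self._waiting if fnmatch.fnmatchcase(event_key, p)]
        for pattern in matched:
            # Remove first, so the handler is free to re-register the same event.
            on_event, _, _ = self._waiting.pop(pattern)
            on_event(event_key, payload)
        return len(matched)

    def fire_timeouts(self, now: float) -> List[str]:
        """Deliver a timeout event to registrations that passed their deadline."""
        expired = [p for p, (_, deadline, _) in self._waiting.items()
                   if deadline <= now]
        for pattern in expired:
            on_event, _, dag_id = self._waiting.pop(pattern)
            on_event(f"airflow:timeout:{dag_id}", {"pattern": pattern})
        return expired
```

A task would call `register("gc:dataproc:job_id_xxxx", on_event, dag_id=..., timeout=300, now=...)` and return; the listener process then calls `dispatch` for each translated audit-log entry and `fire_timeouts` periodically.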
*Bolke's callback example (mapped to this)*
1. task x for z does some work
2. register for event *api:callback:20170101T000000*
3. execute stops and parallelism slot is freed
The API translates HTTP callbacks into events
1. on_event is called with *api:callback:20170101T000000* and JSON with
event payload
2. handle the JSON payload: depending on the payload, throw *FAILURE* or
*SUCCESS*, or register for another event
It could be a timeout
1. on_event is called with *airflow:timeout:dag_id* timeout event
2. handle timeout with FAILURE
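As a rough illustration of the handler side of this mapping, the sketch below shows an `on_event` function that turns an event into a task outcome. The payload field `status`, the return values and the exact signature are assumptions made for the example; the thread does not define them.

```python
from typing import Optional

SUCCESS = "SUCCESS"
FAILURE = "FAILURE"


def on_event(event_key: str, payload: dict) -> Optional[str]:
    """Return the final task state, or None to keep waiting for another event."""
    if event_key.startswith("airflow:timeout:"):
        # Timeout case: in this mapping a timeout is simply treated as a failure.
        return FAILURE
    if event_key.startswith("api:callback:"):
        status = payload.get("status")   # assumed field set by the HTTP caller
        if status in (SUCCESS, FAILURE):
            return status
    # Unknown event or inconclusive payload: stay registered for another event.
    return None
```

Returning `None` corresponds to the "register for another event" branch in the list above.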
So, what do you think?
--
_/
_/ Alex Van Boxel
Re: Proposal for new task state "WAITING_ON_CALLBACK"
Posted by Jeremiah Lowin <jl...@apache.org>.
I meant the API -- will check the wiki now. Thanks!
Re: Proposal for new task state "WAITING_ON_CALLBACK"
Posted by Bolke de Bruin <bd...@gmail.com>.
On this proposal? No, not yet. It just popped into my mind yesterday. For the API, there is a bit on the wiki.
Re: Proposal for new task state "WAITING_ON_CALLBACK"
Posted by Jeremiah Lowin <jl...@apache.org>.
Makes a lot of sense. At the NY meetup there was considerable interest in
using the API (and quite a few hacks around exposing the CLI!) -- is there
more complete documentation anywhere?
Thanks Bolke