Posted to dev@airflow.apache.org by Stefan Seelmann <ma...@stefan-seelmann.de> on 2018/05/26 15:50:41 UTC

How to wait for external process

Hello,

I have a DAG (externally triggered) where some processing is done at an
external system (EC2 instance). The processing is started by an Airflow
task (via HTTP request). The DAG should only continue once that
processing is completed. In a first naive implementation I created a
sensor that fetches the progress (via HTTP request) and returns true only
once the status is "finished", letting the DAG run continue. That works but...

... the external processing can take hours or days, and during that time
a worker is occupied which does nothing but HTTP GET and sleep. There
will be hundreds of DAG runs in parallel which means hundreds of workers
are occupied.
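
For concreteness, the naive sensor looks roughly like this (just a sketch;
the status URL and the JSON response shape are made up):

import requests

from airflow.sensors.base_sensor_operator import BaseSensorOperator  # airflow.operators.sensors in 1.9
from airflow.utils.decorators import apply_defaults


class ExternalJobSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, status_url, *args, **kwargs):
        super(ExternalJobSensor, self).__init__(*args, **kwargs)
        self.status_url = status_url

    def poke(self, context):
        # One HTTP GET per poke; the worker slot stays occupied between pokes.
        response = requests.get(self.status_url)
        response.raise_for_status()
        return response.json().get("status") == "finished"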

I looked into other operators that do computation on external systems
(ECSOperator, AWSBatchOperator) but they also follow that pattern and
just wait/sleep.

So I want to ask if there is a more efficient way to build such a
workflow with Airflow?

Kind Regards,
Stefan

Avoid sensor sleep by rescheduling task, was: Re: How to wait for external process

Posted by Stefan Seelmann <ma...@stefan-seelmann.de>.
I dug a bit into the Airflow code and I think I found a possible
solution, see draft at [1]:
Add a "reschedule" flag to BaseSensorOperator; when set, the sensor doesn't
sleep but raises an AirflowRescheduleTask exception. Within the TaskInstance
this exception is handled similarly to a failure: the task state is set
to UP_FOR_RETRY and the task is rescheduled by the scheduler.

Advantages:
* Only small code changes are required
* Leverage the existing retry mechanism, including delay and exponential
backoff

Tradeoff:
* Overhead by running the task again and again
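
For illustration, a conceptual sketch of the loop (not the actual code in [1];
the flag and exception names just follow the description above):

import time


class AirflowRescheduleTask(Exception):
    """Raised instead of sleeping; the TaskInstance handles it like a retry."""


def sensor_loop(poke, poke_interval, reschedule):
    # Simplified version of BaseSensorOperator.execute with the proposed flag.
    while not poke():
        if reschedule:
            # Free the worker slot; the scheduler re-queues the task after the
            # normal retry delay (incl. exponential backoff).
            raise AirflowRescheduleTask()
        time.sleep(poke_interval)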


I'd like to ask the community if that is a valuable change to be
included in Airflow?


If so I'll create a Jira, improve the branch, and send a PR:
* Find a solution for timeout and soft_fail
* Add tests
* Add a task_reschedule table (similar to the task_fail table)

Also some open questions:
* Should a separate state (e.g. UP_FOR_RESCHEDULE) be used to
differentiate between an error and an intended reschedule? Then probably
different parameters for delay and exponential backoff make sense too.
* Maybe it's feasible to execute the sensor poke code directly in the
scheduler, to avoid running a mini task? But that can be done in a
separate change.


Kind Regards,
Stefan

[1]
https://github.com/seelmann/incubator-airflow/commit/379bf0fdf9bbb9f26f74ac5fa6325ba5a0e975a2

Re: How to wait for external process

Posted by Victor Noagbodji <vn...@amplify-nation.com>.
Hi,

Here's another vote for persistence. We did a similar thing where the processing state is stored in a database. No part of the DAG does a periodic check; the DAG re-triggers itself, and its very first task figures out whether there is work to do or bails out.
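
For concreteness, a minimal sketch of that loop (Airflow 2.x import paths,
and a made-up has_pending_work() that queries our own state table):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


def has_pending_work():
    # hypothetical: look up the processing state in your own database
    return False


def choose_branch():
    return "process" if has_pending_work() else "no_work"


with DAG("self_triggering_loop", start_date=datetime(2018, 5, 1),
         schedule_interval=None, catchup=False) as dag:
    check = BranchPythonOperator(task_id="check_for_work", python_callable=choose_branch)
    process = EmptyOperator(task_id="process")  # stand-in for the real work
    no_work = EmptyOperator(task_id="no_work")  # bail out, nothing to do
    retrigger = TriggerDagRunOperator(
        task_id="retrigger_self",
        trigger_dag_id="self_triggering_loop",
        trigger_rule="none_failed",  # run whether we processed or bailed out
    )
    check >> [process, no_work] >> retrigger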

> On May 28, 2018, at 4:28 PM, Ananth Durai <va...@gmail.com> wrote:
> 
> Since you already on AWS, the simplest thing I could think of is to write a
> signal file once the job finished and the downstream job waiting for the
> signal file. In other words, the same pattern how the Hadoop jobs writing
> `_SUCCESS` file and the downstream jobs depends on the signal file.
> 
> Regards,
> Ananth.P,
> 
> 
> 
> 
> 
> 
> On 28 May 2018 at 13:06, Stefan Seelmann <ma...@stefan-seelmann.de> wrote:
> 
>> Thanks Christopher for the idea. That would work, we already have such a
>> "listener" that polls a queue (SQS) and creates the DAG runs. However it
>> would have been nice to have the full process in one DAG to have a
>> better overview about running jobs and leverage the gantt chart, but I
>> think this can be accomplished via custom plugins and views.
>> 
>> On 05/28/2018 08:43 PM, Christopher Bockman wrote:
>>> Haven't done this, but we'll have a similar need in the future, so have
>>> investigated a little.
>>> 
>>> What about a design pattern something like this:
>>> 
>>> 1) When jobs are done (ready for further processing) they publish those
>>> details to a queue (such as GC Pub/Sub or any other sort of queue)
>>> 
>>> 2) A single "listener" DAG sits and periodically checks that queue.  If
>> it
>>> finds anything on it, it triggers (via DAG trigger) all of the DAGs which
>>> are on the queue.*
>>> 
>>> * = if your triggering volume is too high, this may cause airflow issues
>> w/
>>> too many going at once; this could presumably be solved then via custom
>>> rate-limiting on firing these
>>> 
>>> 3) The listener DAG resets itself (triggers itself)
>>> 
>>> 
>>> On Mon, May 28, 2018 at 7:17 AM, Driesprong, Fokko <fokko@driesprong.frl
>>> 
>>> wrote:
>>> 
>>>> Hi Stefan,
>>>> 
>>>> Afaik there isn't a more efficient way of doing this. DAGs that are
>> relying
>>>> on a lot of sensors are experiencing the same issues. The only way right
>>>> now, I can think of, is doing updating the state directly in the
>> database.
>>>> But then you need to know what you are doing. I can image that this
>> would
>>>> be feasible by using an AWS lambda function. Hope this helps.
>>>> 
>>>> Cheers, Fokko
>>>> 
>>>> 2018-05-26 17:50 GMT+02:00 Stefan Seelmann <ma...@stefan-seelmann.de>:
>>>> 
>>>>> Hello,
>>>>> 
>>>>> I have a DAG (externally triggered) where some processing is done at an
>>>>> external system (EC2 instance). The processing is started by an Airflow
>>>>> task (via HTTP request). The DAG should only continue once that
>>>>> processing is completed. In a first naive implementation I created a
>>>>> sensor that gets the progress (via HTTP request) and only if status is
>>>>> "finished" returns true and the DAG run continues. That works but...
>>>>> 
>>>>> ... the external processing can take hours or days, and during that
>> time
>>>>> a worker is occupied which does nothing but HTTP GET and sleep. There
>>>>> will be hundreds of DAG runs in parallel which means hundreds of
>> workers
>>>>> are occupied.
>>>>> 
>>>>> I looked into other operators that do computation on external systems
>>>>> (ECSOperator, AWSBatchOperator) but they also follow that pattern and
>>>>> just wait/sleep.
>>>>> 
>>>>> So I want to ask if there is a more efficient way to build such a
>>>>> workflow with Airflow?
>>>>> 
>>>>> Kind Regards,
>>>>> Stefan
>>>>> 
>>>> 
>>> 
>> 
>> 


Re: How to wait for external process

Posted by Ananth Durai <va...@gmail.com>.
Since you're already on AWS, the simplest thing I can think of is to write a
signal file once the job finishes and have the downstream job wait for that
signal file. In other words, it's the same pattern Hadoop jobs use: write a
`_SUCCESS` file and let the downstream jobs depend on the signal file.
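
A minimal sketch of the waiting side, assuming the job writes
s3://my-bucket/jobs/<job_id>/_SUCCESS when it finishes (bucket, key layout and
connection id are made up; the import path is the Amazon provider on Airflow
2.x, older versions ship airflow.sensors.s3_key_sensor.S3KeySensor):

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with DAG("wait_for_external_job", start_date=datetime(2018, 5, 1),
         schedule_interval=None, catchup=False) as dag:
    wait_for_success_file = S3KeySensor(
        task_id="wait_for_success_file",
        bucket_name="my-bucket",                                  # assumption
        bucket_key="jobs/{{ dag_run.conf['job_id'] }}/_SUCCESS",  # assumption
        aws_conn_id="aws_default",
        poke_interval=300,  # one cheap request per poke, but it still holds a worker slot
    )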

Regards,
Ananth.P,






On 28 May 2018 at 13:06, Stefan Seelmann <ma...@stefan-seelmann.de> wrote:

> Thanks Christopher for the idea. That would work, we already have such a
> "listener" that polls a queue (SQS) and creates the DAG runs. However it
> would have been nice to have the full process in one DAG to have a
> better overview about running jobs and leverage the gantt chart, but I
> think this can be accomplished via custom plugins and views.
>
> On 05/28/2018 08:43 PM, Christopher Bockman wrote:
> > Haven't done this, but we'll have a similar need in the future, so have
> > investigated a little.
> >
> > What about a design pattern something like this:
> >
> > 1) When jobs are done (ready for further processing) they publish those
> > details to a queue (such as GC Pub/Sub or any other sort of queue)
> >
> > 2) A single "listener" DAG sits and periodically checks that queue.  If
> it
> > finds anything on it, it triggers (via DAG trigger) all of the DAGs which
> > are on the queue.*
> >
> > * = if your triggering volume is too high, this may cause airflow issues
> w/
> > too many going at once; this could presumably be solved then via custom
> > rate-limiting on firing these
> >
> > 3) The listener DAG resets itself (triggers itself)
> >
> >
> > On Mon, May 28, 2018 at 7:17 AM, Driesprong, Fokko <fokko@driesprong.frl
> >
> > wrote:
> >
> >> Hi Stefan,
> >>
> >> Afaik there isn't a more efficient way of doing this. DAGs that are
> relying
> >> on a lot of sensors are experiencing the same issues. The only way right
> >> now, I can think of, is doing updating the state directly in the
> database.
> >> But then you need to know what you are doing. I can image that this
> would
> >> be feasible by using an AWS lambda function. Hope this helps.
> >>
> >> Cheers, Fokko
> >>
> >> 2018-05-26 17:50 GMT+02:00 Stefan Seelmann <ma...@stefan-seelmann.de>:
> >>
> >>> Hello,
> >>>
> >>> I have a DAG (externally triggered) where some processing is done at an
> >>> external system (EC2 instance). The processing is started by an Airflow
> >>> task (via HTTP request). The DAG should only continue once that
> >>> processing is completed. In a first naive implementation I created a
> >>> sensor that gets the progress (via HTTP request) and only if status is
> >>> "finished" returns true and the DAG run continues. That works but...
> >>>
> >>> ... the external processing can take hours or days, and during that
> time
> >>> a worker is occupied which does nothing but HTTP GET and sleep. There
> >>> will be hundreds of DAG runs in parallel which means hundreds of
> workers
> >>> are occupied.
> >>>
> >>> I looked into other operators that do computation on external systems
> >>> (ECSOperator, AWSBatchOperator) but they also follow that pattern and
> >>> just wait/sleep.
> >>>
> >>> So I want to ask if there is a more efficient way to build such a
> >>> workflow with Airflow?
> >>>
> >>> Kind Regards,
> >>> Stefan
> >>>
> >>
> >
>
>

Re: How to wait for external process

Posted by Stefan Seelmann <ma...@stefan-seelmann.de>.
Thanks Christopher for the idea. That would work; we already have such a
"listener" that polls a queue (SQS) and creates the DAG runs. However, it
would have been nice to have the full process in one DAG, to get a better
overview of running jobs and to leverage the Gantt chart, but I think this
can be accomplished via custom plugins and views.

On 05/28/2018 08:43 PM, Christopher Bockman wrote:
> Haven't done this, but we'll have a similar need in the future, so have
> investigated a little.
> 
> What about a design pattern something like this:
> 
> 1) When jobs are done (ready for further processing) they publish those
> details to a queue (such as GC Pub/Sub or any other sort of queue)
> 
> 2) A single "listener" DAG sits and periodically checks that queue.  If it
> finds anything on it, it triggers (via DAG trigger) all of the DAGs which
> are on the queue.*
> 
> * = if your triggering volume is too high, this may cause airflow issues w/
> too many going at once; this could presumably be solved then via custom
> rate-limiting on firing these
> 
> 3) The listener DAG resets itself (triggers itself)
> 
> 
> On Mon, May 28, 2018 at 7:17 AM, Driesprong, Fokko <fo...@driesprong.frl>
> wrote:
> 
>> Hi Stefan,
>>
>> Afaik there isn't a more efficient way of doing this. DAGs that are relying
>> on a lot of sensors are experiencing the same issues. The only way right
>> now, I can think of, is doing updating the state directly in the database.
>> But then you need to know what you are doing. I can image that this would
>> be feasible by using an AWS lambda function. Hope this helps.
>>
>> Cheers, Fokko
>>
>> 2018-05-26 17:50 GMT+02:00 Stefan Seelmann <ma...@stefan-seelmann.de>:
>>
>>> Hello,
>>>
>>> I have a DAG (externally triggered) where some processing is done at an
>>> external system (EC2 instance). The processing is started by an Airflow
>>> task (via HTTP request). The DAG should only continue once that
>>> processing is completed. In a first naive implementation I created a
>>> sensor that gets the progress (via HTTP request) and only if status is
>>> "finished" returns true and the DAG run continues. That works but...
>>>
>>> ... the external processing can take hours or days, and during that time
>>> a worker is occupied which does nothing but HTTP GET and sleep. There
>>> will be hundreds of DAG runs in parallel which means hundreds of workers
>>> are occupied.
>>>
>>> I looked into other operators that do computation on external systems
>>> (ECSOperator, AWSBatchOperator) but they also follow that pattern and
>>> just wait/sleep.
>>>
>>> So I want to ask if there is a more efficient way to build such a
>>> workflow with Airflow?
>>>
>>> Kind Regards,
>>> Stefan
>>>
>>
> 


Re: How to wait for external process

Posted by Christopher Bockman <ch...@fathomhealth.co>.
Haven't done this, but we'll have a similar need in the future, so have
investigated a little.

What about a design pattern something like this:

1) When jobs are done (ready for further processing) they publish those
details to a queue (such as GC Pub/Sub or any other sort of queue)

2) A single "listener" DAG sits and periodically checks that queue.  If it
finds anything on it, it triggers (via DAG trigger) all of the DAGs which
are on the queue.*

* = if your triggering volume is too high, this may cause Airflow issues
with too many runs going at once; this could presumably be solved via
custom rate-limiting on firing these

3) The listener DAG resets itself (triggers itself)
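
A minimal sketch of steps 2) and 3), assuming SQS (which is what Stefan uses
elsewhere in the thread), a made-up queue URL, and that each message body
names the DAG to trigger; trigger_dag's import path is for recent Airflow 2.x
(older versions have it under airflow.api.common.experimental.trigger_dag):

from datetime import datetime

import boto3

from airflow import DAG
from airflow.api.common.trigger_dag import trigger_dag
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/123456789012/finished-jobs"  # made up


def drain_queue_and_trigger():
    sqs = boto3.client("sqs")
    messages = sqs.receive_message(QueueUrl=QUEUE_URL,
                                   MaxNumberOfMessages=10).get("Messages", [])
    for msg in messages:
        trigger_dag(dag_id=msg["Body"])  # rate-limit here if the volume is too high
        sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])


with DAG("queue_listener", start_date=datetime(2018, 5, 1),
         schedule_interval=None, catchup=False) as dag:
    check_queue = PythonOperator(task_id="check_queue",
                                 python_callable=drain_queue_and_trigger)
    reset = TriggerDagRunOperator(task_id="retrigger_listener",
                                  trigger_dag_id="queue_listener")
    check_queue >> reset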


On Mon, May 28, 2018 at 7:17 AM, Driesprong, Fokko <fo...@driesprong.frl>
wrote:

> Hi Stefan,
>
> Afaik there isn't a more efficient way of doing this. DAGs that are relying
> on a lot of sensors are experiencing the same issues. The only way right
> now, I can think of, is doing updating the state directly in the database.
> But then you need to know what you are doing. I can image that this would
> be feasible by using an AWS lambda function. Hope this helps.
>
> Cheers, Fokko
>
> 2018-05-26 17:50 GMT+02:00 Stefan Seelmann <ma...@stefan-seelmann.de>:
>
> > Hello,
> >
> > I have a DAG (externally triggered) where some processing is done at an
> > external system (EC2 instance). The processing is started by an Airflow
> > task (via HTTP request). The DAG should only continue once that
> > processing is completed. In a first naive implementation I created a
> > sensor that gets the progress (via HTTP request) and only if status is
> > "finished" returns true and the DAG run continues. That works but...
> >
> > ... the external processing can take hours or days, and during that time
> > a worker is occupied which does nothing but HTTP GET and sleep. There
> > will be hundreds of DAG runs in parallel which means hundreds of workers
> > are occupied.
> >
> > I looked into other operators that do computation on external systems
> > (ECSOperator, AWSBatchOperator) but they also follow that pattern and
> > just wait/sleep.
> >
> > So I want to ask if there is a more efficient way to build such a
> > workflow with Airflow?
> >
> > Kind Regards,
> > Stefan
> >
>

Re: How to wait for external process

Posted by "Driesprong, Fokko" <fo...@driesprong.frl>.
Hi Stefan,

Afaik there isn't a more efficient way of doing this. DAGs that rely
on a lot of sensors experience the same issues. The only way I can
think of right now is updating the state directly in the database.
But then you need to know what you are doing. I can imagine that this
would be feasible using an AWS Lambda function. Hope this helps.
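
A very rough sketch of what I mean, with all the caveats above: an AWS Lambda
that flips the waiting task's state straight in the metadata database. The
table and column names match Airflow's task_instance table; the connection
settings and the event payload are made up, and you really need to know what
you are doing here:

import psycopg2


def lambda_handler(event, context):
    # event is assumed to carry the identifiers of the waiting task instance
    conn = psycopg2.connect(host="airflow-db.example.com", dbname="airflow",
                            user="airflow", password="...")
    with conn, conn.cursor() as cur:
        cur.execute(
            """
            UPDATE task_instance
               SET state = 'success'
             WHERE dag_id = %s AND task_id = %s AND execution_date = %s
            """,
            (event["dag_id"], event["task_id"], event["execution_date"]),
        )
    conn.close()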

Cheers, Fokko

2018-05-26 17:50 GMT+02:00 Stefan Seelmann <ma...@stefan-seelmann.de>:

> Hello,
>
> I have a DAG (externally triggered) where some processing is done at an
> external system (EC2 instance). The processing is started by an Airflow
> task (via HTTP request). The DAG should only continue once that
> processing is completed. In a first naive implementation I created a
> sensor that gets the progress (via HTTP request) and only if status is
> "finished" returns true and the DAG run continues. That works but...
>
> ... the external processing can take hours or days, and during that time
> a worker is occupied which does nothing but HTTP GET and sleep. There
> will be hundreds of DAG runs in parallel which means hundreds of workers
> are occupied.
>
> I looked into other operators that do computation on external systems
> (ECSOperator, AWSBatchOperator) but they also follow that pattern and
> just wait/sleep.
>
> So I want to ask if there is a more efficient way to build such a
> workflow with Airflow?
>
> Kind Regards,
> Stefan
>