Posted to users@airflow.apache.org by Reed Villanueva <rv...@ucera.org> on 2019/12/19 00:25:47 UTC

Control airflow task parallelism per dag for specific tasks?

Is there a way to control the parallelism for particular tasks in an
Airflow DAG? E.g., say I have a DAG definition like...

for dataset in list_of_datasets:
    # some simple operation
    task_1 = BashOperator(task_id=f'task_1_{dataset.name}', ...)
    # load-intensive operation
    task_2 = BashOperator(task_id=f'task_2_{dataset.name}', ...)
    # another simple operation
    task_3 = BashOperator(task_id=f'task_3_{dataset.name}', ...)

    task_1 >> task_2 >> task_3

Is there a way to let, say, 5 instances of task_1 run at once in a DAG
instance, while only 2 instances of task_2 may run in a DAG instance (which
also implies that if 2 instances of task_2 are already running, then only
3 instances of task_1 can run)? Are there any other common ways to work
around this kind of requirement? I imagine this must come up often for
pipelines.

-- 
This electronic message is intended only for the named
recipient, and may contain information that is confidential or
privileged. If you are not the intended recipient, you are
hereby notified that any disclosure, copying, distribution or
use of the contents of this message is strictly prohibited. If
you have received this message in error or are not the named
recipient, please notify us immediately by contacting the
sender at the electronic mail address noted above, and delete
and destroy all copies of this message. Thank you.

Re: Control airflow task parallelism per dag for specific tasks?

Posted by Reed Villanueva <rv...@ucera.org>.
Pools? Interesting, thanks.
Will take a look.



Re: Control airflow task parallelism per dag for specific tasks?

Posted by Jarek Potiuk <Ja...@polidea.com>.
I am not sure how you want to multiply the tasks. I imagine you want to
have something like task1_1, task1_2, task1_3, etc. Or are you perhaps
thinking about backfilling and running the same tasks for different runs?

In both cases you can use pools
(https://airflow.apache.org/docs/stable/concepts.html#pools) to limit how
many tasks can run in parallel in a given pool.
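
A minimal sketch of the pool approach, applied to the per-dataset loop from
the question. Everything named here is an assumption for illustration: the
pool name heavy_pool, the DAG id, the placeholder bash commands, and the
helper names task_kwargs and build_dag. The pool itself is assumed to exist
already with 2 slots (for example, created under Admin > Pools in the web
UI). The import path is the Airflow 1.10 one; newer releases may need
airflow.operators.bash instead.

```python
from datetime import datetime

# Hypothetical pool name. The pool is assumed to exist already with 2 slots.
HEAVY_POOL = "heavy_pool"


def task_kwargs(stage, dataset_name):
    """Build BashOperator keyword arguments for one stage of one dataset."""
    kwargs = {
        "task_id": f"{stage}_{dataset_name}",
        # Placeholder command; the real commands are not given in the thread.
        "bash_command": f"echo {stage} {dataset_name}",
    }
    if stage == "task_2":
        # Only the load-intensive stage is assigned to the limited pool.
        kwargs["pool"] = HEAVY_POOL
    return kwargs


def build_dag(dataset_names):
    """Create the DAG; Airflow is imported lazily so the helper above
    can be exercised without Airflow installed."""
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator  # 1.10 path

    dag = DAG(
        "per_dataset_pipeline",  # hypothetical DAG id
        start_date=datetime(2019, 12, 1),
        schedule_interval=None,
    )
    for name in dataset_names:
        t1, t2, t3 = (
            BashOperator(dag=dag, **task_kwargs(stage, name))
            for stage in ("task_1", "task_2", "task_3")
        )
        t1 >> t2 >> t3
    return dag
```

In a DAG file this would be used as dag = build_dag([...]) at module level.
Note that pools are defined for the whole Airflow installation, so the
2-slot limit applies to every task assigned to that pool, across DAGs and
DAG runs, rather than to a single DAG instance.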

There is another way to limit the parallelism of tasks. It is applicable,
for example, when you have different kinds of machines with different
capabilities (for example with/without GPU). You can define affinities
between tasks and the machines that execute them: with the Celery executor
you can define queues in your Celery configuration and assign each task to
one of the queues. You can then define the number of workers/slots
available on the machines serving a queue, and as a result you also limit
the parallelism of the tasks this way:
https://airflow.apache.org/docs/stable/concepts.html#queues
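
The queue approach can be sketched in the same way. The queue name heavy and
the helper names queue_kwargs and make_task are assumptions for
illustration, as is the placeholder bash command. The sketch also assumes a
Celery executor with at least one worker listening on that queue whose
concurrency is set to the desired limit; tasks without an explicit queue
fall back to whatever default queue is configured.

```python
# Hypothetical mapping from pipeline stage to Celery queue name.
STAGE_QUEUES = {"task_2": "heavy"}


def queue_kwargs(stage):
    """Return the extra operator arguments that route a stage to a queue.

    Stages without an entry get no 'queue' argument, so Airflow's
    configured default queue applies to them.
    """
    if stage in STAGE_QUEUES:
        return {"queue": STAGE_QUEUES[stage]}
    return {}


def make_task(dag, stage, dataset_name):
    """Create one BashOperator, routed to a dedicated queue if configured."""
    # Imported lazily so queue_kwargs can be used without Airflow installed.
    from airflow.operators.bash_operator import BashOperator  # 1.10 path

    return BashOperator(
        task_id=f"{stage}_{dataset_name}",
        # Placeholder command; the real commands are not given in the thread.
        bash_command=f"echo {stage} {dataset_name}",
        dag=dag,
        **queue_kwargs(stage),
    )
```

With this routing, the number of task_2 instances running at once is bounded
by the total concurrency of the workers serving the heavy queue, not by
anything set in the DAG itself.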

J.


-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129