Posted to dev@airflow.apache.org by David Szakallas <ds...@whitepages.com> on 2018/09/20 14:07:51 UTC

Creating dynamic pool from task

Hi all,

I have a DAG that creates a cluster, starts computation tasks, and tears down the cluster after they complete. I want to limit the concurrency of the computation tasks carried out on this cluster to a fixed number. So logically, I need a pool that is exclusive to the cluster created by a task. I don't want interference from other DAGs or from different runs of the same DAG.

I thought I could solve this problem by creating a pool dynamically from a task after the cluster is created and deleting it once the computation tasks are finished. I thought I could template the pool parameter of the computation tasks to make them use this dynamically created pool.

However, this way the computation tasks are never triggered, so I think the pool parameter is saved in the task instance before being templated. I would like to hear your thoughts on how to achieve the desired behavior.

Thanks,

Dávid Szakállas
Software Engineer | Whitepages Data Services





Re: Creating dynamic pool from task

Posted by Taylor Edmiston <te...@gmail.com>.
I also believe Chris is correct that it's not quite possible to be that
dynamic today.

Could you find a workaround, such as running only one DAG run per DAG at a
time and reusing the pool? Or perhaps it might work to create the pool
based on the DagRun's execution date instead of its id?
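
One way to read the execution-date idea, as a minimal sketch: derive a deterministic pool name from the DAG id and the run's execution date. The helper name `pool_name_for_run` and the naming scheme are assumptions made for illustration and are not part of Airflow. As Chris notes, the scheduler would still need to know this name before the task is rendered, so this only covers the naming half of the problem.

```python
import re
from datetime import datetime


def pool_name_for_run(dag_id: str, execution_date: datetime) -> str:
    """Build a deterministic, per-run pool name (hypothetical naming scheme)."""
    # Compact timestamp such as 20180920T140751.
    stamp = execution_date.strftime("%Y%m%dT%H%M%S")
    # Replace anything other than letters, digits, and underscores.
    safe_dag_id = re.sub(r"[^A-Za-z0-9_]", "_", dag_id)
    return f"{safe_dag_id}__{stamp}"


if __name__ == "__main__":
    print(pool_name_for_run("cluster-compute", datetime(2018, 9, 20, 14, 7, 51)))
```

The same DAG id and execution date always give the same name, so a later cleanup task could recompute it instead of having it passed along.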

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story
<https://stackoverflow.com/story/taylor>


On Tue, Sep 25, 2018 at 10:24 AM, Chris Palmer <cr...@gmail.com> wrote:

> David,
>
> I was playing around with this over the weekend, and mostly found that it
> doesn't seem to be possible. I was able to get an operator to template out
> the pool attribute when it renders its templates. However, this doesn't
> normally get done until execution, so the un-templated pool attribute
> gets used when the scheduler sends the task to the executor.
>
> Chris

Re: Creating dynamic pool from task

Posted by Chris Palmer <cr...@gmail.com>.
David,

I was playing around with this over the weekend, and mostly found that it
doesn't seem to be possible. I was able to get an operator to template out
the pool attribute when it renders its templates. However, this doesn't
normally get done until execution, so the un-templated pool attribute
gets used when the scheduler sends the task to the executor.
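
A toy model of that ordering, as a rough sketch only: this is not Airflow code, and `POOLS`, `render`, and `scheduler_can_queue` are hypothetical names invented for the illustration. It shows why a pool name that is still a raw template string cannot match any existing pool at scheduling time, even though the rendered name would.

```python
import re

# Toy model only: none of these names come from Airflow.
# Pool name -> slots, as if an upstream task had already created the pool.
POOLS = {"cluster_pool_run_42": 6}


def render(template: str, context: dict) -> str:
    """Tiny stand-in for rendering '{{ key }}' placeholders."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: str(context[m.group(1)]), template)


def scheduler_can_queue(pool_attr: str) -> bool:
    """Scheduling step: it only sees the attribute exactly as given."""
    return pool_attr in POOLS


pool_attr = "cluster_pool_run_{{ run_id }}"
context = {"run_id": 42}

print(scheduler_can_queue(pool_attr))                   # raw template string: no such pool
print(scheduler_can_queue(render(pool_attr, context)))  # rendered name: the pool exists
```

In this model the first check fails and the second succeeds, which mirrors the symptom of tasks that never get triggered.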

Chris

On Fri, Sep 21, 2018 at 6:12 PM Chris Palmer <cr...@gmail.com> wrote:

> I see, so for a given DagRun you want to limit the compute tasks that are
> running. But I'm guessing you want multiple DagRuns to be able to run
> concurrently to operate on their own clusters independently.
>
> From what I could tell in the code, the pool gets checked before execution
> (which is when templates are rendered), which makes dynamic pools difficult
> to do.
>
> It's probably possible to find a solution, but I think it's likely going to
> involve some ugly code or inspection of the Python stack.
>
> Chris

Re: Creating dynamic pool from task

Posted by Chris Palmer <cr...@gmail.com>.
I see, so for a given DagRun you want to limit the number of compute tasks
that are running, but I'm guessing you also want multiple DagRuns to be able
to run concurrently, each operating on its own cluster independently.

From what I could tell in the code, the pool gets checked before execution
(which is when templates are rendered), which makes dynamic pools difficult
to do.

It's probably possible to find a solution, but I think it's likely going to
involve some ugly code or inspection of the Python stack.

Chris

On Sep 21, 2018 4:47 PM, "David Szakallas" <ds...@whitepages.com>
wrote:

Chris, the tasks are independent of each other so they can run
concurrently. I have to limit the concurrency though, so they don’t starve.
As the cluster is created dynamically with a task, a shared pool with other
DAGs or other runs of the same DAG is not preferable.

I imagined something like this:

                                       .--> [compute_1]  --.
                                      /---> [compute_2]  ---\
                                     /           .           \
[create_cluster] -> [create_pool_x6]             .             [delete_pool] -> [delete cluster]
                                     \           .           /
                                      \---> [compute_19] ---/
                                       '--> [compute_20] --'
Thanks,
David



Re: Creating dynamic pool from task

Posted by David Szakallas <ds...@whitepages.com>.
Chris, the tasks are independent of each other, so they can run concurrently. I have to limit the concurrency, though, so they don’t starve. As the cluster is created dynamically by a task, sharing a pool with other DAGs or with other runs of the same DAG is not desirable.

I imagined something like this:

                                       .--> [compute_1]  --.
                                      /---> [compute_2]  ---\
                                     /           .           \
[create_cluster] -> [create_pool_x6]             .             [delete_pool] -> [delete cluster]
                                     \           .           /
                                      \---> [compute_19] ---/
                                       '--> [compute_20] --'
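
The intended semantics of the diagram can be sketched with the standard library alone. This is an illustration of the desired behavior, not Airflow code: it assumes "x6" means six slots, and `run_with_private_pool` is a hypothetical name. Each call builds its own semaphore, so separate runs cannot interfere with each other.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

SLOTS = 6        # assumed meaning of "x6" in the diagram
NUM_TASKS = 20   # compute_1 .. compute_20


def run_with_private_pool(slots: int, num_tasks: int) -> int:
    """Run dummy tasks under a run-private pool and return the peak concurrency."""
    pool = threading.Semaphore(slots)  # "create_pool": exists only for this run
    lock = threading.Lock()
    running = 0
    peak = 0

    def compute(task_no: int) -> None:
        nonlocal running, peak
        with pool:                     # take a slot, release it when done
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(0.01)           # stand-in for real work on the cluster
            with lock:
                running -= 1

    # Every task is eligible at once; only the semaphore limits them.
    with ThreadPoolExecutor(max_workers=num_tasks) as executor:
        list(executor.map(compute, range(1, num_tasks + 1)))
    return peak                        # the pool is discarded with the run


if __name__ == "__main__":
    print(run_with_private_pool(SLOTS, NUM_TASKS) <= SLOTS)
```

The returned peak never exceeds the slot count, which is the guarantee a per-run pool would need to provide.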
Thanks,
David

> On Sep 21, 2018, at 7:23 PM, Chris Palmer <ch...@crpalmer.com> wrote:
> 
> What would cause multiple computation tasks to run on the cluster at the
> same time? Are you worried about concurrent DagRuns? Does setting dag
> concurrency and/or task concurrency appropriately solve your problem?
> 
> Chris


Re: Creating dynamic pool from task

Posted by Chris Palmer <ch...@crpalmer.com>.
What would cause multiple computation tasks to run on the cluster at the
same time? Are you worried about concurrent DagRuns? Does setting dag
concurrency and/or task concurrency appropriately solve your problem?

Chris

On Thu, Sep 20, 2018 at 8:31 PM David Szakallas <ds...@whitepages.com>
wrote:

> What I am doing is very similar. However I am including the DagRun's id in
> the pool name to make it unique, as I need to make sure every run gets its
> own pool. I am getting that from the context object, which is only
> available within execute methods or templates. How do you make sure each
> run has its own pool?
>
>
> Thanks,
>
> Dávid Szakállas
> Software Engineer | Whitepages Data Services

Re: Creating dynamic pool from task

Posted by David Szakallas <ds...@whitepages.com>.
What I am doing is very similar. However, I am including the DagRun's id in the pool name to make it unique, as I need to make sure every run gets its own pool. I am getting that from the context object, which is only available within execute methods or templates. How do you make sure each run has its own pool?


Thanks,

Dávid Szakállas
Software Engineer | Whitepages Data Services

________________________________
From: Taylor Edmiston <te...@gmail.com>
Sent: Thursday, September 20, 2018 6:17:05 PM
To: dev@airflow.incubator.apache.org
Subject: Re: Creating dynamic pool from task

I've done something similar.  I have a task at the front of the DAG that
ensures the connection pool exists and creates the pool if it doesn't.
I've pasted my code below.  This runs in a for loop that creates one DAG
per iteration each with its own pool.  Then I pass the pool name into the
sensors.

Does this work for your use case?

--

redshift_pool = PythonOperator(
    task_id='redshift_pool',
    dag=dag,
    python_callable=ensure_redshift_pool,
    op_kwargs={
        'name': workflow.pool,
        'slots': REDSHIFT_POOL_SLOTS,
    },
    ...
)

@provide_session
def ensure_redshift_pool(name, slots, session=None):
    pool = Pool(pool=name, slots=slots)
    pool_query = (
        session.query(Pool)
        .filter(Pool.pool == name)
    )
    pool_query_result = pool_query.one_or_none()
    if not pool_query_result:
        logger.info(f'redshift pool "{name}" does not exist - creating it')
        session.add(pool)
        session.commit()
        logger.info(f'created redshift pool "{name}"')
    else:
        logger.info(f'redshift pool "{name}" already exists')

--

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story
<https://stackoverflow.com/story/taylor>




Re: Creating dynamic pool from task

Posted by Taylor Edmiston <te...@gmail.com>.
I've done something similar.  I have a task at the front of the DAG that
ensures the connection pool exists and creates the pool if it doesn't.
I've pasted my code below.  This runs in a for loop that creates one DAG
per iteration each with its own pool.  Then I pass the pool name into the
sensors.

Does this work for your use case?

--

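# Import paths below match Airflow 1.10-era releases; newer versions moved them.
import logging

from airflow.models import Pool
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session

logger = logging.getLogger(__name__)


@provide_session
def ensure_redshift_pool(name, slots, session=None):
    """Create the named pool if it does not already exist."""
    pool_query = (
        session.query(Pool)
        .filter(Pool.pool == name)
    )
    pool_query_result = pool_query.one_or_none()
    if not pool_query_result:
        logger.info(f'redshift pool "{name}" does not exist - creating it')
        session.add(Pool(pool=name, slots=slots))
        session.commit()
        logger.info(f'created redshift pool "{name}"')
    else:
        logger.info(f'redshift pool "{name}" already exists')


# Defined after the callable so the name resolves when the file is parsed.
redshift_pool = PythonOperator(
    task_id='redshift_pool',
    dag=dag,
    python_callable=ensure_redshift_pool,
    op_kwargs={
        'name': workflow.pool,
        'slots': REDSHIFT_POOL_SLOTS,
    },
    # ... (remaining arguments elided in the original)
)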

--

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story
<https://stackoverflow.com/story/taylor>



On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <ds...@whitepages.com>
wrote:

> Hi all,
>
> I have a DAG that creates a cluster, starts computation tasks, and tears
> down the cluster after they complete. I want to limit the concurrency of the
> computation tasks carried out on this cluster to a fixed number. So
> logically, I need a pool that is exclusive to the cluster created by a task.
> I don't want interference from other DAGs or from different runs of the same
> DAG.
>
> I thought I could solve this problem by creating a pool dynamically from a
> task after the cluster is created and deleting it once the computation tasks
> are finished. I thought I could template the pool parameter of the
> computation tasks to make them use this dynamically created pool.
>
> However, this way the computation tasks are never triggered, so I think
> the pool parameter is saved in the task instance before being templated. I
> would like to hear your thoughts on how to achieve the desired behavior.
>
> Thanks,
>
> Dávid Szakállas
> Software Engineer | Whitepages Data Services