You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@airflow.apache.org by J C Lawrence <cl...@kanga.nu> on 2016/09/08 20:50:32 UTC

Dynamically defining tasks in a DAG -- HOW?

I have a few hundred thousand files arriving from an external service
each day and would like to ETL their contents into my store with
Airflow.  As the files are large and numerous and slow to process, I'd
also like to process them in parallel...so I thought something like
this:

    def sub_dag (
        parent_dag_name, 
        child_dag_name, 
        start_date,
        schedule_interval):
      dag = DAG(
        "%s.%s" % (parent_dag_name, child_dag_name),
        schedule_interval = schedule_interval,
        start_date = start_date,
      )
      fan_out = operators.DummyOperator(
        task_id = "fan_out",
        dag = dag,
      )
      fan_in = operators.DummyOperator(
        task_id = "fan_in",
        dag = dag,
      )
      cur = hooks.PostgresHook ("MY_DB").get_cursor ()
      cur.execute ("""SELECT file_id 
                     FROM some_table
                     WHERE something;""".format (foo = func(start_date))
      for rec in cur: 
        fid = rec[0]
        o = operators.PythonOperator (
          task_id = "ImportThing__%s" % fid,
          provide_context = True,
          python_callable = import_func,
          params = {"file_id": fid,},
          dag = dag)
        o.set_upstream (fan_out)
        o.set_downstream (fan_in)
      cur.close ()
      return dag

The idea being that the number and identity of the tasks in the sub-DAG
would vary dynamically depending on what day it was running for (ie
which what rows come back from the query for that day). But...no, this
doesn't seem to work.

Any recommendations for how to approach this?

-- JCL

Re: Dynamically defining tasks in a DAG -- HOW?

Posted by Adinata <ma...@gmail.com>.
Hi,

I also have this kind of use case. We need to generate reports for many
users where the selected users comes out from where specific queries.

I did something like this:

[image: Inline image 1]

With the relevant DAG definition:

for i in range(0, int(Variable.get('weekly-mailer-worker', '5'))):
    send_email = ExecuteFannedOutOperator(
        task_id='execute-%s-%s' % (task_id, i),
        dag=dag,
        tolerance=100
    )
    start_weekly_mailer >> send_email
    send_email >> post_execute

start-weekly-mailer will run the queries, and then send task definition to
queue (we used SQS). The execute-weekly-mailer will pull the task
definition from the queue, and will keep running until the queue is empty.
The post-execute mostly for cleanups.

If you see the DAG definition, I uses Variable, so I can increase the
number of worker from webserver (I can't decrease already running task,
because, you know, this is a hack). I think you can also set the Variable
on the start-weekly-mailer, to define how many parallel worker will need to
do, e.g. based on the number of row results.

I hope we can improve airflow to also handle this kind of uses case.


--
*Adinata*
TOKI 2009
SMAN Plus Provinsi Riau 9th
13509022 - Informatika ITB 2009
Engineer - UrbanIndo.com

On Fri, Sep 9, 2016 at 6:12 AM, J C Lawrence <cl...@kanga.nu> wrote:

> On Thu, 8 Sep 2016 14:04:58 -0700, Ben Tallman <be...@apigee.com> wrote:
>
> > We have done this a lot, and the one issue is that every time the DAG
> > is evaluated (even during a run), the SQL will be re-run, and tasks
> > can vary. In fact, we had a select statement that actually marked
> > items as in process during select, and THAT was bad.
>
> Yeah, I'm keeping an eye on that.
>
> The problem I'm having however is that the DAGs are not getting
> populated with the tasks relevant to that specific scheduling run.  Do
> you have this working under Airflow today?
>
> > We have moved to x number of tasks, and each one grabs a line from
> > the DB, and 0 to n of them can actually get skipped if they don't get
> > a line from the DB.
>
> Yeah, I just don't want to a) setup yet another DB/table for an
> interstitial process or b) to re-invent dispatch/locking against a DB
> yet again.
>
> -- JCL
>

Re: Dynamically defining tasks in a DAG -- HOW?

Posted by J C Lawrence <cl...@kanga.nu>.
On Thu, 8 Sep 2016 14:04:58 -0700, Ben Tallman <be...@apigee.com> wrote:

> We have done this a lot, and the one issue is that every time the DAG
> is evaluated (even during a run), the SQL will be re-run, and tasks
> can vary. In fact, we had a select statement that actually marked
> items as in process during select, and THAT was bad.

Yeah, I'm keeping an eye on that.

The problem I'm having however is that the DAGs are not getting
populated with the tasks relevant to that specific scheduling run.  Do
you have this working under Airflow today?

> We have moved to x number of tasks, and each one grabs a line from
> the DB, and 0 to n of them can actually get skipped if they don't get
> a line from the DB.

Yeah, I just don't want to a) setup yet another DB/table for an
interstitial process or b) to re-invent dispatch/locking against a DB
yet again.

-- JCL

Re: Dynamically defining tasks in a DAG -- HOW?

Posted by Lance Norskog <la...@gmail.com>.
Also, you might wish to add a Pool so that only N tasks go against your
Postgres server at any one time.
We have pools for all of our resources.


On Thu, Sep 8, 2016 at 2:04 PM, Ben Tallman <be...@apigee.com> wrote:

> We have done this a lot, and the one issue is that every time the DAG is
> evaluated (even during a run), the SQL will be re-run, and tasks can vary.
> In fact, we had a select statement that actually marked items as in process
> during select, and THAT was bad.
>
> We have moved to x number of tasks, and each one grabs a line from the DB,
> and 0 to n of them can actually get skipped if they don't get a line from
> the DB.
>
> To be clear, we would really like the DAG's tasks to be frozen at time of
> schedule, but that has not been our experience, and I believe will take a
> fairly major re-factor. Furthermore, I believe that the definition of a
> Dynamic Acyclic Graph is that it is re-evaluated during runtime and that
> the path is non-determinate at runtime.
>
>
> Thanks,
> Ben
>
> *--*
> *ben tallman* | *apigee
> <http://t.sidekickopen06.com/e1t/c/5/f18dQhb0S7lC8dDMPbW2n0x6l2B9nM
> JW7t5XZs4WJfgqW4WJj7n3MP7VWW3LqXLC56dWRRf2H8CkP02?t=http%3A%
> 2F%2Fwww.apigee.com%2F&si=5141814536306688&pi=999a610c-
> 8298-4095-eefd-dfab06b90c1f>*
>  | m: +1.503.680.5709 | o: +1.503.608.7552 | twitter @anonymousmanage
> <http://t.sidekickopen06.com/e1t/c/5/f18dQhb0S7lC8dDMPbW2n0x6l2B9nM
> JW7t5XZs4WJfgqW4WJj7n3MP7VWW3LqXLC56dWRRf2H8CkP02?t=http%3A%
> 2F%2Ftwitter.com%2Fanonymousmanage&si=5141814536306688&pi=999a610c-
> 8298-4095-eefd-dfab06b90c1f>
>  @apigee
> <http://t.sidekickopen06.com/e1t/c/5/f18dQhb0S7lC8dDMPbW2n0x6l2B9nM
> JW7t5XZs4WJfgqW4WJj7n3MP7VWW3LqXLC56dWRRf2H8CkP02?t=https%
> 3A%2F%2Ftwitter.com%2Fapigee&si=5141814536306688&pi=
> 999a610c-8298-4095-eefd-dfab06b90c1f>
> <http://t.sidekickopen06.com/e1t/c/5/f18dQhb0S7lC8dDMPbW2n0x6l2B9nM
> JW7t5XZs4WJfgqW4WJj7n3MP7VWW3LqXLC56dWRRf2H8CkP02?t=http%3A%
> 2F%2Fadapt.apigee.com%2F&si=5141814536306688&pi=999a610c-
> 8298-4095-eefd-dfab06b90c1f>
>
> On Thu, Sep 8, 2016 at 1:50 PM, J C Lawrence <cl...@kanga.nu> wrote:
>
> > I have a few hundred thousand files arriving from an external service
> > each day and would like to ETL their contents into my store with
> > Airflow.  As the files are large and numerous and slow to process, I'd
> > also like to process them in parallel...so I thought something like
> > this:
> >
> >     def sub_dag (
> >         parent_dag_name,
> >         child_dag_name,
> >         start_date,
> >         schedule_interval):
> >       dag = DAG(
> >         "%s.%s" % (parent_dag_name, child_dag_name),
> >         schedule_interval = schedule_interval,
> >         start_date = start_date,
> >       )
> >       fan_out = operators.DummyOperator(
> >         task_id = "fan_out",
> >         dag = dag,
> >       )
> >       fan_in = operators.DummyOperator(
> >         task_id = "fan_in",
> >         dag = dag,
> >       )
> >       cur = hooks.PostgresHook ("MY_DB").get_cursor ()
> >       cur.execute ("""SELECT file_id
> >                      FROM some_table
> >                      WHERE something;""".format (foo = func(start_date))
> >       for rec in cur:
> >         fid = rec[0]
> >         o = operators.PythonOperator (
> >           task_id = "ImportThing__%s" % fid,
> >           provide_context = True,
> >           python_callable = import_func,
> >           params = {"file_id": fid,},
> >           dag = dag)
> >         o.set_upstream (fan_out)
> >         o.set_downstream (fan_in)
> >       cur.close ()
> >       return dag
> >
> > The idea being that the number and identity of the tasks in the sub-DAG
> > would vary dynamically depending on what day it was running for (ie
> > which what rows come back from the query for that day). But...no, this
> > doesn't seem to work.
> >
> > Any recommendations for how to approach this?
> >
> > -- JCL
> >
>



-- 
Lance Norskog
lance.norskog@gmail.com
Redwood City, CA

Re: Dynamically defining tasks in a DAG -- HOW?

Posted by Ben Tallman <be...@apigee.com>.
We have done this a lot, and the one issue is that every time the DAG is
evaluated (even during a run), the SQL will be re-run, and tasks can vary.
In fact, we had a select statement that actually marked items as in process
during select, and THAT was bad.

We have moved to x number of tasks, and each one grabs a line from the DB,
and 0 to n of them can actually get skipped if they don't get a line from
the DB.

To be clear, we would really like the DAG's tasks to be frozen at time of
schedule, but that has not been our experience, and I believe will take a
fairly major re-factor. Furthermore, I believe that the definition of a
Dynamic Acyclic Graph is that it is re-evaluated during runtime and that
the path is non-determinate at runtime.


Thanks,
Ben

*--*
*ben tallman* | *apigee
<http://t.sidekickopen06.com/e1t/c/5/f18dQhb0S7lC8dDMPbW2n0x6l2B9nMJW7t5XZs4WJfgqW4WJj7n3MP7VWW3LqXLC56dWRRf2H8CkP02?t=http%3A%2F%2Fwww.apigee.com%2F&si=5141814536306688&pi=999a610c-8298-4095-eefd-dfab06b90c1f>*
 | m: +1.503.680.5709 | o: +1.503.608.7552 | twitter @anonymousmanage
<http://t.sidekickopen06.com/e1t/c/5/f18dQhb0S7lC8dDMPbW2n0x6l2B9nMJW7t5XZs4WJfgqW4WJj7n3MP7VWW3LqXLC56dWRRf2H8CkP02?t=http%3A%2F%2Ftwitter.com%2Fanonymousmanage&si=5141814536306688&pi=999a610c-8298-4095-eefd-dfab06b90c1f>
 @apigee
<http://t.sidekickopen06.com/e1t/c/5/f18dQhb0S7lC8dDMPbW2n0x6l2B9nMJW7t5XZs4WJfgqW4WJj7n3MP7VWW3LqXLC56dWRRf2H8CkP02?t=https%3A%2F%2Ftwitter.com%2Fapigee&si=5141814536306688&pi=999a610c-8298-4095-eefd-dfab06b90c1f>
<http://t.sidekickopen06.com/e1t/c/5/f18dQhb0S7lC8dDMPbW2n0x6l2B9nMJW7t5XZs4WJfgqW4WJj7n3MP7VWW3LqXLC56dWRRf2H8CkP02?t=http%3A%2F%2Fadapt.apigee.com%2F&si=5141814536306688&pi=999a610c-8298-4095-eefd-dfab06b90c1f>

On Thu, Sep 8, 2016 at 1:50 PM, J C Lawrence <cl...@kanga.nu> wrote:

> I have a few hundred thousand files arriving from an external service
> each day and would like to ETL their contents into my store with
> Airflow.  As the files are large and numerous and slow to process, I'd
> also like to process them in parallel...so I thought something like
> this:
>
>     def sub_dag (
>         parent_dag_name,
>         child_dag_name,
>         start_date,
>         schedule_interval):
>       dag = DAG(
>         "%s.%s" % (parent_dag_name, child_dag_name),
>         schedule_interval = schedule_interval,
>         start_date = start_date,
>       )
>       fan_out = operators.DummyOperator(
>         task_id = "fan_out",
>         dag = dag,
>       )
>       fan_in = operators.DummyOperator(
>         task_id = "fan_in",
>         dag = dag,
>       )
>       cur = hooks.PostgresHook ("MY_DB").get_cursor ()
>       cur.execute ("""SELECT file_id
>                      FROM some_table
>                      WHERE something;""".format (foo = func(start_date))
>       for rec in cur:
>         fid = rec[0]
>         o = operators.PythonOperator (
>           task_id = "ImportThing__%s" % fid,
>           provide_context = True,
>           python_callable = import_func,
>           params = {"file_id": fid,},
>           dag = dag)
>         o.set_upstream (fan_out)
>         o.set_downstream (fan_in)
>       cur.close ()
>       return dag
>
> The idea being that the number and identity of the tasks in the sub-DAG
> would vary dynamically depending on what day it was running for (ie
> which what rows come back from the query for that day). But...no, this
> doesn't seem to work.
>
> Any recommendations for how to approach this?
>
> -- JCL
>