Posted to dev@airflow.apache.org by sr...@gmail.com on 2018/07/23 00:17:52 UTC

Simple DAG Structure

Hi - 

   I have recently started using Airflow version 1.9.0 and am having some difficulty setting up a very simple DAG. I have three tasks A, B and C. I'd like A to run every day at 10am and B at 11am. C depends on BOTH A and B running successfully.

   Initially, I decided to create one DAG, add all three tasks to it, and set C as downstream of A and B. I then set the schedule_interval of the DAG to @daily. But this meant I couldn't run A and B at 10am and 11am respectively, since they are PythonOperators and tasks don't support schedule_interval (or, at least, it's deprecated syntax and gets ignored).

   I scratched that idea and then created A and B as separate DAGs, specifying the schedule interval in cron syntax: '00 10 * * *' for A and '00 11 * * *' for B. But now when I set C as a downstream of A and B, it complains that C can't belong to two different DAGs.

   How do I accomplish such a simple dependency structure? 

Ram.

Re: Simple DAG Structure

Posted by Andrew Maguire <an...@gmail.com>.
DAG A runs at 10; then, after the last successful task in it, you could log a
message to a DB or file somewhere.
DAG B runs at 11; then, after the tasks in B have run, just have a Python task
(a branch operator with a do-nothing or raise-error type path) or check
operator check whether DAG A reported success and, if so, do whatever else is
needed. So no sensors or polling tasks.

An example where I've done this is with data, where you can more naturally
check the data that should have been produced by DAG A (based on last modified
date and things like that) as part of DAG B.

You could even (depending on your setup; you may not have access to the
underlying Airflow DB) just have a task in DAG B go read the task tables from
the Airflow DB to see if a successful run of A can be found within the
expected timeframe.
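
A minimal sketch of the branch-operator version on Airflow 1.9, assuming DAG
A's last task touches a marker file on success (the marker path, dag_id,
schedule and task names are all illustrative):

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

dag_b = DAG('dag_b', schedule_interval='0 11 * * *',
            start_date=datetime(2018, 7, 1))

def route(**context):
    # DAG A's last task is assumed to touch this marker on success.
    marker = '/tmp/dag_a_success_{}'.format(context['ds'])
    return 'c' if os.path.exists(marker) else 'no_c'

b = BashOperator(task_id='b', bash_command='echo run B', dag=dag_b)

# The branch decides once, right after B finishes; nothing polls or sleeps.
check_a = BranchPythonOperator(task_id='check_a', python_callable=route,
                               provide_context=True, dag=dag_b)
check_a.set_upstream(b)

c = BashOperator(task_id='c', bash_command='echo run C', dag=dag_b)
no_c = DummyOperator(task_id='no_c', dag=dag_b)
c.set_upstream(check_a)
no_c.set_upstream(check_a)

The do-nothing branch (no_c) keeps the run green on days when A has not
reported success; raise an exception there instead if you want the run marked
failed.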

RE: Simple DAG Structure

Posted by Jiening Wen <ji...@Optiver.com>.
Hi Ram,

Maybe you can try running:
- a DAG containing only task A, scheduled at 10am
- another DAG containing only task B, scheduled at 11am
- yet another DAG scheduled some time later, with two ExternalTaskSensors waiting on A and B to finish before kicking off task C

This might seem like overkill, but it can reduce the time "wasted" on sensors.
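
A rough sketch of that third DAG on Airflow 1.9 is below; the dag_ids,
task_ids, schedule and command are illustrative assumptions. Note that
ExternalTaskSensor matches on execution_date, so execution_delta has to
bridge the gap between the schedules:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.sensors import ExternalTaskSensor

# Third DAG, scheduled after A (10am) and B (11am) should be done.
dag_c = DAG('dag_c', schedule_interval='0 12 * * *',
            start_date=datetime(2018, 7, 1))

# Each sensor points at the matching run of A or B via execution_delta.
wait_a = ExternalTaskSensor(task_id='wait_a', external_dag_id='dag_a',
                            external_task_id='a',
                            execution_delta=timedelta(hours=2), dag=dag_c)
wait_b = ExternalTaskSensor(task_id='wait_b', external_dag_id='dag_b',
                            external_task_id='b',
                            execution_delta=timedelta(hours=1), dag=dag_c)

c = BashOperator(task_id='c', bash_command='echo run C', dag=dag_c)
c.set_upstream(wait_a)
c.set_upstream(wait_b)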

Best regards,
Jiening

Re: Simple DAG Structure

Posted by sr...@gmail.com.
Andrew - 

   I guess I am not sure how the CheckOperator is implemented, but wouldn't it amount to the same thing, i.e., unnecessary polling? I imagine some process is kicked off somewhere and repeatedly polls to check whether A and B are both done writing their outcome. I do not want to convert what is essentially a time dependency (and what I consider to be in the purview of the scheduler) into some sort of polling solution.

   I am looking for a solution that respects the time dependencies of A and B and only runs them at their specified times. C, being a child of A and B, will run only on successful completion of the two. No task (sensor, check, or any other poller) ever runs outside of this schedule. The scheduler itself might poll, but we are not launching new processes that mostly just sleep.

Ram.

Re: Simple DAG Structure

Posted by Andrew Maguire <an...@gmail.com>.
Maybe you could have A and B report their outcome somewhere and then use
that output, read back in from somewhere, as a check operator in C.

This is kind of reinventing the wheel a little bit, though, as ideally there
would be a way to keep all that state inside Airflow.

I think what I suggest would work, but it's maybe a little hackish.

Re: Simple DAG Structure

Posted by sr...@gmail.com.
Carl - 

   Thanks, that definitely works, but it's non-ideal. If I had 100s of jobs running throughout the day, a TimeSensor task (process) gets created for each task at midnight, even though a task may not be required to run for a very long time (e.g. if a whole bunch of tasks need to run at 20:00, all of their time sensors are kicked off at 00:00). Worse still, if I used a LocalExecutor with a pool size of 10, some jobs that need to run early may not even get scheduled, in favor of time sensors for tasks later in the day which only perform a sleep operation.

   Is there another way to do this? If not, is there at least another way around the LocalExecutor problem?

Ram.

Re: Simple DAG Structure

Posted by Carl Johan Gustavsson <ca...@gmail.com>.
Hi Ram,

You can have a single DAG scheduled at 10am which starts A, then use a TimeSensor set to 11am that B depends on, and then have C depend on both A and B.

Something like:

a = BashOperator(task_id='a', …)

delay_b = TimeSensor(task_id='delay_b', target_time=time(11, 0, 0), …)
b = BashOperator(task_id='b', …)
b.set_upstream(delay_b)

c = BashOperator(task_id='c', …)
c.set_upstream(a)
c.set_upstream(b)
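
A fuller, runnable version of that sketch on Airflow 1.9 might look like the
following; the dag_id, start_date and bash commands are illustrative
assumptions, and bear in mind that 1.9 triggers a run at the end of its
schedule interval:

from datetime import datetime, time

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.sensors import TimeSensor

# One daily DAG: A starts when the run is triggered, the sensor holds
# B back until 11:00, and C waits on both A and B.
dag = DAG('a_b_c', schedule_interval='0 10 * * *',
          start_date=datetime(2018, 7, 1))

a = BashOperator(task_id='a', bash_command='echo run A', dag=dag)

delay_b = TimeSensor(task_id='delay_b', target_time=time(11, 0, 0), dag=dag)
b = BashOperator(task_id='b', bash_command='echo run B', dag=dag)
b.set_upstream(delay_b)

c = BashOperator(task_id='c', bash_command='echo run C', dag=dag)
c.set_upstream(a)
c.set_upstream(b)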


/ Carl Johan