Posted to dev@airflow.apache.org by Gabriel Silk <gs...@dropbox.com.INVALID> on 2018/08/08 17:11:29 UTC

Basic modeling question

Hello Airflow community,

I have a basic question about how best to model a common data pipeline
pattern here at Dropbox.

At Dropbox, all of our logs are ingested and written into Hive in hourly
and/or daily rollups. On top of this data we build many weekly and monthly
rollups, which typically run on a daily cadence and compute results over a
rolling window.

If we have a metric X, it seems natural to put the daily, weekly, and
monthly rollups for metric X all in the same DAG.

However, the different rollups have different dependency structures. The
daily job only depends on a single day partition, whereas the weekly job
depends on 7, the monthly on 28.
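
To make the difference in dependency structure concrete, here is a minimal sketch in plain Python (not Airflow API) of which daily partitions each rollup would need for a given run date. The helper name `window_partitions` and the ISO-date partition keys are assumptions for illustration; the window sizes 1, 7, and 28 come from the description above.

```python
from datetime import date, timedelta


def window_partitions(run_date, days):
    """Return the daily partition keys a rolling rollup needs, oldest first.

    Hypothetical helper: assumes partitions are keyed by ISO date and that
    the window ends on (and includes) run_date.
    """
    return [(run_date - timedelta(days=offset)).isoformat()
            for offset in range(days - 1, -1, -1)]


run_date = date(2018, 8, 8)
daily = window_partitions(run_date, 1)
weekly = window_partitions(run_date, 7)
monthly = window_partitions(run_date, 28)

print(daily)
print(weekly[0], "..", weekly[-1])
print(len(monthly))
```

Each run of the weekly or monthly rollup therefore depends on partitions produced by several earlier daily runs, which is the part that a single-run, same-DAG dependency does not express directly.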

In Airflow, it seems the two paradigms for modeling dependencies are:
1) Depend on a *single run of a task* within the same DAG
2) Depend on *multiple runs of a task* by using an ExternalTaskSensor

I'm not sure how I could possibly model this scenario using approach #1,
and I'm not sure approach #2 is the most elegant or performant way to model
this scenario.

Any thoughts or suggestions?

Re: Basic modeling question

Posted by Maxime Beauchemin <ma...@gmail.com>.
There's also the hack of using templating to skip executions. Say for a
BashOperator:

{% if execution_date.weekday() == 1 %}
echo "skipping today"
{% else %}
./run_workload.sh
{% endif %}
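
The branch in that template can be checked without Airflow. The sketch below mirrors it in plain Python; `command_for` and `skip_weekday` are hypothetical names used only for illustration. Note that `weekday()` counts Monday as 0, so the value 1 in the template means Tuesday.

```python
from datetime import datetime


def command_for(execution_date, skip_weekday=1):
    """Mirror the template's branch: skip on one weekday, run otherwise.

    datetime.weekday() counts Monday as 0, so the default of 1 is Tuesday.
    """
    if execution_date.weekday() == skip_weekday:
        return 'echo "skipping today"'
    return "./run_workload.sh"


print(command_for(datetime(2018, 8, 7)))  # 7 Aug 2018 was a Tuesday
print(command_for(datetime(2018, 8, 8)))  # 8 Aug 2018 was a Wednesday
```

Because the skip branch only runs `echo`, the shell exits with status 0 and the task is still recorded as successful on the skipped days.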

On Wed, Aug 8, 2018 at 4:27 PM Gabriel Silk <gs...@dropbox.com.invalid>
wrote:

> Alexis, do you mean you would have done this using an ExternalTaskSensor?
> Or is there some other way to depend on a range of tasks?
>
> On Wed, Aug 8, 2018 at 3:35 PM, Alexis Rolland <alexis.rolland@ubisoft.com>
> wrote:
>
> > Not sure if it’s optimal compared to what James proposes, but I would have
> > simply made the weekly and monthly rollup tasks as downstream tasks of the
> > daily log ingestion tasks they depend on. Then I would have used trigger
> > rules ‘all_done’ to ensure those rollup tasks start when their parent tasks
> > are completed.
> >
> > https://airflow.incubator.apache.org/concepts.html#trigger-rules
> >
> > (daily log ingestion) > (daily rollup)
> > (daily log ingestion) > (weekly rollup + TriggerRule.all_done)
> > (daily log ingestion) > (monthly rollup + TriggerRule.all_done)
> >
> > Cheers
> >
> > Alexis
> >
> > On 9 Aug 2018, at 02:57, James Meickle <jmeickle@quantopian.com.INVALID> wrote:
> >
> > It sounds like you want something like this?
> >
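> > from airflow.operators.dummy_operator import DummyOperator  # Airflow 1.x path
> >
> > # Assumes `dag` is defined elsewhere. SQLOperator is a placeholder for
> > # whatever operator or sensor checks that a day's data has landed.
> > root_operator = DummyOperator(task_id="root", dag=dag)
> >
> > def offset_operator(i):
> >  # Doubled braces survive str.format(), leaving a Jinja expression that
> >  # renders to the date i days before the run's date.
> >  my_sql_query = "SELECT * FROM {{{{ macros.ds_add(ds, -{offset}) }}}};".format(offset=i)
> >  sql_operator = SQLOperator(task_id="offset_by_{}".format(i),
> >                             query=my_sql_query, dag=dag)
> >  return sql_operator
> >
> > offset_operators = [offset_operator(i) for i in range(7)]
> > root_operator >> offset_operators
> >
> > # Daily just waits on today, no offset
> > do_daily_work = DummyOperator(task_id="do_daily_work", dag=dag)
> > offset_operators[0] >> do_daily_work
> >
> > # Weekly waits on today AND the six prior offsets
> > do_weekly_work = DummyOperator(task_id="do_weekly_work", dag=dag)
> > offset_operators >> do_weekly_work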
> >
> > IOW, every day you wait for that day's data to be available, and then run
> > the daily job; you also wait for the previous six days data to be
> > available, and when it is, run the weekly job.
> >
> > n.b. - if you do it this way you will have up to 7 tasks polling the "same"
> > data point, which is slightly wasteful. But it's also not much code or
> > mental effort to write it this way.
> >
> > On Wed, Aug 8, 2018 at 2:44 PM Gabriel Silk <gsilk@dropbox.com.invalid>
> > wrote:
> >
> > My main concern is how to express the fact that the weekly rollup depends
> > on the previous 7 days worth of data, and ensure that it does not run until
> > the tasks that generate those 7 days of data have run, assuming that tasks
> > can run non-sequentially.
> >
> > It's easy enough when you have the following situation:
> >
> > (daily log ingestion) <-- (daily rollup)
> >
> > In any given DAG run, you are guaranteed to have the data needed for (daily
> > rollup), because the dependency that generated its data just ran.
> >
> > But I'm not sure how best to model it when you have all of the following:
> >
> > (daily log ingestion) <-- (daily rollup)
> > (daily log ingestion) <-- (weekly rollup)
> > (daily log ingestion) <-- (monthly rollup)
> >
> >
> >
> > On Wed, Aug 8, 2018 at 11:29 AM, Taylor Edmiston <tedmiston@gmail.com>
> > wrote:
> >
> > Gabriel -
> >
> > Ah, I missed your earlier comment about weekly/monthly rollups also being
> > on a daily cadence.  So is your concern e.g., more about reducing the
> > redundant process of the weekly rollup tasks for the days of that range
> > that already processed in the previous DAG run(s)?  Or mainly about the
> > dependency of not executing the first weekly at all until the first 7 daily
> > rollups worth of data have built up?
> >
> > *Taylor Edmiston*
> > Blog <https://blog.tedmiston.com/> | CV
> > <https://stackoverflow.com/cv/taylor> | LinkedIn
> > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > <https://angel.co/taylor> | Stack Overflow
> > <https://stackoverflow.com/users/149428/taylor-edmiston>
> >
> >
> > On Wed, Aug 8, 2018 at 2:14 PM, James Meickle <jmeickle@quantopian.com.invalid>
> > wrote:
> >
> > If you want to run (daily, rolling weekly, rolling monthly) backups on a
> > daily basis, and they're mostly the same but have some additional
> > dependencies, you can write a DAG factory method, which you call three
> > times. Certain nodes only get added to the longer-than-daily backups.
> >
> > On Wed, Aug 8, 2018 at 2:03 PM Gabriel Silk <gsilk@dropbox.com.invalid>
> > wrote:
> >
> > Thanks Andy and Taylor for the suggestions --
> >
> > I see how that would work for the case where you want a weekly rollup that
> > runs on a weekly cadence.
> >
> > But what about a rolling weekly or monthly rollup that runs each day?
> >
> > On Wed, Aug 8, 2018 at 11:00 AM, Andy Cooper <andy.cooper@astronomer.io>
> > wrote:
> >
> > To expand on Taylor's idea
> >
> > I recently wrote a ScheduleBlackoutSensor that would allow you to prevent a
> > task from running if it meets the criteria provided. It accepts an array of
> > args for any number of the criteria so you could leverage this sensor to
> > provide "blackout" runs for a range of days of the week.
> >
> > https://github.com/apache/incubator-airflow/pull/3702/files
> >
> > For example,
> >
> > task = ScheduleBlackoutSensor(day_of_week=[0,1,2,3,4,5], dag=dag)
> >
> > Would prevent a task from running Monday - Saturday, allowing it to run on
> > Sunday.
> >
> > You could leverage this Sensor as you would any other sensor or you could
> > invert the logic so that you would only need to specify
> >
> > task = ScheduleBlackoutSensor(day_of_week=6, dag=dag)
> >
> > To "whitelist" a task to run on Sundays.
> >
> >
> > Let me know if you have any questions
> >
> > On Wed, Aug 8, 2018 at 1:47 PM Taylor Edmiston <tedmiston@gmail.com>
> > wrote:
> >
> > Gabriel -
> >
> > One approach I've seen for a similar use case is to have multiple related
> > rollups in one DAG that runs daily, then have the non-daily tasks skip most
> > of the time (e.g., weekly only actually executes on Sundays and is
> > parameterized to look at the last 7 days).
> >
> > You could implement that not running part a few ways, but one idea is a
> > sensor in front of the weekly rollup task.  Imagine a SundaySensor like
> > return execution_date.weekday() == 6.  One thing to keep in mind here is
> > dependence on the DAG's cron schedule being more granular than the tasks.
> >
> > I think this could generalize into a DayOfWeekSensor / DayOfMonthSensor
> > that would be nice to have.
> >
> > Of course this does mean some scheduler inefficiency on the skip days, but
> > as long as those skips are fast and the overall number of tasks is small, I
> > can accept that.
> >
> > *Taylor Edmiston*
> > Blog <https://blog.tedmiston.com/> | CV
> > <https://stackoverflow.com/cv/taylor> | LinkedIn
> > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > <https://angel.co/taylor> | Stack Overflow
> > <https://stackoverflow.com/users/149428/taylor-edmiston>
> >
> >
>
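
The day-of-week gating idea in the quoted messages above (a sensor whose check is `execution_date.weekday() == 6`) reduces to a small predicate. The sketch below is plain Python; `should_run` and `allowed_weekdays` are hypothetical names, and this is not the interface of the ScheduleBlackoutSensor in the linked pull request.

```python
from datetime import datetime

SUNDAY = 6  # datetime.weekday(): Monday is 0, Sunday is 6


def should_run(execution_date, allowed_weekdays=(SUNDAY,)):
    """Return True when the run's weekday is in the allowed set."""
    return execution_date.weekday() in allowed_weekdays


# One week of daily runs, Monday 6 Aug 2018 through Sunday 12 Aug 2018.
runs = [datetime(2018, 8, day) for day in range(6, 13)]
executed = [d.date().isoformat() for d in runs if should_run(d)]
print(executed)
```

A gate like this fits a weekly rollup on a weekly cadence. It does not by itself address the rolling-window case raised in the thread, where the weekly rollup runs every day and needs the previous 7 days of data.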

Re: Basic modeling question

Posted by Alexis Rolland <al...@ubisoft.com>.
Actually no, it would not be a sensor. You can just add a trigger_rule attribute to a task so that it executes only if its direct parent succeeds/fails/completes...

You might want to check this:
https://stackoverflow.com/questions/43678408/how-to-create-a-conditional-task-in-airflow

It is a different use case than yours but it shows how to use trigger rules.
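
For readers unfamiliar with trigger rules, the sketch below models how a few of them decide whether a task may start, given the states of its direct upstream tasks. It is a simplified model in plain Python, not Airflow's implementation: the rule names `all_success`, `all_failed`, and `all_done` match Airflow's, while `rule_satisfied` and the `FINISHED` set are illustrative assumptions.

```python
# States treated as "finished" in this simplified model.
FINISHED = {"success", "failed", "skipped", "upstream_failed"}


def rule_satisfied(rule, upstream_states):
    """Decide whether a task may start under the given trigger rule."""
    if rule == "all_success":
        return all(state == "success" for state in upstream_states)
    if rule == "all_failed":
        return all(state in ("failed", "upstream_failed")
                   for state in upstream_states)
    if rule == "all_done":
        return all(state in FINISHED for state in upstream_states)
    raise ValueError("rule not covered by this sketch: {}".format(rule))


parents = ["success", "failed", "success"]
print(rule_satisfied("all_success", parents))
print(rule_satisfied("all_done", parents))
print(rule_satisfied("all_done", ["success", "running"]))
```

One design point to weigh: `all_done` lets the downstream task start even when a parent failed, so a rollup that needs complete input may be better served by the default `all_success`.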

Cheers
Alexis

Re: Basic modeling question

Posted by Gabriel Silk <gs...@dropbox.com.INVALID>.
Alexis, do you mean you would have done this using an ExternalTaskSensor?
Or is there some other way to depend on a range of tasks?

Re: Basic modeling question

Posted by Alexis Rolland <al...@ubisoft.com>.
Not sure if it’s optimal compared to what James proposes, but I would have simply made the weekly and monthly rollup tasks downstream of the daily log ingestion tasks they depend on. Then I would have used the trigger rule ‘all_done’ to ensure those rollup tasks start once their parent tasks have completed.

https://airflow.incubator.apache.org/concepts.html#trigger-rules

(daily log ingestion) > (daily rollup)
(daily log ingestion) > (weekly rollup + TriggerRule.all_done)
(daily log ingestion) > (monthly rollup + TriggerRule.all_done)

Cheers

Alexis

Re: Basic modeling question

Posted by James Meickle <jm...@quantopian.com.INVALID>.
It sounds like you want something like this?

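# Import paths below are for Airflow 1.10. SqlSensor stands in for the
# placeholder "SQLOperator"; "my_warehouse" and "logs" are made-up names.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.sql_sensor import SqlSensor

dag = DAG("metric_x", start_date=datetime(2018, 8, 1), schedule_interval="@daily")
root_operator = DummyOperator(task_id="root", dag=dag)

def offset_operator(i):
  # Doubled braces survive .format() as literal Jinja delimiters
  my_sql_query = (
      "SELECT 1 FROM logs WHERE ds = '{{{{ macros.ds_add(ds, -{offset}) }}}}' LIMIT 1"
  ).format(offset=i)
  return SqlSensor(task_id="offset_by_{}".format(i), conn_id="my_warehouse",
                   sql=my_sql_query, dag=dag)

offset_operators = [offset_operator(i) for i in range(7)]
root_operator >> offset_operators

# Daily just waits on today, no offset
do_daily_work = DummyOperator(task_id="do_daily_work", dag=dag)
offset_operators[0] >> do_daily_work

# Weekly waits on today AND the six prior offsets
do_weekly_work = DummyOperator(task_id="do_weekly_work", dag=dag)
offset_operators >> do_weekly_work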

IOW, every day you wait for that day's data to be available, and then run
the daily job; you also wait for the previous six days' data to be
available, and when it is, you run the weekly job.

n.b. - if you do it this way you will have up to 7 tasks polling the "same"
data point, which is slightly wasteful. But it's also not much code or
mental effort to write it this way.
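
To make the offsets concrete, here is a minimal sketch in plain Python (no Airflow needed) of the partition dates that the seven wait tasks would resolve to for a given execution date. The helper name `window_partitions` is made up for illustration; it mirrors what Airflow's `macros.ds_add(ds, -i)` template would produce inside each task.

```python
from datetime import datetime, timedelta


def window_partitions(ds, days):
    """Return the `days` daily partition dates ending at `ds` (inclusive), newest first."""
    end = datetime.strptime(ds, "%Y-%m-%d")
    return [(end - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(days)]


# The daily job needs only the first entry; the weekly job needs all seven.
daily = window_partitions("2018-08-08", 1)
weekly = window_partitions("2018-08-08", 7)
print(daily)
print(weekly)
```

The same helper with days=28 would give the monthly window described in the original question.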

On Wed, Aug 8, 2018 at 2:44 PM Gabriel Silk <gs...@dropbox.com.invalid>
wrote:

> My main concern is how to express the fact that the weekly rollup depends
> on the previous 7 days worth of data, and ensure that it does not run until
> the tasks that generate those 7 days of data have run, assuming that tasks
> can run non-sequentially.
>
> It's easy enough when you have the following situation:
>
> (daily log ingestion) <-- (daily rollup)
>
> In any given DAG run, you are guaranteed to have the data needed for (daily
> rollup), because the dependency that generated its data just ran.
>
> But I'm not sure how best to model it when you have all of the following:
>
> (daily log ingestion) <-- (daily rollup)
> (daily log ingestion) <-- (weekly rollup)
> (daily log ingestion) <-- (monthly rollup)

Re: Basic modeling question

Posted by Gabriel Silk <gs...@dropbox.com.INVALID>.
My main concern is how to express the fact that the weekly rollup depends
on the previous 7 days' worth of data, and how to ensure that it does not run
until the tasks that generate those 7 days of data have run, given that tasks
can run non-sequentially.

It's easy enough when you have the following situation:

(daily log ingestion) <-- (daily rollup)

In any given DAG run, you are guaranteed to have the data needed for (daily
rollup), because the dependency that generated its data just ran.

But I'm not sure how best to model it when you have all of the following:

(daily log ingestion) <-- (daily rollup)
(daily log ingestion) <-- (weekly rollup)
(daily log ingestion) <-- (monthly rollup)



On Wed, Aug 8, 2018 at 11:29 AM, Taylor Edmiston <te...@gmail.com>
wrote:

> Gabriel -
>
> Ah, I missed your earlier comment about weekly/monthly rollups also being
> on a daily cadence.  So is your concern e.g., more about reducing the
> redundant process of the weekly rollup tasks for the days of that range
> that already processed in the previous DAG run(s)?  Or mainly about the
> dependency of not executing the first weekly at all until the first 7 daily
> rollups worth of data have built up?
>
> *Taylor Edmiston*
> Blog <https://blog.tedmiston.com/> | CV
> <https://stackoverflow.com/cv/taylor> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | AngelList
> <https://angel.co/taylor> | Stack Overflow
> <https://stackoverflow.com/users/149428/taylor-edmiston>

Re: Basic modeling question

Posted by Taylor Edmiston <te...@gmail.com>.
Gabriel -

Ah, I missed your earlier comment about weekly/monthly rollups also being
on a daily cadence.  So is your concern more about avoiding redundant work,
where the weekly rollup reprocesses days in its range that were already
processed in previous DAG run(s)?  Or is it mainly about the dependency, i.e.
not executing the first weekly rollup at all until the first 7 daily rollups'
worth of data have built up?

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | CV
<https://stackoverflow.com/cv/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston>


On Wed, Aug 8, 2018 at 2:14 PM, James Meickle <jmeickle@quantopian.com.
invalid> wrote:

> If you want to run (daily, rolling weekly, rolling monthly) backups on a
> daily basis, and they're mostly the same but have some additional
> dependencies, you can write a DAG factory method, which you call three
> times. Certain nodes only get added to the longer-than-daily backups.

Re: Basic modeling question

Posted by James Meickle <jm...@quantopian.com.INVALID>.
If you want to run (daily, rolling weekly, rolling monthly) backups on a
daily basis, and they're mostly the same but have some additional
dependencies, you can write a DAG factory method, which you call three
times. Certain nodes only get added to the longer-than-daily backups.
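
A rough sketch of the factory idea, written as plain Python data rather than real Airflow objects so that it runs anywhere. The names `build_rollup` and `wait_offset_N` are invented for illustration; in an actual DAG file each key would become an operator and each list its upstream tasks.

```python
def build_rollup(name, window_days):
    """Describe one rollup: the partition-wait tasks it needs and the job that runs after them."""
    waits = ["wait_offset_{}".format(i) for i in range(window_days)]
    job = "{}_rollup".format(name)
    # Map each task id to the ids of its upstream tasks.
    return {job: waits, **{w: [] for w in waits}}


# Call the factory three times, once per rollup.
rollups = {
    name: build_rollup(name, days)
    for name, days in [("daily", 1), ("weekly", 7), ("monthly", 28)]
}
print(len(rollups["weekly"]["weekly_rollup"]))
```

Only the window size differs between the three calls, which is the point of the factory: the extra wait nodes appear only in the longer-than-daily variants.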

On Wed, Aug 8, 2018 at 2:03 PM Gabriel Silk <gs...@dropbox.com.invalid>
wrote:

> Thanks Andy and Taylor for the suggestions --
>
> I see how that would work for the case where you want a weekly rollup that
> runs on a weekly cadence.
>
> But what about a rolling weekly or monthly rollup that runs each day?

Re: Basic modeling question

Posted by Gabriel Silk <gs...@dropbox.com.INVALID>.
Thanks Andy and Taylor for the suggestions --

I see how that would work for the case where you want a weekly rollup that
runs on a weekly cadence.

But what about a rolling weekly or monthly rollup that runs each day?

On Wed, Aug 8, 2018 at 11:00 AM, Andy Cooper <an...@astronomer.io>
wrote:

> To expand on Taylor's idea
>
> I recently wrote a ScheduleBlackoutSensor that would allow you to prevent a
> task from running if it meets the criteria provided. It accepts an array of
> args for any number of the criteria so you could leverage this sensor to
> provide "blackout" runs for a range of days of the week.
>
> https://github.com/apache/incubator-airflow/pull/3702/files
>
> For example,
>
> task = ScheduleBlackoutSensor(day_of_week=[0,1,2,3,4,5], dag=dag)
>
> Would prevent a task from running Monday - Saturday, allowing it to run on
> Sunday.
>
> You could leverage this Sensor as you would any other sensor or you could
> invert the logic so that you would only need to specify
>
> task = ScheduleBlackoutSensor(day_of_week=6, dag=dag)
>
> To "whitelist" a task to run on Sundays.
>
>
> Let me know if you have any questions

Re: Basic modeling question

Posted by Andy Cooper <an...@astronomer.io>.
To expand on Taylor's idea:

I recently wrote a ScheduleBlackoutSensor that prevents a task from running
when it meets the criteria provided. It accepts an array of values for any of
the criteria, so you could use this sensor to "black out" runs on a range of
days of the week.

https://github.com/apache/incubator-airflow/pull/3702/files

For example,

task = ScheduleBlackoutSensor(day_of_week=[0,1,2,3,4,5], dag=dag)

Would prevent a task from running Monday - Saturday, allowing it to run on
Sunday.

You could leverage this Sensor as you would any other sensor or you could
invert the logic so that you would only need to specify

task = ScheduleBlackoutSensor(day_of_week=6, dag=dag)

To "whitelist" a task to run on Sundays.


Let me know if you have any questions

On Wed, Aug 8, 2018 at 1:47 PM Taylor Edmiston <te...@gmail.com> wrote:

> Gabriel -
>
> One approach I've seen for a similar use case is to have multiple related
> rollups in one DAG that runs daily, then have the non-daily tasks skip most
> of the time (e.g., weekly only actually executes on Sundays and is
> parameterized to look at the last 7 days).
>
> You could implement that not running part a few ways, but one idea is a
> sensor in front of the weekly rollup task.  Imagine a SundaySensor like
> return
> execution_date.weekday() == 6.  One thing to keep in mind here is
> dependence on the DAG's cron schedule being more granular than the tasks.
>
> I think this could generalize into a DayOfWeekSensor / DayOfMonthSensor
> that would be nice to have.
>
> Of course this does mean some scheduler inefficiency on the skip days, but
> as long as those skips are fast and the overall number of tasks is small, I
> can accept that.
>
> *Taylor Edmiston*
> Blog <https://blog.tedmiston.com/> | CV
> <https://stackoverflow.com/cv/taylor> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | AngelList
> <https://angel.co/taylor> | Stack Overflow
> <https://stackoverflow.com/users/149428/taylor-edmiston>
>
>
> On Wed, Aug 8, 2018 at 1:11 PM, Gabriel Silk <gs...@dropbox.com.invalid>
> wrote:
>
> > Hello Airflow community,
> >
> > I have a basic question about how best to model a common data pipeline
> > pattern here at Dropbox.
> >
> > At Dropbox, all of our logs are ingested and written into Hive in hourly
> > and/or daily rollups. On top of this data we build many weekly and monthly
> > rollups, which typically run on a daily cadence and compute results over a
> > rolling window.
> >
> > If we have a metric X, it seems natural to put the daily, weekly, and
> > monthly rollups for metric X all in the same DAG.
> >
> > However, the different rollups have different dependency structures. The
> > daily job only depends on a single day partition, whereas the weekly job
> > depends on 7, the monthly on 28.
> >
> > In Airflow, it seems the two paradigms for modeling dependencies are:
> > 1) Depend on a *single run of a task* within the same DAG
> > 2) Depend on *multiple runs of task* by using an ExternalTaskSensor
> >
> > I'm not sure how I could possibly model this scenario using approach #1,
> > and I'm not sure approach #2 is the most elegant or performant way to model
> > this scenario.
> >
> > Any thoughts or suggestions?
> >
>

Re: Basic modeling question

Posted by Taylor Edmiston <te...@gmail.com>.
Gabriel -

One approach I've seen for a similar use case is to have multiple related
rollups in one DAG that runs daily, then have the non-daily tasks skip most
of the time (e.g., weekly only actually executes on Sundays and is
parameterized to look at the last 7 days).
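
As a rough illustration of "parameterized to look at the last 7 days", the
sketch below lists the daily partitions a rollup would read for a given run.
The function name window_partitions is hypothetical, and it assumes the
window ends on the execution date itself (inclusive); shift the range if
your partitions lag by a day.

```python
from datetime import date, timedelta


def window_partitions(execution_date, days):
    """Return the `days` daily partition dates ending at execution_date, oldest first.

    Assumption: the window includes the execution date itself.
    """
    return [execution_date - timedelta(days=offset)
            for offset in range(days - 1, -1, -1)]


weekly = window_partitions(date(2018, 8, 12), 7)    # 7 daily partitions
monthly = window_partitions(date(2018, 8, 12), 28)  # 28 daily partitions
```

The same function serves the weekly and the monthly rollup; only the window
length changes.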

You could implement the skipping a few ways, but one idea is a sensor in
front of the weekly rollup task.  Imagine a SundaySensor whose check is
simply: return execution_date.weekday() == 6.  One thing to keep in mind is
that this depends on the DAG's cron schedule being more granular than the
tasks' own cadence.

I think this could generalize into a DayOfWeekSensor / DayOfMonthSensor
that would be nice to have.
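
A sketch of what that generalization could look like is below. These are
stand-in classes, not Airflow code: the names DayOfWeekCheck and
DayOfMonthCheck are hypothetical, they do not subclass anything from Airflow,
and the poke(context) shape only mirrors how Airflow sensors read
execution_date from the task context.

```python
from datetime import datetime


class DayOfWeekCheck:
    """Stand-in for a DayOfWeekSensor: passes only on the given weekdays."""

    def __init__(self, weekdays):
        # Weekdays follow datetime.weekday(): Monday is 0, Sunday is 6.
        self.weekdays = set(weekdays)

    def poke(self, context):
        return context["execution_date"].weekday() in self.weekdays


class DayOfMonthCheck:
    """Stand-in for a DayOfMonthSensor: passes only on the given days of the month."""

    def __init__(self, days):
        self.days = set(days)

    def poke(self, context):
        return context["execution_date"].day in self.days


sunday_check = DayOfWeekCheck([6])        # the SundaySensor case
first_of_month = DayOfMonthCheck([1])     # e.g. gate a monthly rollup
```

In a real DAG each check would sit in front of the rollup it gates, as the
SundaySensor does for the weekly task.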

Of course this does mean some scheduler inefficiency on the skip days, but
as long as those skips are fast and the overall number of tasks is small, I
can accept that.

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | CV
<https://stackoverflow.com/cv/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston>


On Wed, Aug 8, 2018 at 1:11 PM, Gabriel Silk <gs...@dropbox.com.invalid>
wrote:

> Hello Airflow community,
>
> I have a basic question about how best to model a common data pipeline
> pattern here at Dropbox.
>
> At Dropbox, all of our logs are ingested and written into Hive in hourly
> and/or daily rollups. On top of this data we build many weekly and monthly
> rollups, which typically run on a daily cadence and compute results over a
> rolling window.
>
> If we have a metric X, it seems natural to put the daily, weekly, and
> monthly rollups for metric X all in the same DAG.
>
> However, the different rollups have different dependency structures. The
> daily job only depends on a single day partition, whereas the weekly job
> depends on 7, the monthly on 28.
>
> In Airflow, it seems the two paradigms for modeling dependencies are:
> 1) Depend on a *single run of a task* within the same DAG
> 2) Depend on *multiple runs of task* by using an ExternalTaskSensor
>
> I'm not sure how I could possibly model this scenario using approach #1,
> and I'm not sure approach #2 is the most elegant or performant way to model
> this scenario.
>
> Any thoughts or suggestions?
>