Posted to dev@airflow.apache.org by Chris Palmer <ch...@crpalmer.com> on 2019/05/13 17:54:18 UTC

Tasks that run just once

I'm trying to design a set of DAGs to do a one-time create and backfill of a set
of tables in BigQuery and then perform periodic loads into those tables. I
can't quite get it to work the way I want to and I'm wondering if other
people have solved similar problems.

The parameters are as follows:

I have a list of customers: cust1, cust2, etc
I have a list of sources: source1, source2, etc
For each pairing of customer and source I want a task that runs hourly to
load new data into a unique table for that pair (loads are done via a
single BigQuery query).

So far that's easy to solve: I have a single DAG and just loop through the
two lists creating a task for each pairing.
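As a rough sketch of that loop (CUSTOMERS, SOURCES, and the load_task_ids helper are hypothetical names I'm using for illustration; in the real DAG each id would back an operator running the BigQuery load query):

```python
from itertools import product

CUSTOMERS = ["cust1", "cust2"]
SOURCES = ["source1", "source2"]

def load_task_ids(customers, sources):
    # One hourly LoadTable task per (customer, source) pairing; each id
    # would be attached to the operator that runs the BigQuery load query.
    return [f"load_{c}_{s}" for c, s in product(customers, sources)]
```

Adding a customer or source to the lists then grows the task set on the next parse of the DAG file.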

However, I also want to create the tables themselves via Airflow and run a
one-time backfill for each table. Backfilling hourly is proving to be a
very inefficient process.

Finally, I also want to be able to add new customers and/or new sources to
the lists and have it work. I know I could achieve this with one or more
DAGs per customer/source pair, but I had hoped to avoid that explosion in
the number of DAGs.

The closest I can get is to have two DAGs. The first has a schedule of
'@once' and for each customer/source pair has a CreateTable task and a
downstream BackfillTable task. The second DAG runs '@hourly' and just has a
LoadTable task for each customer/source pair.
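A minimal sketch of how the '@once' DAG's dependencies could be generated (build_create_backfill_edges is a hypothetical helper; in the real DAG each pair of ids would be wired together with >>):

```python
def build_create_backfill_edges(customers, sources):
    # For the '@once' DAG: each (customer, source) pair gets a CreateTable
    # task with a downstream BackfillTable task. The '@hourly' DAG loops
    # the same way to build its LoadTable tasks.
    return [
        (f"create_table_{c}_{s}", f"backfill_table_{c}_{s}")
        for c in customers
        for s in sources
    ]
```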

This works fine for the initial lists, but once the first DAG has run once,
its DagRun is marked as 'success'. If I then add a new customer or source,
the new tasks get added to that first DAG, but the DagRun is never checked
again. If I manually switch the DagRun back to 'running' then it picks up
the new tasks.

Is there some other setup that I'm missing that gets me what I want?

Thanks
Chris

Re: Tasks that run just once

Posted by Daniel Standish <dp...@gmail.com>.
>
> However, I also want to create the tables themselves via Airflow and run a
> one time backfill for each table. Backfilling hourly is proving to be a
> very inefficient process.
>

So, one thing, I have implemented alternative "initial load" behavior
inside the execute method of an operator by checking if there is a previous
task instance. E.g.:

# Inside the operator's execute(self, context) method: if there is no
# previous task instance, treat this run as the initial load.
try:
    # getattr raises AttributeError if 'ti' is missing from the context
    # or the TaskInstance has no previous_ti attribute.
    prev_ti = getattr(context.get('ti'), 'previous_ti')
except AttributeError:
    prev_ti = None
if prev_ti is None:
    self.initial_load = True

I don't know what happens when there are prior dag runs and it's just a
task that has never been executed (as opposed to a brand new dag run). You
can see the empty boxes, but I think that's just for looks and there aren't
really any TIs behind them, so it probably still works.
But if not, at a minimum you could use XComs to log and detect with
certainty whether something has been run.
Perhaps you can define in your operator an alternate execute method that
will "backfill" your customer / source combo in a reasonable way on the
first run.  I.e. create the tables, and load with the full shebang or batch
it appropriately.
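One way that alternate execute path could look (LoadTableOperator, run_backfill, and run_incremental are hypothetical stand-ins here; a real version would subclass BaseOperator and run BigQuery queries):

```python
class LoadTableOperator:
    # Hypothetical sketch; a real version would subclass BaseOperator.
    def __init__(self, customer, source):
        self.customer = customer
        self.source = source

    def execute(self, context):
        # No previous task instance -> treat this run as the initial load.
        prev_ti = getattr(context.get('ti'), 'previous_ti', None)
        if prev_ti is None:
            return self.run_backfill()
        return self.run_incremental()

    def run_backfill(self):
        # Create the table and load the full history, batched as needed.
        return f"backfill {self.customer}/{self.source}"

    def run_incremental(self):
        # Normal hourly load of just the new data.
        return f"hourly load {self.customer}/{self.source}"
```

With that shape, a brand-new customer/source pair backfills itself on its first run, and every later run does the normal hourly load.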

