Posted to dev@airflow.apache.org by Javier Domingo Cansino <ja...@gmail.com> on 2018/05/18 07:51:56 UTC

Using Airflow with dataset dependant flows (not date)

Hello Guys,

First of all, I have submitted the idea to JIRA[1]. After I spoke with the people on
Gitter, they told me to bring the discussion here too.

Right now Airflow is purely a date-based scheduler. It is extremely complete in that
sense, and makes it really easy to populate and backfill your DAGs. Monitoring is quite
decent, and can be improved through plugins. Everything is code, as opposed to most of
the alternatives out there[2][3][4], and you may or may not depend on files existing to
go to the next step. There is a UI that lets you visualize the status of your systems
and trigger jobs manually.

There is a limitation, however, in running on dates only: sometimes a DAG does not
depend on the date but on the dataset. Some examples I am close to:

  * Bioinformatics pipeline, where you process samples

  * Media pipeline, where you may process different videos/audios in the same way

Right now I am using Snakemake for the first and bash scripts for the second; however,
I have been thinking that Airflow could be a solution to both problems.

I have been reading the code, and although execution_date is quite tightly coupled to
it, it seems doable to abstract the datatype of this parametrization variable
(datetime) and extend it to something more general (string).

After all, from what I have seen, execution_date is just the parametrization variable.

Questions I would like to ask:

  * Is this a need you have had? If so, how did you solve it? Is there any other tool
    with the features I described that could help me with that?

  * How do you recommend solving this with Airflow?

    * On Gitter, people have proposed forgetting about execution_dates, just triggering
      the DAGs and parametrizing the run through variables. However, this has the
      drawback of losing execution tracking, and makes it impossible to run several
      DAGs at the same time for different datasets.

    * There was also the proposal to instantiate subDAGs per dataset, and have one DAG
      where the first step is to read which samples to run on. The problem I see with
      this is that you lose track of which samples have been run, and you cannot have
      per-sample historical data.

    * Airflow works well when you have datasets that change, so another solution would
      be to instantiate one DAG per sample, and then have a single execution. However,
      this sounds a bit overkill to me, because you would have just one DAGRun per DAG.

  * If this is something that would be interesting to you, and you would like to see
    this use case solved within Airflow, please tell me, as I am interested in making a
    proposal that is both simple and works for everyone.


Right now the best idea I have is:

  * Rename execution_date to parametrization_value, changing its datatype to string. We
    ensure backward compatibility because existing execution_date values can be
    serialized.

  * Create a new entity called parametrization_group, where we could make groups of
    these parameters so the scheduler knows that it needs to trigger a DAGRun on every
    DAG that depends on such a group.

  * Extend the CLI a bit to let it modify these parametrization_groups.

  * Extend the scheduler to understand which parametrization_group each DAG depends on,
    and trigger all those DAGs when new parametrization_group elements are added.

  * Enable backfill to run without --start-date and --end-date when the DAGs depend on
    a parametrization_group, with an optional --parametrization-values that accepts a
    list to work on.
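
To make the first bullet a bit more concrete, here is a minimal sketch of how an
existing execution_date could be serialized into a string-typed
parametrization_value. Nothing here exists in Airflow: the function names and the
(dag_id, parametrization_value) key are hypothetical and only illustrate the
backward-compatibility argument.

```python
from datetime import datetime


def to_parametrization_value(value):
    """Return the string form of a run parameter (hypothetical helper).

    A datetime (today's execution_date) becomes an ISO 8601 string; anything
    else, such as a sample name or a file path, is passed through str().
    """
    if isinstance(value, datetime):
        return value.isoformat()
    return str(value)


def run_key(dag_id, value):
    # Hypothetical replacement for the (dag_id, execution_date) key.
    return (dag_id, to_parametrization_value(value))


# A date-based run and a dataset-based run end up with the same kind of key.
print(run_key("encode_video", datetime(2018, 5, 18)))
print(run_key("encode_video", "videos/intro.mp4"))
```

Under this assumption, date-based DAGs would keep working unchanged, while
dataset-based DAGs would simply use a different string.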

How does all this sound to you? Any ideas?


Cheers, Javier


[1] JIRA ticket for dataset related execution: https://issues.apache.org/jira/browse/AIRFLOW-2480
[2] Awesome 1: https://github.com/meirwah/awesome-workflow-engines
[3] Awesome 2: https://github.com/pawl/awesome-etl
[4] Awesome 3: https://github.com/pditommaso/awesome-

Re: Using Airflow with dataset dependant flows (not date)

Posted by Maxime Beauchemin <ma...@gmail.com>.
Hi,

Assuming the shape of your DAG is the same across runs, the prescribed way
is to use a DAG with schedule_interval=None and to create your DAG Runs on
demand. You can do so programmatically (using the ORM:
airflow.models.DagRun), from the CLI (airflow trigger_dag), or through REST.

If your DAG shape is different based on parameters, then conceptually it is
not a single DAG, it's many, and you'd have to programmatically make
different DAG objects with a schedule_interval='@once'. Note that all these
external trigger options take a `run_id` argument, which as you might guess
is some unique identifier for your run, stored in the DAG Run table. If
you're processing video files, that could be the path to the file you are
processing or whatever unique id makes sense to you.
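
As a rough illustration of the `run_id` idea, the sketch below builds the argument
list for an `airflow trigger_dag` call, using the file path as the `run_id` and
passing it again in the run's conf. The DAG id, the path and the "dataset" key are
made up for the example, and the `-r`/`-c` flags are how I remember the 1.x CLI, so
check them against `airflow trigger_dag --help` for your version.

```python
import json


def build_trigger_command(dag_id, dataset_path):
    """Build an `airflow trigger_dag` argument list for one dataset.

    The dataset path doubles as the run_id, so each file gets its own
    DAG Run; "dataset" is an arbitrary key chosen for this example.
    """
    conf = json.dumps({"dataset": dataset_path})
    return [
        "airflow", "trigger_dag",
        "-r", dataset_path,  # run_id: unique identifier for this run
        "-c", conf,          # conf: JSON string made available to the run
        dag_id,
    ]


command = build_trigger_command("encode_video", "videos/intro.mp4")
print(command)
```

The list can be handed to `subprocess.run(command, check=True)` on a machine where
the Airflow CLI is installed; it is only printed here so the sketch runs anywhere.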

(From memory) internally that `run_id` isn't really used as the key (though
there's a unique constraint on it in the DB); the arbitrary (dag_id,
execution_date) tuple is, because the system was built in a very
schedule-centric way at the beginning. BUT the CLI commands that take the
`execution_date` argument could be retrofitted to also accept a `run_id`
instead, presumably easily.

Max


Re: Using Airflow with dataset dependant flows (not date)

Posted by "Daniel (Daniel Lamblin) [BDP - Seoul]" <la...@coupang.com>.
Hi Javier;
I'm afraid I'm not familiar enough with the overall architecture of Airflow to propose the right set of changes, or to decompose the work into PRs that are independently staged. But as dataset-based processing is one of the items keeping some teams in my company on an internal scheduler tool rather than moving to Airflow, I'm generally in support of the idea that Airflow might support that (it also supports "schedules" defined on the completion of a prior DAG).

I hope that someone with the current roadmap in mind can chime in on whether datasets have been discussed and whether they are off the table for a reason, or have a known set of prerequisite changes. And if not, then maybe someone could posit the right kind of design for adding datasets.
Is the roadmap at https://cwiki.apache.org/confluence/display/AIRFLOW/Roadmap or https://cwiki.apache.org/confluence/display/AIRFLOW/2017+Roadmap+Items, or under the roadmap label that I can't seem to filter by?

On Mon, May 28, 2018 at 10:28 AM Daniel (Daniel Lamblin) [BDP - Seoul] <la...@coupang.com> wrote:
This seemed like a very clear explanation of the JIRA ticket and the idea of making DAG runs depend not on a schedule but on the arrival of a dataset.
I think a lot would have to change if the execution date were changed to a parameterized value, and that's not the only thing that would have to change to support a dataset trigger.

Thinking about the video encoding example, it seems the Airflow way to do that would be to have the dataset DAGs depend on a frequently scheduled DAG that runs just a TriggerDagRunOperator. Its python callable polls for new datasets (or subscribes to a queue of updates about them), then decides which DAG ID to trigger for the particular dataset and what dag_run_obj.payload should be to tell it which dataset to run on.
You might want to write a plugin that gives a different kind of tree view for the DAGs that get triggered this way, so that you can easily see the dataset and payload specifics in the overview of the runs.

There's an example of triggering a dag with an assigned payload:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py
And an example of the triggered dag using the payload:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_trigger_target_dag.py

The latter part works the same way as when a CLI-triggered DAG accepts a conf object.
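
On the receiving side, a task callable in the target DAG can read the dataset from the run's conf. The sketch below is only an approximation of what the linked example_trigger_target_dag.py does: the `dag_run.conf` access follows that example as I remember it, while the "dataset" key, the function name and the fake context are assumptions made so the snippet runs without Airflow installed.

```python
from types import SimpleNamespace


def process_dataset(**context):
    """Task callable: read the dataset name from the triggering run's conf."""
    conf = context["dag_run"].conf or {}
    dataset = conf.get("dataset")
    if dataset is None:
        raise ValueError("run was triggered without a 'dataset' entry in conf")
    return "processing {}".format(dataset)


# Stand-in for the context Airflow would pass to the callable at run time.
fake_context = {"dag_run": SimpleNamespace(conf={"dataset": "videos/intro.mp4"})}
print(process_dataset(**fake_context))
```

In a real DAG this function would be the python_callable of a PythonOperator with the context provided; failing loudly on a missing key keeps a scheduled or manual run without conf from silently doing nothing.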

The experimental API also contains a way of triggering with a conf object:
https://github.com/apache/incubator-airflow/blob/master/airflow/www/api/experimental/endpoints.py#L42
So if you wanted to skip the high-frequency trigger controller DAG and use some kind of queue instead, such as an SQS queue to which you could subscribe an HTTPS trigger or something similar, then the queue system could trigger a target DAG through the API.
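
A queue consumer calling the API could look roughly like the sketch below, which only builds the HTTP request and does not send it. The endpoint path and the `run_id`/`conf` body fields are how I remember the 1.x experimental API, so please verify them against the linked endpoints.py; the base URL, DAG id and "dataset" key are made up, and whether `conf` must be a JSON-encoded string or can be a plain object may depend on the Airflow version.

```python
import json
import urllib.request


def build_trigger_request(base_url, dag_id, dataset_path):
    """Build (but do not send) a POST that asks Airflow to create a DAG Run."""
    url = "{}/api/experimental/dags/{}/dag_runs".format(base_url.rstrip("/"), dag_id)
    body = {
        "run_id": dataset_path,
        # Assumption: conf is sent as a JSON-encoded string.
        "conf": json.dumps({"dataset": dataset_path}),
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_trigger_request("http://localhost:8080", "encode_video", "videos/intro.mp4")
print(request.get_method(), request.full_url)
```

Sending it is then a matter of `urllib.request.urlopen(request)` from whatever process consumes the queue messages.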

Does this help you use Airflow more concretely for your needs, or are you looking to fill in a feature for some part of the roadmap that doesn't yet exist?
-Daniel


Re: Using Airflow with dataset dependant flows (not date)

Posted by Javier Domingo Cansino <ja...@gmail.com>.
Hello Daniel,

Thanks for your answer. I have been able to try your suggested solution,
and as expected it works fine. However, I have found that because the
parametrization always comes with an execution_date, it can be misleading
to users to have all runs still depending on that parameter. I could
write a CLI on top of Airflow that would hide the fact that we are
working around the use case through that wildcard parameter; however, the
maintenance of such a tool, plus the fact that it would be "our" way of
doing things, could actually cause more harm than using another system in
our case.

After diving into the code for a few days, and reading your suggestions
today, I agree that new web views would be required for this kind of run.
However, I still think the parametrization would need to be more
controlled (as in, part of the system) for us to feel comfortable with the
stability of the solution. I also agree with you that we would need new
kinds of schedulers too, such as ones based on Kafka messages or on
tracking database changes.

You mention that a lot would have to be changed for that. What steps do you
think we could take to decompose the problem into smaller, more manageable
pieces?

Cheers, Javier


Re: Using Airflow with dataset dependant flows (not date)

Posted by "Daniel (Daniel Lamblin) [BDP - Seoul]" <la...@coupang.com>.
This seemed like a very clear explanation of the JIRA ticket and of the idea of making DAG runs depend not on a schedule but on the arrival of a dataset.
I think a lot would have to change if the execution date were changed to a parameterized value, and that's not the only thing that would have to change to support a dataset trigger.

Thinking about the video encoding example, it seems the Airflow way to do that would be to make the dataset DAGs depend on a controller DAG that is scheduled frequently and runs just a TriggerDagRunOperator. That operator takes a Python callable which polls for new datasets (or subscribes to a queue of updates about them), then decides which DAG ID to trigger for the particular dataset, and what dag_run_obj.payload should be to inform it of the right dataset to run on.
You might want to write a plugin that gives a different kind of tree view for DAGs triggered this way, so that you can easily see the dataset and payload specifics in the overview of the runs.

There's an example of triggering a dag with an assigned payload:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py
And an example of the triggered dag using the payload:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_trigger_target_dag.py 

The latter works the same way as when a DAG triggered from the CLI accepts a conf object.
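
As a rough sketch of the callable such a controller DAG could pass to its trigger operator, assuming the (context, dag_run_obj) signature used in the linked controller example: the helper names, the dataset names, the in-memory "seen" set and the target DAG ID "process_dataset" are all hypothetical, and DagRunOrder here is only a stand-in so the snippet runs without Airflow installed. A real deployment would need to persist which datasets were already handled.

```python
# Sketch only: names below are illustrative, not part of Airflow.

class DagRunOrder(object):
    """Stand-in for the object Airflow hands to the callable."""

    def __init__(self, run_id=None, payload=None):
        self.run_id = run_id
        self.payload = payload


def list_new_datasets(seen, available):
    """Return the datasets in `available` not yet in `seen`, keeping order."""
    return [name for name in available if name not in seen]


def make_trigger_callable(seen, available):
    """Build a callable that triggers one run per not-yet-seen dataset."""

    def conditionally_trigger(context, dag_run_obj):
        pending = list_new_datasets(seen, available)
        if not pending:
            # Returning None tells the operator not to trigger anything.
            return None
        dataset = pending[0]
        seen.add(dataset)
        # The triggered DAG can read this payload from its dag_run conf.
        dag_run_obj.payload = {"dataset": dataset}
        return dag_run_obj

    return conditionally_trigger


# Hypothetical wiring inside a frequently scheduled controller DAG:
# trigger = TriggerDagRunOperator(
#     task_id="trigger_dataset_run",
#     trigger_dag_id="process_dataset",
#     python_callable=make_trigger_callable(seen, available),
#     dag=dag,
# )
```

Each execution of the operator triggers at most one run in this sketch, so a backlog of datasets would be drained one controller run at a time.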

The experimental API also contains a way of triggering with a conf object:
https://github.com/apache/incubator-airflow/blob/master/airflow/www/api/experimental/endpoints.py#L42
So if you wanted to skip the high-frequency trigger controller DAG, you could use some kind of queue instead, such as an SQS queue to which you subscribe an HTTPS trigger or something similar; the queue system could then trigger a target DAG through the API.
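
A minimal sketch of what such an API call might look like from a queue consumer, using only the Python standard library. It assumes the experimental endpoint is POST /api/experimental/dags/<dag_id>/dag_runs with a JSON body carrying "conf", as in the linked endpoints file; the base URL, the DAG ID and the dataset name are hypothetical. Whether "conf" must be a JSON-encoded string or may be a plain object depends on the Airflow version, so the flag below is an assumption to check against your installation.

```python
import json
from urllib.request import Request


def build_trigger_request(base_url, dag_id, dataset, conf_as_string=True):
    """Build (but do not send) a request that triggers `dag_id` for a dataset."""
    conf = {"dataset": dataset}
    # Some versions expect conf as a JSON string, others accept an object.
    body = {"conf": json.dumps(conf) if conf_as_string else conf}
    url = "{}/api/experimental/dags/{}/dag_runs".format(
        base_url.rstrip("/"), dag_id
    )
    return Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical usage; sending it would be urllib.request.urlopen(req).
req = build_trigger_request(
    "http://localhost:8080", "process_dataset", "sample_42"
)
```

The target DAG would then read the dataset name from its dag_run conf, the same way the linked target example reads its payload.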

Does this help you use Airflow more concretely for your needs, or are you looking to fill in a feature for some part of the roadmap that doesn't yet exist?
-Daniel
