Posted to users@airflow.apache.org by Vito De Tullio <vi...@finconsgroup.com> on 2019/12/11 14:26:32 UTC

Spread multiple DAG runs from a single operator

Hi.

I'm new to airflow, and I'm trying to understand if this tool can help us in our work.

In my case I need to translate this kind of behavior into DAGs:

  *   monitor a local directory
  *   if 1, 2, 10, ... "new" files are detected
  *   for each of them it's necessary to
     *   open the file and extract an (unbounded) list of items
     *   for each of them it's necessary to
        *   call a pair of REST endpoints to retrieve additional metadata
        *   write the retrieved data and the original items into an SQL db
     *   if everything went well, call a POST endpoint to notify that the single file has been processed
     *   move the file into a "DONE" directory

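Before mapping it to operators, the per-file loop above can be sketched in plain Python; every name here (`extract_items`, `fetch_metadata`, `write_row`, `notify_done`) is an invented placeholder for the steps in the list, not an Airflow API:

```python
from pathlib import Path

def process_file(path, extract_items, fetch_metadata, write_row,
                 notify_done, done_dir):
    """Run the inner 'for each item' loop for one file, then finalize it."""
    items = extract_items(path)              # the (unbounded) list of items
    for item in items:
        meta = fetch_metadata(item)          # stands in for the two REST calls
        write_row(item, meta)                # stands in for the SQL insert
    notify_done(path)                        # the POST: file fully processed
    target = Path(done_dir) / Path(path).name
    Path(path).rename(target)                # move the file to "DONE"
    return len(items)
```

The question in this thread is essentially how to slice this function into Airflow tasks, given that the number of files and items varies per run.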
I'm struggling to understand how to map those two "for each" behaviors.

From what I understand, the "monitoring" should be done via some "sensor" that basically polls for a condition at a fixed rate, so it's possible to detect more than one "new file" per check.
I have a similar scenario inside each file (it's basically a list of "jobs"), but there I also need to perform actions once all of the jobs have finished correctly.
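The "more than one new file per check" point boils down to a set difference between two directory listings. A minimal sketch of that idea (not Airflow's own FileSensor, just the core of any directory poll):

```python
def detect_new_files(current_listing, already_seen):
    """Return files present now but not previously seen, plus the updated set.

    A single poll can legitimately return 0, 1, or many new files at once,
    which is why one sensor firing may need to fan out into several
    downstream chains.
    """
    current = set(current_listing)
    new_files = sorted(current - already_seen)
    return new_files, already_seen | current
```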

The main problem I see is that I cannot find an operator that can "spread" the work across the DAG (or a "magical dynamic DAG" where I can have step 1, step 2, and then steps 3.a, 3.b, ..., 3.x, with a different x per run).
I saw TriggerDagRunOperator, but it can basically trigger only one external DAG.
I also found some "TriggerMultiDagRunOperator" implementations (for example the one at https://github.com/mastak/airflow_multi_dagrun ), but I would like to know whether this is a good use of Airflow, whether there is a better approach, or whether it's better to look at other tools.
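The multi-trigger pattern linked above amounts to emitting one DagRun conf payload per detected file and letting a trigger operator create one run of a target DAG per payload. The payload-building step might look like this (the `target_dag_id` and conf keys are invented for illustration):

```python
def build_dagrun_confs(files, target_dag_id="process_single_file"):
    """Build one conf payload per detected file.

    A multi-trigger operator (like the linked TriggerMultiDagRunOperator)
    essentially creates one DagRun of the target DAG per dict like these.
    """
    return [
        {"dag_id": target_dag_id,
         "run_id": f"{target_dag_id}__{name}",   # run_id must be unique
         "conf": {"file_path": name}}
        for name in files
    ]
```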

Thanks


Vito De Tullio

Senior developer

FINCONS SPA
Via Torri Bianche 10 - Pal. Betulla

20871 Vimercate (MB)

Tel. +39 039657081

Fax  +39 0396570877



********** GDPR CONFIDENTIALITY NOTICE *************
The contents of this e-mail message and any attachments may contain strictly confidential information and are intended solely for the above indicated recipient(s), allowed to use, copy and disclose it on their own responsibility.
It is strictly forbidden to disclose, copy and/or forward or in any way reveal the contents sent by any individual or entity other than the intended recipient(s).
If you are not the intended recipient of this message, or if this message has been addressed to you in error, please immediately alert the sender and then delete this message and any attachments.
If you are not the intended recipient, you are hereby notified that any use, dissemination, copying, or storage of this message or its attachments is strictly prohibited, according to art. 616 Italian Criminal Code and Legislative Decree N. 101/2018.


R: Spread multiple DAG runs from a single operator

Posted by Vito De Tullio <vi...@finconsgroup.com>.
Hi

On https://www.astronomer.io/guides/dynamically-generating-dags/ I found another possible approach.
Basically, the idea is to create a new DAG for each item in each file and trigger it programmatically.

Do you think this is a good approach? Does it "pollute" the DAG list?
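The astronomer.io pattern referenced above generates one DAG object per config entry at parse time by assigning into the module's `globals()`. Stripped of Airflow specifics it is just this (a `DummyDag` class stands in for `airflow.DAG` so the sketch runs standalone):

```python
class DummyDag:
    """Stand-in for airflow.DAG so the pattern can be shown outside Airflow."""
    def __init__(self, dag_id):
        self.dag_id = dag_id

def register_dags(namespace, items):
    """Create one DAG per item and expose it in the given namespace.

    Airflow discovers DAGs by scanning a DAG file's module globals, so in a
    real DAG file you would call register_dags(globals(), items).
    """
    for item in items:
        dag_id = f"process_{item}"
        namespace[dag_id] = DummyDag(dag_id)

dags = {}
register_dags(dags, ["invoices", "orders"])
```

On the "pollute" question: each generated DAG appears in the DAG list and stays there until the generating file stops producing it, so a DAG per item (rather than per file type) can indeed clutter the UI.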


RE: Spread multiple DAG runs from a single operator

Posted by "Shaw, Damian P. " <da...@credit-suisse.com>.
Hi Vito,

I would suggest evaluating why you want to use Airflow vs. other tools in the space (Prefect, Luigi, etc.). For us, we wanted a solid scheduling engine, multi-time-zone support, and a UI for our users to manage tasks. You have to accept that Airflow is not very dynamic when it comes to building DAGs (they are created and given a schedule before they are run); this can be seen as either a disadvantage or an advantage.

We also have many tasks that are based on multiple files, and we use a number of strategies to handle them in Airflow. One is to have operators that can handle multiple files; in particular, we have many custom sensors that wait on a list of possible files and take heavy advantage of reschedule mode so that multiple tasks aren't running at the same time. Another, when it's a manageable number of files (e.g. fewer than 10), is to have a task chain for each individual file (these could be in different DAGs or the same DAG; it's up to you).
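The reschedule mode mentioned here is real Airflow behavior: a sensor with `mode="reschedule"` releases its worker slot between pokes instead of sleeping in it. The contract a sensor implements is just a poke function returning a boolean, sketched here outside Airflow (the file-list logic is an invented example):

```python
def poke_for_files(expected, existing):
    """Return True only when every expected file is present.

    In Airflow, a sensor with mode="reschedule" calls its poke() and, on
    False, frees the worker slot and is re-queued at the next poke_interval,
    instead of holding the slot while sleeping (mode="poke").
    """
    missing = [f for f in expected if f not in existing]
    return len(missing) == 0
```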

Damian



=============================================================================== 
Please access the attached hyperlink for an important electronic communications disclaimer: 
http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html 
=============================================================================== 

R: Spread multiple DAG runs from a single operator

Posted by Vito De Tullio <vi...@finconsgroup.com>.
Hi.

So, what do you suggest about my use case?

  *   write custom operators that can each work with multiple files / items as a whole?
  *   develop some "master/slave" external program to deal with the variable number of items?
  *   split the workflow into multiple DAGs?
  *   ...ditch Airflow? Do you know a more suitable tool?

I'm trying to understand how to use it at its best.

Thanks



Re: Spread multiple DAG runs from a single operator

Posted by Kamil Breguła <ka...@polidea.com>.
Airflow should have a known DAG structure before it starts to execute; otherwise, it may behave unexpectedly. Airflow supports dynamic DAGs poorly because it doesn't store the history of the DAG structure.


