Posted to users@airflow.apache.org by Vishu Murundi <vi...@gmail.com> on 2022/03/14 13:37:49 UTC

Can Airflow track each DAG run based on arguments passed into it

Hi folks,
I'm new to Airflow and investigating a scenario, and I'd like to know
whether Airflow has functionality to handle it.

We have 2 DAGs: *dag_1* and *dag_2*.
*dag_1*: Queries a SQL table and, for any record matching our query, picks
a value from the record (e.g. the value in column Id) and triggers *dag_2*,
passing that value as an argument. At that point *dag_1*'s job is done; it
keeps running and looking for new entries matching the query.

*dag_2*: Does some work with the provided argument (i.e. Id), which might
take some time to complete.

Now if *dag_1* finds a new record in SQL with the same value (Id) as a
previous one (which is a valid scenario), it will trigger a new *dag_2* run
even while the previous run is still in progress, and this will mess up a
few things.

Is there a way to configure Airflow so that, if a DAG run is already in
progress with the same parameter (i.e. Id in this case), we either skip the
new run or cancel the previous one so that the new one proceeds?
If the parameter (i.e. Id) is different, *dag_1* can keep triggering
*dag_2* as usual; the restriction applies only when the Id passed is the
same as that of a run still in progress.

**Id here means the column value in the SQL record, passed on as a
parameter/argument.

Thank You
Vishu

Re: Can Airflow track each DAG run based on arguments passed into it

Posted by Vishu Murundi <vi...@gmail.com>.
Thank you, Tal and Daniel.
Will try the suggested approaches.


Re: Can Airflow track each DAG run based on arguments passed into it

Posted by Daniel Standish <dp...@gmail.com>.
I think Tal made good suggestions. Just adding a few more ideas:
* In dag 1 you could add a task using ExternalTaskSensor so that it waits
for dag 2 to complete.
* In dag 1 you could first check whether a dag 2 run is still in progress
(using either the Airflow API or by querying the metastore, as
ExternalTaskSensor does), and if so immediately raise AirflowSkipException.
Then dag 1 will check again the next time it runs.
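
The "check first, then skip" idea could be sketched roughly as below. This is only an illustration under several assumptions: it talks to the Airflow 2 stable REST API (GET /api/v1/dags/{dag_id}/dagRuns) with basic auth enabled, it assumes dag 2 is triggered with a conf such as {"id": <value>} (the key name "id" is hypothetical), and the helper names fetch_dag_runs and has_active_run are made up for this example. Only the Python standard library is used.

```python
import base64
import json
import urllib.request

# Run states that count as "still in progress" for this check.
ACTIVE_STATES = {"queued", "running"}


def has_active_run(dag_runs, record_id, key="id"):
    """Return True if a queued/running run was triggered with the same Id.

    `dag_runs` is a list of dicts shaped like the items in the REST API's
    "dag_runs" array; `key` is the (hypothetical) conf key holding the Id.
    """
    for run in dag_runs:
        conf = run.get("conf") or {}  # conf may be null/empty for some runs
        if run.get("state") in ACTIVE_STATES and conf.get(key) == record_id:
            return True
    return False


def fetch_dag_runs(base_url, dag_id, user, password, page_size=100):
    """Fetch all runs of one DAG, following limit/offset pagination.

    Assumes the basic-auth API backend is enabled on the webserver.
    """
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    headers = {"Authorization": f"Basic {token}"}
    runs, offset = [], 0
    while True:
        url = (
            f"{base_url}/api/v1/dags/{dag_id}/dagRuns"
            f"?limit={page_size}&offset={offset}"
        )
        request = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(request) as response:
            page = json.load(response)
        runs.extend(page["dag_runs"])
        offset += page_size
        # Stop when the page is empty or every entry has been read.
        if not page["dag_runs"] or offset >= page["total_entries"]:
            return runs
```

Inside a dag 1 task, one might call has_active_run(fetch_dag_runs(...), record_id) and raise AirflowSkipException (from airflow.exceptions) when it returns True, so the record is picked up again on a later run. Note that the check and the trigger are two separate steps, so two dag 1 tasks running at the same moment could still both pass the check.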

RE: Can Airflow track each DAG run based on arguments passed into it

Posted by Tal Nagar <Ta...@evogene.com>.
Hi Vishu,
I can suggest two approaches that worked for us in similar situations:

  1.  DB level - Add another column to the SQL table that indicates the status of the task (queued, running, etc.).

Have dag2 pick up only tasks that are in the queued state, and update the record to the running status when it does.

  2.  Airflow API - Use the Airflow API to trigger DAGs and get DAG runs.

You can give the run_id an indicative name and then query the DAG runs (GET /dags/{dag_id}/dagRuns) to check whether a run with the same ID has already run.

This endpoint also returns each DAG run's conf, so you'll be able to compare it against your parameters.
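
As a rough illustration of the DB-level approach, the sketch below claims a queued record with a single UPDATE that also refuses the claim while another record with the same Id is running. Everything named here is hypothetical: the table work_items, its columns pk, item_id and status, and the helpers claim_record and make_demo_db. SQLite is used only so the example is self-contained; it is not the database from this thread.

```python
import sqlite3


def claim_record(conn, pk):
    """Try to move one queued record to 'running'.

    Returns False if the record is no longer queued, or if another record
    with the same item_id is already running.
    """
    with conn:  # commits on success, rolls back on error
        cursor = conn.execute(
            """
            UPDATE work_items
               SET status = 'running'
             WHERE pk = ?
               AND status = 'queued'
               AND NOT EXISTS (
                     SELECT 1
                       FROM work_items AS other
                      WHERE other.item_id = work_items.item_id
                        AND other.status = 'running'
                   )
            """,
            (pk,),
        )
    # Exactly one modified row means the claim succeeded.
    return cursor.rowcount == 1


def make_demo_db():
    """Build an in-memory table with two records for Id 'A' and one for 'B'."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE work_items ("
        " pk INTEGER PRIMARY KEY,"
        " item_id TEXT NOT NULL,"
        " status TEXT NOT NULL DEFAULT 'queued')"
    )
    conn.executemany(
        "INSERT INTO work_items (item_id) VALUES (?)",
        [("A",), ("A",), ("B",)],
    )
    conn.commit()
    return conn
```

With the demo table, claiming record 1 succeeds, claiming record 2 (same Id 'A') is refused until record 1 leaves the running state, and record 3 (Id 'B') can be claimed independently. On a server database with concurrent writers, the NOT EXISTS check alone may not be enough, and something like a partial unique index on running rows or explicit locking would likely be needed.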

Best

Tal Nagar

System Architect
