Posted to users@airflow.apache.org by Alexander Mikhaylov <am...@picarro.com> on 2022/04/14 16:14:53 UTC

how to create and run DAG in Airflow 2?

Hi, I've tried to create a simple DAG with Airflow 2 and launch it, with a program like this -

-----
import os

# with open('../airflow/dags/uniqueId14.py', 'w') as fp:
#     fcont = """

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='uniqueId15',
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 4, 11),
    catchup=False,
    tags=['example'],
) as dag:

    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )

    t1 >> t2

    globals()["uniqueId15"] = dag
# """
#     fp.write(fcont)

os.system('airflow dags trigger uniqueId15')
-----

I've also tried saving the content of the DAG file into the Airflow dags folder. In both cases the last command, which launches the DAG, returns something like

airflow.exceptions.DagNotFound: Dag id uniqueId15 not found in DagModel

Can it be done? If yes, how?

Thanks,

Alex

Re: how to create and run DAG in Airflow 2?

Posted by Jarek Potiuk <ja...@potiuk.com>.
Note - for the DebugExecutor you do not need the webserver or the UI (in
fact it does not even use the DB to record the runs, so you wouldn't see
anything there).
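For reference, switching to the DebugExecutor is just configuration - a minimal sketch, assuming the DAG file ends with the `dag.clear(); dag.run()` block shown in the linked docs (the path is a placeholder):

```shell
# Environment configuration only: swap the executor, then run the DAG file directly.
export AIRFLOW__CORE__EXECUTOR=DebugExecutor
python /path/to/dagfile.py   # tasks run sequentially inside this single process
```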


Re: how to create and run DAG in Airflow 2?

Posted by Jarek Potiuk <ja...@potiuk.com>.
Hey Alexander,

Airflow does not work like that for "regular" use - but you can use
DebugExecutor which does what you want
https://airflow.apache.org/docs/apache-airflow/stable/executor/debug.html?highlight=debugexecutor

But note it's just for debugging. Airflow is not supposed to be used
to trigger dags immediately - it's a scheduler and it's optimized for
that. We do have discussions about enabling such cases in the future,
but that is not where Airflow's use case is today. It's used for scheduling.

J


Re: how to create and run DAG in Airflow 2?

Posted by Daniel Standish <da...@astronomer.io>.
I see.  Ok well yeah, the scheduler won't register the dag immediately.
There are settings which control how frequently it checks for new files.

You might try shortening the time for this.  I don't know the setting off
the top of my head, but look at the config.

You could consider using the API to check whether the dag was picked up
before triggering:
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dag

You could also consider not writing out the dag file dynamically in this
way and instead use "dag factory" functions, or task groups -- these are
sometimes used for similar goals.  And in 2.3.0 (beta coming soon) there
will be support for task mapping which can also achieve similar things.
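The API check above can be wrapped in a small polling loop before triggering. A minimal sketch - the `wait_until` helper is hypothetical, not an Airflow API, and the actual check (e.g. an HTTP GET against the stable API's get_dag endpoint returning 200) is left out because it needs a running webserver and credentials:

```python
import time

def wait_until(check, timeout=60.0, interval=2.0):
    """Call `check()` (e.g. 'does GET /api/v1/dags/{dag_id} return 200?')
    until it returns True or `timeout` seconds pass. Returns True on
    success, False if the deadline was reached first."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if check():
            return True
        time.sleep(interval)
    return False
```

Once `wait_until(my_dag_is_registered)` returns True, it should be safe to call `airflow dags trigger` (or the dagRuns POST endpoint) for the freshly written file.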

Re: how to create and run DAG in Airflow 2?

Posted by Alexander Mikhaylov <am...@picarro.com>.
Yes, my script as shown has the os.system() call right after the DAG definition part - sorry, I meant that I had DAG generation code (the commented-out parts) and then triggered the DAG. Sorry for the confusion.

A better example would be

-----
import os

with open('../airflow/dags/uniqueId.py', 'w') as fp:
    fcont = """
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='uniqueId13',
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 4, 11),
    catchup=False,
    tags=['example'],
) as dag:

    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )

    t1 >> t2
"""
    fp.write(fcont)

os.system('airflow dags trigger uniqueId13')
-----

Thanks,

Alex

Re: how to create and run DAG in Airflow 2?

Posted by Alexander Mikhaylov <am...@picarro.com>.
No, Daniel, I'm trying to do a different thing.

I'm trying to have a program which would 1) create a DAG and then 2) launch this created DAG. Preferably immediately after creation.

To have it, I started by trying to create the DAG from the Python program, programmatically - that is, I create a DAG() object and then add it to globals(). There are examples (on the Astronomer website) where the author claims DAGs created this way show up in the list of DAGs in the Airflow UI. I suspect it worked with Airflow 1.x, but it doesn't work for me with Airflow 2.2.4; at least, I don't see them.

Next, I've tried to generate a Python file - a full DAG file, with imports, operators, >> chains, etc. - and store it in Airflow's dags folder. The storing works: the file is there, it is later picked up by Airflow, and I see the generated DAG in the UI and can launch it, so that part is ok. The problem is that I want my program - the one which generates that mydag.py script - to trigger the DAG. The DAG can be triggered from the command line (airflow dags trigger mydagid --conf '{}') or via the REST API (curl -X POST -d '{}' -H 'Content-Type...' -H 'Authorization...' http://localhost:8080/api/dags...) - but my original program, the one which generates the DAG file, can't launch it, producing

airflow.exceptions.DagNotFound: Dag id mydagid not found in DagModel

(the dag id is dag-specific). I've tried calling os.system('sleep 30') and then os.system('airflow dags trigger mydagid') - still getting the above error message.

Again, the os.system() calls are not in the DAG file, but in the Python program which generates the DAG file.
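One workaround for the race described above is to retry the trigger instead of using a fixed sleep, since the DagNotFound error just means the scheduler has not parsed the new file yet. A hedged sketch - `trigger_with_retry` is a hypothetical helper, not part of Airflow; the `runner` parameter is injectable so the loop itself can be exercised without Airflow installed:

```python
import subprocess
import time

def trigger_with_retry(dag_id, attempts=10, interval=5.0, runner=None):
    """Retry `airflow dags trigger <dag_id>` until it succeeds (exit code 0)
    or `attempts` tries are exhausted. A non-zero exit code typically means
    DagNotFound, i.e. the scheduler has not picked up the file yet."""
    if runner is None:
        runner = lambda cmd: subprocess.run(cmd).returncode
    for _ in range(attempts):
        if runner(["airflow", "dags", "trigger", dag_id]) == 0:
            return True   # the DAG was found and a run was queued
        time.sleep(interval)  # wait for the next DAG-file parse cycle
    return False
```

The generator script would call `trigger_with_retry('uniqueId13')` right after writing the file, instead of `os.system('airflow dags trigger uniqueId13')`.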

What can be changed here?

Thanks,

Alex

Re: how to create and run DAG in Airflow 2?

Posted by Daniel Standish <da...@astronomer.io>.
Ok, so your dag should not do any triggering.  The dag file should just
define the dag.

so remove any `os.*` stuff.

don't need to mess with globals.

make sure your dag file is in the dags folder.

in the terminal you can do `airflow info | grep dags_folder` to see what the
currently configured dags folder is.

you can test that your dag parses correctly with `python /path/dagfile.py`.
If there are no errors and it's in your dags folder, you should be good.

next, check `airflow dags list` - you should see it there.

next, you can trigger from the CLI with `airflow dags trigger` or something
like that, but the scheduler must already be running for this to work;
otherwise trigger from the UI.  Before starting the webserver, in the
terminal where you will start it, check that you can see the dag with
`airflow dags list`.

don't trigger the dag from the dag file itself.
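The `airflow dags list` step above can also be scripted. A hedged sketch - the helper below is hypothetical, not part of Airflow, and it assumes the CLI prints one DAG per row with the dag_id as a whole token (covering both whitespace- and '|'-separated table output):

```python
import re

def dag_in_list(dag_id, cli_output):
    """Return True if dag_id appears as a whole token on any line of the
    captured `airflow dags list` output. Whole-token matching avoids false
    positives like 'uniqueId1' matching inside 'uniqueId15'."""
    for line in cli_output.splitlines():
        if dag_id in re.split(r"[|\s]+", line):
            return True
    return False
```

In practice you would pass it the captured stdout of `subprocess.run(["airflow", "dags", "list"], capture_output=True, text=True)`.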

Re: how to create and run DAG in Airflow 2?

Posted by Alexander Mikhaylov <am...@picarro.com>.
I'm running the scheduler. Frankly, I'm a beginner with Airflow, so I have two terminal windows for airflow webserver and airflow scheduler, and I launch Python scripts in a third window. I do run the "python path/to/script.py" command.

I've tried both creating the DAG using globals() and storing a file in the dags folder. The first variant doesn't lead to a visible DAG in the UI, and no DAG is launched. The file creation is successful, and Airflow later accepts the DAG from that file; it can be launched manually with success, but I want to do that programmatically. It also seems that some time needs to pass before Airflow accepts the file from the dags folder, but e.g. a "sleep 30" command in the Python script doesn't help.

Thanks,

Alex


Re: how to create and run DAG in Airflow 2?

Posted by Daniel Standish <da...@astronomer.io>.
Are you running the scheduler, and you want to trigger the dag for the
scheduler to run?  Or are you trying to run the dag in a single process,
without a scheduler, e.g. with `python /path/to/dagfile.py`?