Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:47:20 UTC
[airflow] 36/37: Annotate and simplify code samples in DAGs doc (#29027)
This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c2dd921d5c540bfe7b68fb72abb8393d12e11834
Author: Bas Harenslak <Ba...@users.noreply.github.com>
AuthorDate: Fri Jan 20 16:48:58 2023 +0100
Annotate and simplify code samples in DAGs doc (#29027)
(cherry picked from commit 80dbfbc7ad8f63db8565baefa282bc01146803fe)
---
docs/apache-airflow/core-concepts/dags.rst | 118 ++++++++++++++++++++---------
1 file changed, 82 insertions(+), 36 deletions(-)
diff --git a/docs/apache-airflow/core-concepts/dags.rst b/docs/apache-airflow/core-concepts/dags.rst
index 527965b9e6..b5cf27361b 100644
--- a/docs/apache-airflow/core-concepts/dags.rst
+++ b/docs/apache-airflow/core-concepts/dags.rst
@@ -35,29 +35,60 @@ Declaring a DAG
---------------
There are three ways to declare a DAG - either you can use a context manager,
-which will add the DAG to anything inside it implicitly::
+which will add the DAG to anything inside it implicitly:
+
+.. code-block:: python
+    :emphasize-lines: 6-10
+
+    import datetime
+
+    from airflow import DAG
+    from airflow.operators.empty import EmptyOperator
     with DAG(
-        "my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
-        schedule="@daily", catchup=False
+        dag_id="my_dag_name",
+        start_date=datetime.datetime(2021, 1, 1),
+        schedule="@daily",
     ):
-        op = EmptyOperator(task_id="task")
+        EmptyOperator(task_id="task")
+
+
+Or, you can use a standard constructor, passing the DAG into any operators you use:
+
+.. code-block:: python
+    :emphasize-lines: 6-11
+
+    import datetime
+
+    from airflow import DAG
+    from airflow.operators.empty import EmptyOperator
+
+    my_dag = DAG(
+        dag_id="my_dag_name",
+        start_date=datetime.datetime(2021, 1, 1),
+        schedule="@daily",
+    )
+    EmptyOperator(task_id="task", dag=my_dag)
+
+
+Or, you can use the ``@dag`` decorator to :ref:`turn a function into a DAG generator <concepts-dag-decorator>`:
+
+.. code-block:: python
+    :emphasize-lines: 7,8,12
-Or, you can use a standard constructor, passing the dag into any
-operators you use::
+    import datetime
-    my_dag = DAG("my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
-                 schedule="@daily", catchup=False)
-    op = EmptyOperator(task_id="task", dag=my_dag)
+    from airflow.decorators import dag
+    from airflow.operators.empty import EmptyOperator
-Or, you can use the ``@dag`` decorator to :ref:`turn a function into a DAG generator <concepts-dag-decorator>`::
-    @dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
-         schedule="@daily", catchup=False)
+    @dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
     def generate_dag():
-        op = EmptyOperator(task_id="task")
+        EmptyOperator(task_id="task")
+
+
+    generate_dag()
-    dag = generate_dag()
DAGs are nothing without :doc:`tasks` to run, and those will usually come in the form of either :doc:`operators`, :doc:`sensors` or :doc:`taskflow`.
@@ -214,19 +245,20 @@ Otherwise, you must pass it into each Operator with ``dag=``.
Default Arguments
-----------------
-Often, many Operators inside a DAG need the same set of default arguments (such as their ``retries``). Rather than having to specify this individually for every Operator, you can instead pass ``default_args`` to the DAG when you create it, and it will auto-apply them to any operator tied to it::
+Often, many Operators inside a DAG need the same set of default arguments (such as their ``retries``). Rather than having to specify this individually for every Operator, you can instead pass ``default_args`` to the DAG when you create it, and it will auto-apply them to any operator tied to it:
+.. code-block:: python
+    :emphasize-lines: 7
     import pendulum
     with DAG(
-        dag_id='my_dag',
-        start_date=pendulum.datetime(2016, 1, 1, tz="UTC"),
-        schedule='@daily',
-        catchup=False,
-        default_args={'retries': 2},
-    ) as dag:
-        op = BashOperator(task_id='dummy', bash_command='Hello World!')
+        dag_id="my_dag",
+        start_date=pendulum.datetime(2016, 1, 1),
+        schedule="@daily",
+        default_args={"retries": 2},
+    ):
+        op = BashOperator(task_id="dummy", bash_command="Hello World!")
         print(op.retries)  # 2
@@ -448,9 +480,12 @@ Dynamic DAGs
Since a DAG is defined by Python code, there is no need for it to be purely declarative; you are free to use loops, functions, and more to define your DAG.
-For example, here is a DAG that uses a ``for`` loop to define some Tasks::
+For example, here is a DAG that uses a ``for`` loop to define some tasks:
+
+.. code-block:: python
+    :emphasize-lines: 7
-    with DAG("loop_example") as dag:
+    with DAG("loop_example", ...):
         first = EmptyOperator(task_id="first")
         last = EmptyOperator(task_id="last")
@@ -487,39 +522,50 @@ Unlike :ref:`concepts:subdags`, TaskGroups are purely a UI grouping concept. Tas
.. image:: /img/task_group.gif
-Dependency relationships can be applied across all tasks in a TaskGroup with the ``>>`` and ``<<`` operators. For example, the following code puts ``task1`` and ``task2`` in TaskGroup ``group1`` and then puts both tasks upstream of ``task3``::
+Dependency relationships can be applied across all tasks in a TaskGroup with the ``>>`` and ``<<`` operators. For example, the following code puts ``task1`` and ``task2`` in TaskGroup ``group1`` and then puts both tasks upstream of ``task3``:
+
+.. code-block:: python
+    :emphasize-lines: 10
     from airflow.decorators import task_group
+
     @task_group()
     def group1():
         task1 = EmptyOperator(task_id="task1")
         task2 = EmptyOperator(task_id="task2")
+
     task3 = EmptyOperator(task_id="task3")
     group1() >> task3
-TaskGroup also supports ``default_args`` like DAG, it will overwrite the ``default_args`` in DAG level::
+TaskGroup also supports ``default_args`` like DAG, it will overwrite the ``default_args`` in DAG level:
-    import pendulum
+.. code-block:: python
+    :emphasize-lines: 15
+
+    import datetime
+    from airflow import DAG
     from airflow.decorators import task_group
+    from airflow.operators.bash import BashOperator
+    from airflow.operators.empty import EmptyOperator
     with DAG(
-        dag_id='dag1',
-        start_date=pendulum.datetime(2016, 1, 1, tz="UTC"),
+        dag_id="dag1",
+        start_date=datetime.datetime(2016, 1, 1),
         schedule="@daily",
-        catchup=False,
-        default_args={'retries': 1},
+        default_args={"retries": 1},
     ):
-        @task_group(default_args={'retries': 3}):
+
+        @task_group(default_args={"retries": 3})
         def group1():
             """This docstring will become the tooltip for the TaskGroup."""
-            task1 = EmptyOperator(task_id='task1')
-            task2 = BashOperator(task_id='task2', bash_command='echo Hello World!', retries=2)
-            print(task1.retries)  # 3
-            print(task2.retries)  # 2
+            task1 = EmptyOperator(task_id="task1")
+            task2 = BashOperator(task_id="task2", bash_command="echo Hello World!", retries=2)
+            print(task1.retries)  # 3
+            print(task2.retries)  # 2
If you want to see a more advanced use of TaskGroup, you can look at the ``example_task_group_decorator.py`` example DAG that comes with Airflow.
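
The precedence shown in the last hunk (an explicit ``retries=2`` on the task wins over the TaskGroup's ``default_args``, which in turn win over the DAG's) can be modelled without Airflow. The sketch below is a toy model only, not Airflow's implementation: ``resolve_task_args`` is a hypothetical helper, and the three dictionaries stand in for the DAG-level defaults, the group-level defaults, and the keyword arguments passed directly to an operator.

```python
from collections import ChainMap


def resolve_task_args(dag_defaults, group_defaults, explicit):
    """Merge three layers of arguments (hypothetical helper, not an Airflow API).

    ChainMap searches its maps left to right, so explicit arguments win
    over group defaults, which win over DAG defaults.
    """
    return dict(ChainMap(explicit, group_defaults, dag_defaults))


dag_defaults = {"retries": 1}    # stands in for DAG(default_args=...)
group_defaults = {"retries": 3}  # stands in for @task_group(default_args=...)

# task1 passes no retries itself, so the group-level value applies.
task1_args = resolve_task_args(dag_defaults, group_defaults, {})
# task2 passes retries=2 explicitly, which overrides both default layers.
task2_args = resolve_task_args(dag_defaults, group_defaults, {"retries": 2})

print(task1_args["retries"])  # 3
print(task2_args["retries"])  # 2
```

The printed values match the ``# 3`` and ``# 2`` comments in the documented example; a task outside any group would fall back to the DAG-level value of 1 under the same model.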