You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/20 12:16:39 UTC

[airflow] branch master updated: Use DAG context manager in examples (#13297)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9923d60  Use DAG context manager in examples (#13297)
9923d60 is described below

commit 9923d606d2887c52390a30639fc1ee0d4000149c
Author: Jennifer Melot <jt...@gmail.com>
AuthorDate: Wed Jan 20 07:16:12 2021 -0500

    Use DAG context manager in examples (#13297)
---
 airflow/example_dags/example_bash_operator.py      |  54 ++++-----
 airflow/example_dags/example_branch_operator.py    |  51 ++++-----
 .../example_branch_python_dop_operator_3.py        |  31 +++--
 airflow/example_dags/example_latest_only.py        |  10 +-
 .../example_latest_only_with_trigger.py            |  18 +--
 .../example_passing_params_via_test_command.py     |  66 +++++------
 airflow/example_dags/example_python_operator.py    | 127 ++++++++++-----------
 .../example_dags/example_short_circuit_operator.py |  30 +++--
 airflow/example_dags/example_skip_dag.py           |   6 +-
 airflow/example_dags/example_subdag_operator.py    |  45 ++++----
 .../example_dags/example_trigger_controller_dag.py |  15 ++-
 airflow/example_dags/example_trigger_target_dag.py |  29 +++--
 airflow/example_dags/example_xcom.py               |  45 ++++----
 airflow/example_dags/test_utils.py                 |  15 ++-
 airflow/example_dags/tutorial.py                   |  92 +++++++--------
 airflow/example_dags/tutorial_etl_dag.py           |  41 ++++---
 docs/apache-airflow/executor/kubernetes.rst        |   2 +
 docs/apache-airflow/howto/operator/bash.rst        |   2 +
 .../howto/operator/external_task_sensor.rst        |   2 +
 docs/apache-airflow/howto/operator/python.rst      |   3 +
 docs/apache-airflow/tutorial.rst                   |   3 +
 docs/apache-airflow/tutorial_taskflow_api.rst      |   6 +
 22 files changed, 344 insertions(+), 349 deletions(-)

diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py
index 1c22fff..0665971 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -29,7 +29,7 @@ args = {
     'owner': 'airflow',
 }
 
-dag = DAG(
+with DAG(
     dag_id='example_bash_operator',
     default_args=args,
     schedule_interval='0 0 * * *',
@@ -37,39 +37,35 @@ dag = DAG(
     dagrun_timeout=timedelta(minutes=60),
     tags=['example', 'example2'],
     params={"example_key": "example_value"},
-)
+) as dag:
 
-run_this_last = DummyOperator(
-    task_id='run_this_last',
-    dag=dag,
-)
+    run_this_last = DummyOperator(
+        task_id='run_this_last',
+    )
+
+    # [START howto_operator_bash]
+    run_this = BashOperator(
+        task_id='run_after_loop',
+        bash_command='echo 1',
+    )
+    # [END howto_operator_bash]
 
-# [START howto_operator_bash]
-run_this = BashOperator(
-    task_id='run_after_loop',
-    bash_command='echo 1',
-    dag=dag,
-)
-# [END howto_operator_bash]
+    run_this >> run_this_last
 
-run_this >> run_this_last
+    for i in range(3):
+        task = BashOperator(
+            task_id='runme_' + str(i),
+            bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
+        )
+        task >> run_this
 
-for i in range(3):
-    task = BashOperator(
-        task_id='runme_' + str(i),
-        bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
-        dag=dag,
+    # [START howto_operator_bash_template]
+    also_run_this = BashOperator(
+        task_id='also_run_this',
+        bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
     )
-    task >> run_this
-
-# [START howto_operator_bash_template]
-also_run_this = BashOperator(
-    task_id='also_run_this',
-    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
-    dag=dag,
-)
-# [END howto_operator_bash_template]
-also_run_this >> run_this_last
+    # [END howto_operator_bash_template]
+    also_run_this >> run_this_last
 
 if __name__ == "__main__":
     dag.cli()
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index 50eb328..7c5e166 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -29,43 +29,38 @@ args = {
     'owner': 'airflow',
 }
 
-dag = DAG(
+with DAG(
     dag_id='example_branch_operator',
     default_args=args,
     start_date=days_ago(2),
     schedule_interval="@daily",
     tags=['example', 'example2'],
-)
+) as dag:
 
-run_this_first = DummyOperator(
-    task_id='run_this_first',
-    dag=dag,
-)
-
-options = ['branch_a', 'branch_b', 'branch_c', 'branch_d']
-
-branching = BranchPythonOperator(
-    task_id='branching',
-    python_callable=lambda: random.choice(options),
-    dag=dag,
-)
-run_this_first >> branching
+    run_this_first = DummyOperator(
+        task_id='run_this_first',
+    )
 
-join = DummyOperator(
-    task_id='join',
-    trigger_rule='none_failed_or_skipped',
-    dag=dag,
-)
+    options = ['branch_a', 'branch_b', 'branch_c', 'branch_d']
 
-for option in options:
-    t = DummyOperator(
-        task_id=option,
-        dag=dag,
+    branching = BranchPythonOperator(
+        task_id='branching',
+        python_callable=lambda: random.choice(options),
     )
+    run_this_first >> branching
 
-    dummy_follow = DummyOperator(
-        task_id='follow_' + option,
-        dag=dag,
+    join = DummyOperator(
+        task_id='join',
+        trigger_rule='none_failed_or_skipped',
     )
 
-    branching >> t >> dummy_follow >> join
+    for option in options:
+        t = DummyOperator(
+            task_id=option,
+        )
+
+        dummy_follow = DummyOperator(
+            task_id='follow_' + option,
+        )
+
+        branching >> t >> dummy_follow >> join
diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py
index f01fc50..badad5a 100644
--- a/airflow/example_dags/example_branch_python_dop_operator_3.py
+++ b/airflow/example_dags/example_branch_python_dop_operator_3.py
@@ -31,14 +31,6 @@ args = {
     'depends_on_past': True,
 }
 
-dag = DAG(
-    dag_id='example_branch_dop_operator_v3',
-    schedule_interval='*/1 * * * *',
-    start_date=days_ago(2),
-    default_args=args,
-    tags=['example'],
-)
-
 
 def should_run(**kwargs):
     """
@@ -59,12 +51,19 @@ def should_run(**kwargs):
         return "dummy_task_2"
 
 
-cond = BranchPythonOperator(
-    task_id='condition',
-    python_callable=should_run,
-    dag=dag,
-)
+with DAG(
+    dag_id='example_branch_dop_operator_v3',
+    schedule_interval='*/1 * * * *',
+    start_date=days_ago(2),
+    default_args=args,
+    tags=['example'],
+) as dag:
+
+    cond = BranchPythonOperator(
+        task_id='condition',
+        python_callable=should_run,
+    )
 
-dummy_task_1 = DummyOperator(task_id='dummy_task_1', dag=dag)
-dummy_task_2 = DummyOperator(task_id='dummy_task_2', dag=dag)
-cond >> [dummy_task_1, dummy_task_2]
+    dummy_task_1 = DummyOperator(task_id='dummy_task_1')
+    dummy_task_2 = DummyOperator(task_id='dummy_task_2')
+    cond >> [dummy_task_1, dummy_task_2]
diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py
index 272a054..d0d5db0 100644
--- a/airflow/example_dags/example_latest_only.py
+++ b/airflow/example_dags/example_latest_only.py
@@ -25,14 +25,14 @@ from airflow.operators.dummy import DummyOperator
 from airflow.operators.latest_only import LatestOnlyOperator
 from airflow.utils.dates import days_ago
 
-dag = DAG(
+with DAG(
     dag_id='latest_only',
     schedule_interval=dt.timedelta(hours=4),
     start_date=days_ago(2),
     tags=['example2', 'example3'],
-)
+) as dag:
 
-latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
-task1 = DummyOperator(task_id='task1', dag=dag)
+    latest_only = LatestOnlyOperator(task_id='latest_only')
+    task1 = DummyOperator(task_id='task1')
 
-latest_only >> task1
+    latest_only >> task1
diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py
index 9178278..a8e96e7 100644
--- a/airflow/example_dags/example_latest_only_with_trigger.py
+++ b/airflow/example_dags/example_latest_only_with_trigger.py
@@ -28,19 +28,19 @@ from airflow.operators.latest_only import LatestOnlyOperator
 from airflow.utils.dates import days_ago
 from airflow.utils.trigger_rule import TriggerRule
 
-dag = DAG(
+with DAG(
     dag_id='latest_only_with_trigger',
     schedule_interval=dt.timedelta(hours=4),
     start_date=days_ago(2),
     tags=['example3'],
-)
+) as dag:
 
-latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
-task1 = DummyOperator(task_id='task1', dag=dag)
-task2 = DummyOperator(task_id='task2', dag=dag)
-task3 = DummyOperator(task_id='task3', dag=dag)
-task4 = DummyOperator(task_id='task4', dag=dag, trigger_rule=TriggerRule.ALL_DONE)
+    latest_only = LatestOnlyOperator(task_id='latest_only')
+    task1 = DummyOperator(task_id='task1')
+    task2 = DummyOperator(task_id='task2')
+    task3 = DummyOperator(task_id='task3')
+    task4 = DummyOperator(task_id='task4', trigger_rule=TriggerRule.ALL_DONE)
 
-latest_only >> task1 >> [task3, task4]
-task2 >> [task3, task4]
+    latest_only >> task1 >> [task3, task4]
+    task2 >> [task3, task4]
 # [END example]
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py
index 8eaadd7..2c930cc 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -20,23 +20,13 @@
 
 import os
 from datetime import timedelta
+from textwrap import dedent
 
 from airflow import DAG
 from airflow.operators.bash import BashOperator
 from airflow.operators.python import PythonOperator
 from airflow.utils.dates import days_ago
 
-dag = DAG(
-    "example_passing_params_via_test_command",
-    default_args={
-        "owner": "airflow",
-    },
-    schedule_interval='*/1 * * * *',
-    start_date=days_ago(1),
-    dagrun_timeout=timedelta(minutes=4),
-    tags=['example'],
-)
-
 
 def my_py_command(test_mode, params):
     """
@@ -56,26 +46,6 @@ def my_py_command(test_mode, params):
     return 1
 
 
-my_templated_command = """
-    echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} "
-    echo " 'miff was passed in via BashOperator with value {{ params.miff }} "
-"""
-
-run_this = PythonOperator(
-    task_id='run_this',
-    python_callable=my_py_command,
-    params={"miff": "agg"},
-    dag=dag,
-)
-
-also_run_this = BashOperator(
-    task_id='also_run_this',
-    bash_command=my_templated_command,
-    params={"miff": "agg"},
-    dag=dag,
-)
-
-
 def print_env_vars(test_mode):
     """
     Print out the "foo" param passed in via
@@ -87,6 +57,36 @@ def print_env_vars(test_mode):
         print("AIRFLOW_TEST_MODE={}".format(os.environ.get('AIRFLOW_TEST_MODE')))
 
 
-env_var_test_task = PythonOperator(task_id='env_var_test_task', python_callable=print_env_vars, dag=dag)
+with DAG(
+    "example_passing_params_via_test_command",
+    default_args={
+        "owner": "airflow",
+    },
+    schedule_interval='*/1 * * * *',
+    start_date=days_ago(1),
+    dagrun_timeout=timedelta(minutes=4),
+    tags=['example'],
+) as dag:
+
+    my_templated_command = dedent(
+        """
+        echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} "
+        echo " 'miff was passed in via BashOperator with value {{ params.miff }} "
+    """
+    )
+
+    run_this = PythonOperator(
+        task_id='run_this',
+        python_callable=my_py_command,
+        params={"miff": "agg"},
+    )
+
+    also_run_this = BashOperator(
+        task_id='also_run_this',
+        bash_command=my_templated_command,
+        params={"miff": "agg"},
+    )
+
+    env_var_test_task = PythonOperator(task_id='env_var_test_task', python_callable=print_env_vars)
 
-run_this >> also_run_this
+    run_this >> also_run_this
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index d5e16a5..a9db342 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -28,77 +28,68 @@ args = {
     'owner': 'airflow',
 }
 
-dag = DAG(
+with DAG(
     dag_id='example_python_operator',
     default_args=args,
     schedule_interval=None,
     start_date=days_ago(2),
     tags=['example'],
-)
-
-
-# [START howto_operator_python]
-def print_context(ds, **kwargs):
-    """Print the Airflow context and ds variable from the context."""
-    pprint(kwargs)
-    print(ds)
-    return 'Whatever you return gets printed in the logs'
-
-
-run_this = PythonOperator(
-    task_id='print_the_context',
-    python_callable=print_context,
-    dag=dag,
-)
-# [END howto_operator_python]
-
-
-# [START howto_operator_python_kwargs]
-def my_sleeping_function(random_base):
-    """This is a function that will run within the DAG execution"""
-    time.sleep(random_base)
-
-
-# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
-for i in range(5):
-    task = PythonOperator(
-        task_id='sleep_for_' + str(i),
-        python_callable=my_sleeping_function,
-        op_kwargs={'random_base': float(i) / 10},
-        dag=dag,
+) as dag:
+
+    # [START howto_operator_python]
+    def print_context(ds, **kwargs):
+        """Print the Airflow context and ds variable from the context."""
+        pprint(kwargs)
+        print(ds)
+        return 'Whatever you return gets printed in the logs'
+
+    run_this = PythonOperator(
+        task_id='print_the_context',
+        python_callable=print_context,
     )
-
-    run_this >> task
-# [END howto_operator_python_kwargs]
-
-
-# [START howto_operator_python_venv]
-def callable_virtualenv():
-    """
-    Example function that will be performed in a virtual environment.
-
-    Importing at the module level ensures that it will not attempt to import the
-    library before it is installed.
-    """
-    from time import sleep
-
-    from colorama import Back, Fore, Style
-
-    print(Fore.RED + 'some red text')
-    print(Back.GREEN + 'and with a green background')
-    print(Style.DIM + 'and in dim text')
-    print(Style.RESET_ALL)
-    for _ in range(10):
-        print(Style.DIM + 'Please wait...', flush=True)
-        sleep(10)
-    print('Finished')
-
-
-virtualenv_task = PythonVirtualenvOperator(
-    task_id="virtualenv_python",
-    python_callable=callable_virtualenv,
-    requirements=["colorama==0.4.0"],
-    system_site_packages=False,
-    dag=dag,
-)
-# [END howto_operator_python_venv]
+    # [END howto_operator_python]
+
+    # [START howto_operator_python_kwargs]
+    def my_sleeping_function(random_base):
+        """This is a function that will run within the DAG execution"""
+        time.sleep(random_base)
+
+    # Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
+    for i in range(5):
+        task = PythonOperator(
+            task_id='sleep_for_' + str(i),
+            python_callable=my_sleeping_function,
+            op_kwargs={'random_base': float(i) / 10},
+        )
+
+        run_this >> task
+    # [END howto_operator_python_kwargs]
+
+    # [START howto_operator_python_venv]
+    def callable_virtualenv():
+        """
+        Example function that will be performed in a virtual environment.
+
+        Importing at the module level ensures that it will not attempt to import the
+        library before it is installed.
+        """
+        from time import sleep
+
+        from colorama import Back, Fore, Style
+
+        print(Fore.RED + 'some red text')
+        print(Back.GREEN + 'and with a green background')
+        print(Style.DIM + 'and in dim text')
+        print(Style.RESET_ALL)
+        for _ in range(10):
+            print(Style.DIM + 'Please wait...', flush=True)
+            sleep(10)
+        print('Finished')
+
+    virtualenv_task = PythonVirtualenvOperator(
+        task_id="virtualenv_python",
+        python_callable=callable_virtualenv,
+        requirements=["colorama==0.4.0"],
+        system_site_packages=False,
+    )
+    # [END howto_operator_python_venv]
diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py
index 38163a0..3836ef9 100644
--- a/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow/example_dags/example_short_circuit_operator.py
@@ -27,27 +27,25 @@ args = {
     'owner': 'airflow',
 }
 
-dag = DAG(
+with DAG(
     dag_id='example_short_circuit_operator',
     default_args=args,
     start_date=dates.days_ago(2),
     tags=['example'],
-)
+) as dag:
 
-cond_true = ShortCircuitOperator(
-    task_id='condition_is_True',
-    python_callable=lambda: True,
-    dag=dag,
-)
+    cond_true = ShortCircuitOperator(
+        task_id='condition_is_True',
+        python_callable=lambda: True,
+    )
 
-cond_false = ShortCircuitOperator(
-    task_id='condition_is_False',
-    python_callable=lambda: False,
-    dag=dag,
-)
+    cond_false = ShortCircuitOperator(
+        task_id='condition_is_False',
+        python_callable=lambda: False,
+    )
 
-ds_true = [DummyOperator(task_id='true_' + str(i), dag=dag) for i in [1, 2]]
-ds_false = [DummyOperator(task_id='false_' + str(i), dag=dag) for i in [1, 2]]
+    ds_true = [DummyOperator(task_id='true_' + str(i)) for i in [1, 2]]
+    ds_false = [DummyOperator(task_id='false_' + str(i)) for i in [1, 2]]
 
-chain(cond_true, *ds_true)
-chain(cond_false, *ds_false)
+    chain(cond_true, *ds_true)
+    chain(cond_false, *ds_false)
diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py
index 633dc5e..77fbf4a 100644
--- a/airflow/example_dags/example_skip_dag.py
+++ b/airflow/example_dags/example_skip_dag.py
@@ -56,6 +56,6 @@ def create_test_pipeline(suffix, trigger_rule, dag_):
     join >> final
 
 
-dag = DAG(dag_id='example_skip_dag', default_args=args, start_date=days_ago(2), tags=['example'])
-create_test_pipeline('1', 'all_success', dag)
-create_test_pipeline('2', 'one_success', dag)
+with DAG(dag_id='example_skip_dag', default_args=args, start_date=days_ago(2), tags=['example']) as dag:
+    create_test_pipeline('1', 'all_success', dag)
+    create_test_pipeline('2', 'one_success', dag)
diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py
index be88281..f27aec7 100644
--- a/airflow/example_dags/example_subdag_operator.py
+++ b/airflow/example_dags/example_subdag_operator.py
@@ -31,36 +31,31 @@ args = {
     'owner': 'airflow',
 }
 
-dag = DAG(
+with DAG(
     dag_id=DAG_NAME, default_args=args, start_date=days_ago(2), schedule_interval="@once", tags=['example']
-)
+) as dag:
 
-start = DummyOperator(
-    task_id='start',
-    dag=dag,
-)
+    start = DummyOperator(
+        task_id='start',
+    )
 
-section_1 = SubDagOperator(
-    task_id='section-1',
-    subdag=subdag(DAG_NAME, 'section-1', args),
-    dag=dag,
-)
+    section_1 = SubDagOperator(
+        task_id='section-1',
+        subdag=subdag(DAG_NAME, 'section-1', args),
+    )
 
-some_other_task = DummyOperator(
-    task_id='some-other-task',
-    dag=dag,
-)
+    some_other_task = DummyOperator(
+        task_id='some-other-task',
+    )
 
-section_2 = SubDagOperator(
-    task_id='section-2',
-    subdag=subdag(DAG_NAME, 'section-2', args),
-    dag=dag,
-)
+    section_2 = SubDagOperator(
+        task_id='section-2',
+        subdag=subdag(DAG_NAME, 'section-2', args),
+    )
 
-end = DummyOperator(
-    task_id='end',
-    dag=dag,
-)
+    end = DummyOperator(
+        task_id='end',
+    )
 
-start >> section_1 >> some_other_task >> section_2 >> end
+    start >> section_1 >> some_other_task >> section_2 >> end
 # [END example_subdag_operator]
diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py
index 0f706c7..9d02399 100644
--- a/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow/example_dags/example_trigger_controller_dag.py
@@ -25,17 +25,16 @@ from airflow import DAG
 from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.utils.dates import days_ago
 
-dag = DAG(
+with DAG(
     dag_id="example_trigger_controller_dag",
     default_args={"owner": "airflow"},
     start_date=days_ago(2),
     schedule_interval="@once",
     tags=['example'],
-)
+) as dag:
 
-trigger = TriggerDagRunOperator(
-    task_id="test_trigger_dagrun",
-    trigger_dag_id="example_trigger_target_dag",  # Ensure this equals the dag_id of the DAG to trigger
-    conf={"message": "Hello World"},
-    dag=dag,
-)
+    trigger = TriggerDagRunOperator(
+        task_id="test_trigger_dagrun",
+        trigger_dag_id="example_trigger_target_dag",  # Ensure this equals the dag_id of the DAG to trigger
+        conf={"message": "Hello World"},
+    )
diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py
index 0355275..39ecefc 100644
--- a/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow/example_dags/example_trigger_target_dag.py
@@ -27,14 +27,6 @@ from airflow.operators.bash import BashOperator
 from airflow.operators.python import PythonOperator
 from airflow.utils.dates import days_ago
 
-dag = DAG(
-    dag_id="example_trigger_target_dag",
-    default_args={"owner": "airflow"},
-    start_date=days_ago(2),
-    schedule_interval=None,
-    tags=['example'],
-)
-
 
 def run_this_func(**context):
     """
@@ -46,11 +38,18 @@ def run_this_func(**context):
     print("Remotely received value of {} for key=message".format(context["dag_run"].conf["message"]))
 
 
-run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag)
+with DAG(
+    dag_id="example_trigger_target_dag",
+    default_args={"owner": "airflow"},
+    start_date=days_ago(2),
+    schedule_interval=None,
+    tags=['example'],
+) as dag:
+
+    run_this = PythonOperator(task_id="run_this", python_callable=run_this_func)
 
-bash_task = BashOperator(
-    task_id="bash_task",
-    bash_command='echo "Here is the message: $message"',
-    env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
-    dag=dag,
-)
+    bash_task = BashOperator(
+        task_id="bash_task",
+        bash_command='echo "Here is the message: $message"',
+        env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
+    )
diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py
index 779e392..03f85d9 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -21,14 +21,6 @@ from airflow import DAG
 from airflow.operators.python import PythonOperator
 from airflow.utils.dates import days_ago
 
-dag = DAG(
-    'example_xcom',
-    schedule_interval="@once",
-    start_date=days_ago(2),
-    default_args={'owner': 'airflow'},
-    tags=['example'],
-)
-
 value_1 = [1, 2, 3]
 value_2 = {'a': 'b'}
 
@@ -65,22 +57,27 @@ def puller(**kwargs):
         raise ValueError(f'The two values differ {pulled_value_2} and {value_2}')
 
 
-push1 = PythonOperator(
-    task_id='push',
-    dag=dag,
-    python_callable=push,
-)
+with DAG(
+    'example_xcom',
+    schedule_interval="@once",
+    start_date=days_ago(2),
+    default_args={'owner': 'airflow'},
+    tags=['example'],
+) as dag:
+
+    push1 = PythonOperator(
+        task_id='push',
+        python_callable=push,
+    )
 
-push2 = PythonOperator(
-    task_id='push_by_returning',
-    dag=dag,
-    python_callable=push_by_returning,
-)
+    push2 = PythonOperator(
+        task_id='push_by_returning',
+        python_callable=push_by_returning,
+    )
 
-pull = PythonOperator(
-    task_id='puller',
-    dag=dag,
-    python_callable=puller,
-)
+    pull = PythonOperator(
+        task_id='puller',
+        python_callable=puller,
+    )
 
-pull << [push1, push2]
+    pull << [push1, push2]
diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py
index a1a2ed0..0211dfb 100644
--- a/airflow/example_dags/test_utils.py
+++ b/airflow/example_dags/test_utils.py
@@ -20,12 +20,11 @@ from airflow import DAG
 from airflow.operators.bash import BashOperator
 from airflow.utils.dates import days_ago
 
-dag = DAG(dag_id='test_utils', schedule_interval=None, tags=['example'])
+with DAG(dag_id='test_utils', schedule_interval=None, tags=['example']) as dag:
 
-task = BashOperator(
-    task_id='sleeps_forever',
-    dag=dag,
-    bash_command="sleep 10000000000",
-    start_date=days_ago(2),
-    owner='airflow',
-)
+    task = BashOperator(
+        task_id='sleeps_forever',
+        bash_command="sleep 10000000000",
+        start_date=days_ago(2),
+        owner='airflow',
+    )
diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py
index a00051c..518c801 100644
--- a/airflow/example_dags/tutorial.py
+++ b/airflow/example_dags/tutorial.py
@@ -24,6 +24,7 @@ Documentation that goes along with the Airflow tutorial located
 # [START tutorial]
 # [START import_module]
 from datetime import timedelta
+from textwrap import dedent
 
 # The DAG object; we'll need this to instantiate a DAG
 from airflow import DAG
@@ -62,62 +63,63 @@ default_args = {
 # [END default_args]
 
 # [START instantiate_dag]
-dag = DAG(
+with DAG(
     'tutorial',
     default_args=default_args,
     description='A simple tutorial DAG',
     schedule_interval=timedelta(days=1),
     start_date=days_ago(2),
     tags=['example'],
-)
-# [END instantiate_dag]
+) as dag:
+    # [END instantiate_dag]
 
-# t1, t2 and t3 are examples of tasks created by instantiating operators
-# [START basic_task]
-t1 = BashOperator(
-    task_id='print_date',
-    bash_command='date',
-    dag=dag,
-)
+    # t1, t2 and t3 are examples of tasks created by instantiating operators
+    # [START basic_task]
+    t1 = BashOperator(
+        task_id='print_date',
+        bash_command='date',
+    )
 
-t2 = BashOperator(
-    task_id='sleep',
-    depends_on_past=False,
-    bash_command='sleep 5',
-    retries=3,
-    dag=dag,
-)
-# [END basic_task]
+    t2 = BashOperator(
+        task_id='sleep',
+        depends_on_past=False,
+        bash_command='sleep 5',
+        retries=3,
+    )
+    # [END basic_task]
 
-# [START documentation]
-dag.doc_md = __doc__
+    # [START documentation]
+    dag.doc_md = __doc__
 
-t1.doc_md = """\
-#### Task Documentation
-You can document your task using the attributes `doc_md` (markdown),
-`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
-rendered in the UI's Task Instance Details page.
-![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
-"""
-# [END documentation]
+    t1.doc_md = dedent(
+        """\
+    #### Task Documentation
+    You can document your task using the attributes `doc_md` (markdown),
+    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
+    rendered in the UI's Task Instance Details page.
+    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
+    """
+    )
+    # [END documentation]
 
-# [START jinja_template]
-templated_command = """
-{% for i in range(5) %}
-    echo "{{ ds }}"
-    echo "{{ macros.ds_add(ds, 7)}}"
-    echo "{{ params.my_param }}"
-{% endfor %}
-"""
+    # [START jinja_template]
+    templated_command = dedent(
+        """
+    {% for i in range(5) %}
+        echo "{{ ds }}"
+        echo "{{ macros.ds_add(ds, 7)}}"
+        echo "{{ params.my_param }}"
+    {% endfor %}
+    """
+    )
 
-t3 = BashOperator(
-    task_id='templated',
-    depends_on_past=False,
-    bash_command=templated_command,
-    params={'my_param': 'Parameter I passed in'},
-    dag=dag,
-)
-# [END jinja_template]
+    t3 = BashOperator(
+        task_id='templated',
+        depends_on_past=False,
+        bash_command=templated_command,
+        params={'my_param': 'Parameter I passed in'},
+    )
+    # [END jinja_template]
 
-t1 >> [t2, t3]
+    t1 >> [t2, t3]
 # [END tutorial]
diff --git a/airflow/example_dags/tutorial_etl_dag.py b/airflow/example_dags/tutorial_etl_dag.py
index 48b519b..8b45600 100644
--- a/airflow/example_dags/tutorial_etl_dag.py
+++ b/airflow/example_dags/tutorial_etl_dag.py
@@ -27,6 +27,7 @@ as part of the documentation that goes along with the Airflow Functional DAG tut
 # [START tutorial]
 # [START import_module]
 import json
+from textwrap import dedent
 
 # The DAG object; we'll need this to instantiate a DAG
 from airflow import DAG
@@ -98,33 +99,39 @@ with DAG(
         task_id='extract',
         python_callable=extract,
     )
-    extract_task.doc_md = """\
-#### Extract task
-A simple Extract task to get data ready for the rest of the data pipeline.
-In this case, getting data is simulated by reading from a hardcoded JSON string.
-This data is then put into xcom, so that it can be processed by the next task.
-"""
+    extract_task.doc_md = dedent(
+        """\
+    #### Extract task
+    A simple Extract task to get data ready for the rest of the data pipeline.
+    In this case, getting data is simulated by reading from a hardcoded JSON string.
+    This data is then put into xcom, so that it can be processed by the next task.
+    """
+    )
 
     transform_task = PythonOperator(
         task_id='transform',
         python_callable=transform,
     )
-    transform_task.doc_md = """\
-#### Transform task
-A simple Transform task which takes in the collection of order data from xcom
-and computes the total order value.
-This computed value is then put into xcom, so that it can be processed by the next task.
-"""
+    transform_task.doc_md = dedent(
+        """\
+    #### Transform task
+    A simple Transform task which takes in the collection of order data from xcom
+    and computes the total order value.
+    This computed value is then put into xcom, so that it can be processed by the next task.
+    """
+    )
 
     load_task = PythonOperator(
         task_id='load',
         python_callable=load,
     )
-    load_task.doc_md = """\
-#### Load task
-A simple Load task which takes in the result of the Transform task, by reading it
-from xcom and instead of saving it to end user review, just prints it out.
-"""
+    load_task.doc_md = dedent(
+        """\
+    #### Load task
+    A simple Load task which takes in the result of the Transform task, by reading it
+    from xcom and instead of saving it to end user review, just prints it out.
+    """
+    )
 
     extract_task >> transform_task >> load_task
 
diff --git a/docs/apache-airflow/executor/kubernetes.rst b/docs/apache-airflow/executor/kubernetes.rst
index 9b774cf..a0df9db 100644
--- a/docs/apache-airflow/executor/kubernetes.rst
+++ b/docs/apache-airflow/executor/kubernetes.rst
@@ -120,6 +120,7 @@ name ``base`` and a second container containing your desired sidecar.
 
 .. exampleinclude:: /../../airflow/example_dags/example_kubernetes_executor_config.py
     :language: python
+    :dedent: 8
     :start-after: [START task_with_sidecar]
     :end-before: [END task_with_sidecar]
 
@@ -130,6 +131,7 @@ Here is an example of a task with both features:
 
 .. exampleinclude:: /../../airflow/example_dags/example_kubernetes_executor_config.py
     :language: python
+    :dedent: 8
     :start-after: [START task_with_template]
     :end-before: [END task_with_template]
 
diff --git a/docs/apache-airflow/howto/operator/bash.rst b/docs/apache-airflow/howto/operator/bash.rst
index c8a923f..3d2195f 100644
--- a/docs/apache-airflow/howto/operator/bash.rst
+++ b/docs/apache-airflow/howto/operator/bash.rst
@@ -27,6 +27,7 @@ commands in a `Bash <https://www.gnu.org/software/bash/>`__ shell.
 
 .. exampleinclude:: /../../airflow/example_dags/example_bash_operator.py
     :language: python
+    :dedent: 4
     :start-after: [START howto_operator_bash]
     :end-before: [END howto_operator_bash]
 
@@ -38,6 +39,7 @@ You can use :ref:`Jinja templates <jinja-templating>` to parameterize the
 
 .. exampleinclude:: /../../airflow/example_dags/example_bash_operator.py
     :language: python
+    :dedent: 4
     :start-after: [START howto_operator_bash_template]
     :end-before: [END howto_operator_bash_template]
 
diff --git a/docs/apache-airflow/howto/operator/external_task_sensor.rst b/docs/apache-airflow/howto/operator/external_task_sensor.rst
index eec8074..420bd13 100644
--- a/docs/apache-airflow/howto/operator/external_task_sensor.rst
+++ b/docs/apache-airflow/howto/operator/external_task_sensor.rst
@@ -46,6 +46,7 @@ via ``allowed_states`` and ``failed_states`` parameters.
 
 .. exampleinclude:: /../../airflow/example_dags/example_external_task_marker_dag.py
     :language: python
+    :dedent: 4
     :start-after: [START howto_operator_external_task_sensor]
     :end-before: [END howto_operator_external_task_sensor]
 
@@ -60,5 +61,6 @@ user clears ``parent_task``.
 
 .. exampleinclude:: /../../airflow/example_dags/example_external_task_marker_dag.py
     :language: python
+    :dedent: 4
     :start-after: [START howto_operator_external_task_marker]
     :end-before: [END howto_operator_external_task_marker]
diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst
index 7f4d2b8..4a59df6 100644
--- a/docs/apache-airflow/howto/operator/python.rst
+++ b/docs/apache-airflow/howto/operator/python.rst
@@ -27,6 +27,7 @@ Python callables.
 
 .. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
     :language: python
+    :dedent: 4
     :start-after: [START howto_operator_python]
     :end-before: [END howto_operator_python]
 
@@ -38,6 +39,7 @@ to the Python callable.
 
 .. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
     :language: python
+    :dedent: 4
     :start-after: [START howto_operator_python_kwargs]
     :end-before: [END howto_operator_python_kwargs]
 
@@ -63,6 +65,7 @@ Python callables inside a new Python virtual environment.
 
 .. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
     :language: python
+    :dedent: 4
     :start-after: [START howto_operator_python_venv]
     :end-before: [END howto_operator_python_venv]
 
diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index 9324014..3a6b7ce9 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -109,6 +109,7 @@ instantiated from an operator is called a task. The first argument
 
 .. exampleinclude:: /../../airflow/example_dags/tutorial.py
     :language: python
+    :dedent: 4
     :start-after: [START basic_task]
     :end-before: [END basic_task]
 
@@ -144,6 +145,7 @@ stamp").
 
 .. exampleinclude:: /../../airflow/example_dags/tutorial.py
     :language: python
+    :dedent: 4
     :start-after: [START jinja_template]
     :end-before: [END jinja_template]
 
@@ -186,6 +188,7 @@ json, yaml.
 
 .. exampleinclude:: /../../airflow/example_dags/tutorial.py
     :language: python
+    :dedent: 4
     :start-after: [START documentation]
     :end-before: [END documentation]
 
diff --git a/docs/apache-airflow/tutorial_taskflow_api.rst b/docs/apache-airflow/tutorial_taskflow_api.rst
index 6c51f16..e389a4c 100644
--- a/docs/apache-airflow/tutorial_taskflow_api.rst
+++ b/docs/apache-airflow/tutorial_taskflow_api.rst
@@ -69,6 +69,7 @@ as shown below. The function name acts as a unique identifier for the task.
 
 .. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
     :language: python
+    :dedent: 4
     :start-after: [START extract]
     :end-before: [END extract]
 
@@ -83,6 +84,7 @@ we can move to the main part of the DAG.
 
 .. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
     :language: python
+    :dedent: 4
     :start-after: [START main_flow]
     :end-before: [END main_flow]
 
@@ -119,6 +121,7 @@ in the middle of the data pipeline. In Airflow 1.x, this task is defined as show
 
 .. exampleinclude:: /../../airflow/example_dags/tutorial_etl_dag.py
     :language: python
+    :dedent: 4
     :start-after: [START transform_function]
     :end-before: [END transform_function]
 
@@ -130,6 +133,7 @@ Contrasting that with Taskflow API in Airflow 2.0 as shown below.
 
 .. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
     :language: python
+    :dedent: 4
     :start-after: [START transform]
     :end-before: [END transform]
 
@@ -143,6 +147,7 @@ dependencies specified as shown below.
 
 .. exampleinclude:: /../../airflow/example_dags/tutorial_etl_dag.py
     :language: python
+    :dedent: 4
     :start-after: [START main_flow]
     :end-before: [END main_flow]
 
@@ -151,6 +156,7 @@ the dependencies as shown below.
 
 .. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
     :language: python
+    :dedent: 4
     :start-after: [START main_flow]
     :end-before: [END main_flow]