You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/20 12:16:39 UTC
[airflow] branch master updated: Use DAG context manager in examples (#13297)
This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 9923d60 Use DAG context manager in examples (#13297)
9923d60 is described below
commit 9923d606d2887c52390a30639fc1ee0d4000149c
Author: Jennifer Melot <jt...@gmail.com>
AuthorDate: Wed Jan 20 07:16:12 2021 -0500
Use DAG context manager in examples (#13297)
---
airflow/example_dags/example_bash_operator.py | 54 ++++-----
airflow/example_dags/example_branch_operator.py | 51 ++++-----
.../example_branch_python_dop_operator_3.py | 31 +++--
airflow/example_dags/example_latest_only.py | 10 +-
.../example_latest_only_with_trigger.py | 18 +--
.../example_passing_params_via_test_command.py | 66 +++++------
airflow/example_dags/example_python_operator.py | 127 ++++++++++-----------
.../example_dags/example_short_circuit_operator.py | 30 +++--
airflow/example_dags/example_skip_dag.py | 6 +-
airflow/example_dags/example_subdag_operator.py | 45 ++++----
.../example_dags/example_trigger_controller_dag.py | 15 ++-
airflow/example_dags/example_trigger_target_dag.py | 29 +++--
airflow/example_dags/example_xcom.py | 45 ++++----
airflow/example_dags/test_utils.py | 15 ++-
airflow/example_dags/tutorial.py | 92 +++++++--------
airflow/example_dags/tutorial_etl_dag.py | 41 ++++---
docs/apache-airflow/executor/kubernetes.rst | 2 +
docs/apache-airflow/howto/operator/bash.rst | 2 +
.../howto/operator/external_task_sensor.rst | 2 +
docs/apache-airflow/howto/operator/python.rst | 3 +
docs/apache-airflow/tutorial.rst | 3 +
docs/apache-airflow/tutorial_taskflow_api.rst | 6 +
22 files changed, 344 insertions(+), 349 deletions(-)
diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py
index 1c22fff..0665971 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -29,7 +29,7 @@ args = {
'owner': 'airflow',
}
-dag = DAG(
+with DAG(
dag_id='example_bash_operator',
default_args=args,
schedule_interval='0 0 * * *',
@@ -37,39 +37,35 @@ dag = DAG(
dagrun_timeout=timedelta(minutes=60),
tags=['example', 'example2'],
params={"example_key": "example_value"},
-)
+) as dag:
-run_this_last = DummyOperator(
- task_id='run_this_last',
- dag=dag,
-)
+ run_this_last = DummyOperator(
+ task_id='run_this_last',
+ )
+
+ # [START howto_operator_bash]
+ run_this = BashOperator(
+ task_id='run_after_loop',
+ bash_command='echo 1',
+ )
+ # [END howto_operator_bash]
-# [START howto_operator_bash]
-run_this = BashOperator(
- task_id='run_after_loop',
- bash_command='echo 1',
- dag=dag,
-)
-# [END howto_operator_bash]
+ run_this >> run_this_last
-run_this >> run_this_last
+ for i in range(3):
+ task = BashOperator(
+ task_id='runme_' + str(i),
+ bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
+ )
+ task >> run_this
-for i in range(3):
- task = BashOperator(
- task_id='runme_' + str(i),
- bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
- dag=dag,
+ # [START howto_operator_bash_template]
+ also_run_this = BashOperator(
+ task_id='also_run_this',
+ bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
)
- task >> run_this
-
-# [START howto_operator_bash_template]
-also_run_this = BashOperator(
- task_id='also_run_this',
- bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
- dag=dag,
-)
-# [END howto_operator_bash_template]
-also_run_this >> run_this_last
+ # [END howto_operator_bash_template]
+ also_run_this >> run_this_last
if __name__ == "__main__":
dag.cli()
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index 50eb328..7c5e166 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -29,43 +29,38 @@ args = {
'owner': 'airflow',
}
-dag = DAG(
+with DAG(
dag_id='example_branch_operator',
default_args=args,
start_date=days_ago(2),
schedule_interval="@daily",
tags=['example', 'example2'],
-)
+) as dag:
-run_this_first = DummyOperator(
- task_id='run_this_first',
- dag=dag,
-)
-
-options = ['branch_a', 'branch_b', 'branch_c', 'branch_d']
-
-branching = BranchPythonOperator(
- task_id='branching',
- python_callable=lambda: random.choice(options),
- dag=dag,
-)
-run_this_first >> branching
+ run_this_first = DummyOperator(
+ task_id='run_this_first',
+ )
-join = DummyOperator(
- task_id='join',
- trigger_rule='none_failed_or_skipped',
- dag=dag,
-)
+ options = ['branch_a', 'branch_b', 'branch_c', 'branch_d']
-for option in options:
- t = DummyOperator(
- task_id=option,
- dag=dag,
+ branching = BranchPythonOperator(
+ task_id='branching',
+ python_callable=lambda: random.choice(options),
)
+ run_this_first >> branching
- dummy_follow = DummyOperator(
- task_id='follow_' + option,
- dag=dag,
+ join = DummyOperator(
+ task_id='join',
+ trigger_rule='none_failed_or_skipped',
)
- branching >> t >> dummy_follow >> join
+ for option in options:
+ t = DummyOperator(
+ task_id=option,
+ )
+
+ dummy_follow = DummyOperator(
+ task_id='follow_' + option,
+ )
+
+ branching >> t >> dummy_follow >> join
diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py
index f01fc50..badad5a 100644
--- a/airflow/example_dags/example_branch_python_dop_operator_3.py
+++ b/airflow/example_dags/example_branch_python_dop_operator_3.py
@@ -31,14 +31,6 @@ args = {
'depends_on_past': True,
}
-dag = DAG(
- dag_id='example_branch_dop_operator_v3',
- schedule_interval='*/1 * * * *',
- start_date=days_ago(2),
- default_args=args,
- tags=['example'],
-)
-
def should_run(**kwargs):
"""
@@ -59,12 +51,19 @@ def should_run(**kwargs):
return "dummy_task_2"
-cond = BranchPythonOperator(
- task_id='condition',
- python_callable=should_run,
- dag=dag,
-)
+with DAG(
+ dag_id='example_branch_dop_operator_v3',
+ schedule_interval='*/1 * * * *',
+ start_date=days_ago(2),
+ default_args=args,
+ tags=['example'],
+) as dag:
+
+ cond = BranchPythonOperator(
+ task_id='condition',
+ python_callable=should_run,
+ )
-dummy_task_1 = DummyOperator(task_id='dummy_task_1', dag=dag)
-dummy_task_2 = DummyOperator(task_id='dummy_task_2', dag=dag)
-cond >> [dummy_task_1, dummy_task_2]
+ dummy_task_1 = DummyOperator(task_id='dummy_task_1')
+ dummy_task_2 = DummyOperator(task_id='dummy_task_2')
+ cond >> [dummy_task_1, dummy_task_2]
diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py
index 272a054..d0d5db0 100644
--- a/airflow/example_dags/example_latest_only.py
+++ b/airflow/example_dags/example_latest_only.py
@@ -25,14 +25,14 @@ from airflow.operators.dummy import DummyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.dates import days_ago
-dag = DAG(
+with DAG(
dag_id='latest_only',
schedule_interval=dt.timedelta(hours=4),
start_date=days_ago(2),
tags=['example2', 'example3'],
-)
+) as dag:
-latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
-task1 = DummyOperator(task_id='task1', dag=dag)
+ latest_only = LatestOnlyOperator(task_id='latest_only')
+ task1 = DummyOperator(task_id='task1')
-latest_only >> task1
+ latest_only >> task1
diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py
index 9178278..a8e96e7 100644
--- a/airflow/example_dags/example_latest_only_with_trigger.py
+++ b/airflow/example_dags/example_latest_only_with_trigger.py
@@ -28,19 +28,19 @@ from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule
-dag = DAG(
+with DAG(
dag_id='latest_only_with_trigger',
schedule_interval=dt.timedelta(hours=4),
start_date=days_ago(2),
tags=['example3'],
-)
+) as dag:
-latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
-task1 = DummyOperator(task_id='task1', dag=dag)
-task2 = DummyOperator(task_id='task2', dag=dag)
-task3 = DummyOperator(task_id='task3', dag=dag)
-task4 = DummyOperator(task_id='task4', dag=dag, trigger_rule=TriggerRule.ALL_DONE)
+ latest_only = LatestOnlyOperator(task_id='latest_only')
+ task1 = DummyOperator(task_id='task1')
+ task2 = DummyOperator(task_id='task2')
+ task3 = DummyOperator(task_id='task3')
+ task4 = DummyOperator(task_id='task4', trigger_rule=TriggerRule.ALL_DONE)
-latest_only >> task1 >> [task3, task4]
-task2 >> [task3, task4]
+ latest_only >> task1 >> [task3, task4]
+ task2 >> [task3, task4]
# [END example]
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py
index 8eaadd7..2c930cc 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -20,23 +20,13 @@
import os
from datetime import timedelta
+from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
-dag = DAG(
- "example_passing_params_via_test_command",
- default_args={
- "owner": "airflow",
- },
- schedule_interval='*/1 * * * *',
- start_date=days_ago(1),
- dagrun_timeout=timedelta(minutes=4),
- tags=['example'],
-)
-
def my_py_command(test_mode, params):
"""
@@ -56,26 +46,6 @@ def my_py_command(test_mode, params):
return 1
-my_templated_command = """
- echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} "
- echo " 'miff was passed in via BashOperator with value {{ params.miff }} "
-"""
-
-run_this = PythonOperator(
- task_id='run_this',
- python_callable=my_py_command,
- params={"miff": "agg"},
- dag=dag,
-)
-
-also_run_this = BashOperator(
- task_id='also_run_this',
- bash_command=my_templated_command,
- params={"miff": "agg"},
- dag=dag,
-)
-
-
def print_env_vars(test_mode):
"""
Print out the "foo" param passed in via
@@ -87,6 +57,36 @@ def print_env_vars(test_mode):
print("AIRFLOW_TEST_MODE={}".format(os.environ.get('AIRFLOW_TEST_MODE')))
-env_var_test_task = PythonOperator(task_id='env_var_test_task', python_callable=print_env_vars, dag=dag)
+with DAG(
+ "example_passing_params_via_test_command",
+ default_args={
+ "owner": "airflow",
+ },
+ schedule_interval='*/1 * * * *',
+ start_date=days_ago(1),
+ dagrun_timeout=timedelta(minutes=4),
+ tags=['example'],
+) as dag:
+
+ my_templated_command = dedent(
+ """
+ echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} "
+ echo " 'miff was passed in via BashOperator with value {{ params.miff }} "
+ """
+ )
+
+ run_this = PythonOperator(
+ task_id='run_this',
+ python_callable=my_py_command,
+ params={"miff": "agg"},
+ )
+
+ also_run_this = BashOperator(
+ task_id='also_run_this',
+ bash_command=my_templated_command,
+ params={"miff": "agg"},
+ )
+
+ env_var_test_task = PythonOperator(task_id='env_var_test_task', python_callable=print_env_vars)
-run_this >> also_run_this
+ run_this >> also_run_this
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index d5e16a5..a9db342 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -28,77 +28,68 @@ args = {
'owner': 'airflow',
}
-dag = DAG(
+with DAG(
dag_id='example_python_operator',
default_args=args,
schedule_interval=None,
start_date=days_ago(2),
tags=['example'],
-)
-
-
-# [START howto_operator_python]
-def print_context(ds, **kwargs):
- """Print the Airflow context and ds variable from the context."""
- pprint(kwargs)
- print(ds)
- return 'Whatever you return gets printed in the logs'
-
-
-run_this = PythonOperator(
- task_id='print_the_context',
- python_callable=print_context,
- dag=dag,
-)
-# [END howto_operator_python]
-
-
-# [START howto_operator_python_kwargs]
-def my_sleeping_function(random_base):
- """This is a function that will run within the DAG execution"""
- time.sleep(random_base)
-
-
-# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
-for i in range(5):
- task = PythonOperator(
- task_id='sleep_for_' + str(i),
- python_callable=my_sleeping_function,
- op_kwargs={'random_base': float(i) / 10},
- dag=dag,
+) as dag:
+
+ # [START howto_operator_python]
+ def print_context(ds, **kwargs):
+ """Print the Airflow context and ds variable from the context."""
+ pprint(kwargs)
+ print(ds)
+ return 'Whatever you return gets printed in the logs'
+
+ run_this = PythonOperator(
+ task_id='print_the_context',
+ python_callable=print_context,
)
-
- run_this >> task
-# [END howto_operator_python_kwargs]
-
-
-# [START howto_operator_python_venv]
-def callable_virtualenv():
- """
- Example function that will be performed in a virtual environment.
-
- Importing at the module level ensures that it will not attempt to import the
- library before it is installed.
- """
- from time import sleep
-
- from colorama import Back, Fore, Style
-
- print(Fore.RED + 'some red text')
- print(Back.GREEN + 'and with a green background')
- print(Style.DIM + 'and in dim text')
- print(Style.RESET_ALL)
- for _ in range(10):
- print(Style.DIM + 'Please wait...', flush=True)
- sleep(10)
- print('Finished')
-
-
-virtualenv_task = PythonVirtualenvOperator(
- task_id="virtualenv_python",
- python_callable=callable_virtualenv,
- requirements=["colorama==0.4.0"],
- system_site_packages=False,
- dag=dag,
-)
-# [END howto_operator_python_venv]
+ # [END howto_operator_python]
+
+ # [START howto_operator_python_kwargs]
+ def my_sleeping_function(random_base):
+ """This is a function that will run within the DAG execution"""
+ time.sleep(random_base)
+
+ # Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
+ for i in range(5):
+ task = PythonOperator(
+ task_id='sleep_for_' + str(i),
+ python_callable=my_sleeping_function,
+ op_kwargs={'random_base': float(i) / 10},
+ )
+
+ run_this >> task
+ # [END howto_operator_python_kwargs]
+
+ # [START howto_operator_python_venv]
+ def callable_virtualenv():
+ """
+ Example function that will be performed in a virtual environment.
+
+ Importing at the module level ensures that it will not attempt to import the
+ library before it is installed.
+ """
+ from time import sleep
+
+ from colorama import Back, Fore, Style
+
+ print(Fore.RED + 'some red text')
+ print(Back.GREEN + 'and with a green background')
+ print(Style.DIM + 'and in dim text')
+ print(Style.RESET_ALL)
+ for _ in range(10):
+ print(Style.DIM + 'Please wait...', flush=True)
+ sleep(10)
+ print('Finished')
+
+ virtualenv_task = PythonVirtualenvOperator(
+ task_id="virtualenv_python",
+ python_callable=callable_virtualenv,
+ requirements=["colorama==0.4.0"],
+ system_site_packages=False,
+ )
+ # [END howto_operator_python_venv]
diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py
index 38163a0..3836ef9 100644
--- a/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow/example_dags/example_short_circuit_operator.py
@@ -27,27 +27,25 @@ args = {
'owner': 'airflow',
}
-dag = DAG(
+with DAG(
dag_id='example_short_circuit_operator',
default_args=args,
start_date=dates.days_ago(2),
tags=['example'],
-)
+) as dag:
-cond_true = ShortCircuitOperator(
- task_id='condition_is_True',
- python_callable=lambda: True,
- dag=dag,
-)
+ cond_true = ShortCircuitOperator(
+ task_id='condition_is_True',
+ python_callable=lambda: True,
+ )
-cond_false = ShortCircuitOperator(
- task_id='condition_is_False',
- python_callable=lambda: False,
- dag=dag,
-)
+ cond_false = ShortCircuitOperator(
+ task_id='condition_is_False',
+ python_callable=lambda: False,
+ )
-ds_true = [DummyOperator(task_id='true_' + str(i), dag=dag) for i in [1, 2]]
-ds_false = [DummyOperator(task_id='false_' + str(i), dag=dag) for i in [1, 2]]
+ ds_true = [DummyOperator(task_id='true_' + str(i)) for i in [1, 2]]
+ ds_false = [DummyOperator(task_id='false_' + str(i)) for i in [1, 2]]
-chain(cond_true, *ds_true)
-chain(cond_false, *ds_false)
+ chain(cond_true, *ds_true)
+ chain(cond_false, *ds_false)
diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py
index 633dc5e..77fbf4a 100644
--- a/airflow/example_dags/example_skip_dag.py
+++ b/airflow/example_dags/example_skip_dag.py
@@ -56,6 +56,6 @@ def create_test_pipeline(suffix, trigger_rule, dag_):
join >> final
-dag = DAG(dag_id='example_skip_dag', default_args=args, start_date=days_ago(2), tags=['example'])
-create_test_pipeline('1', 'all_success', dag)
-create_test_pipeline('2', 'one_success', dag)
+with DAG(dag_id='example_skip_dag', default_args=args, start_date=days_ago(2), tags=['example']) as dag:
+ create_test_pipeline('1', 'all_success', dag)
+ create_test_pipeline('2', 'one_success', dag)
diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py
index be88281..f27aec7 100644
--- a/airflow/example_dags/example_subdag_operator.py
+++ b/airflow/example_dags/example_subdag_operator.py
@@ -31,36 +31,31 @@ args = {
'owner': 'airflow',
}
-dag = DAG(
+with DAG(
dag_id=DAG_NAME, default_args=args, start_date=days_ago(2), schedule_interval="@once", tags=['example']
-)
+) as dag:
-start = DummyOperator(
- task_id='start',
- dag=dag,
-)
+ start = DummyOperator(
+ task_id='start',
+ )
-section_1 = SubDagOperator(
- task_id='section-1',
- subdag=subdag(DAG_NAME, 'section-1', args),
- dag=dag,
-)
+ section_1 = SubDagOperator(
+ task_id='section-1',
+ subdag=subdag(DAG_NAME, 'section-1', args),
+ )
-some_other_task = DummyOperator(
- task_id='some-other-task',
- dag=dag,
-)
+ some_other_task = DummyOperator(
+ task_id='some-other-task',
+ )
-section_2 = SubDagOperator(
- task_id='section-2',
- subdag=subdag(DAG_NAME, 'section-2', args),
- dag=dag,
-)
+ section_2 = SubDagOperator(
+ task_id='section-2',
+ subdag=subdag(DAG_NAME, 'section-2', args),
+ )
-end = DummyOperator(
- task_id='end',
- dag=dag,
-)
+ end = DummyOperator(
+ task_id='end',
+ )
-start >> section_1 >> some_other_task >> section_2 >> end
+ start >> section_1 >> some_other_task >> section_2 >> end
# [END example_subdag_operator]
diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py
index 0f706c7..9d02399 100644
--- a/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow/example_dags/example_trigger_controller_dag.py
@@ -25,17 +25,16 @@ from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
-dag = DAG(
+with DAG(
dag_id="example_trigger_controller_dag",
default_args={"owner": "airflow"},
start_date=days_ago(2),
schedule_interval="@once",
tags=['example'],
-)
+) as dag:
-trigger = TriggerDagRunOperator(
- task_id="test_trigger_dagrun",
- trigger_dag_id="example_trigger_target_dag", # Ensure this equals the dag_id of the DAG to trigger
- conf={"message": "Hello World"},
- dag=dag,
-)
+ trigger = TriggerDagRunOperator(
+ task_id="test_trigger_dagrun",
+ trigger_dag_id="example_trigger_target_dag", # Ensure this equals the dag_id of the DAG to trigger
+ conf={"message": "Hello World"},
+ )
diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py
index 0355275..39ecefc 100644
--- a/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow/example_dags/example_trigger_target_dag.py
@@ -27,14 +27,6 @@ from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
-dag = DAG(
- dag_id="example_trigger_target_dag",
- default_args={"owner": "airflow"},
- start_date=days_ago(2),
- schedule_interval=None,
- tags=['example'],
-)
-
def run_this_func(**context):
"""
@@ -46,11 +38,18 @@ def run_this_func(**context):
print("Remotely received value of {} for key=message".format(context["dag_run"].conf["message"]))
-run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag)
+with DAG(
+ dag_id="example_trigger_target_dag",
+ default_args={"owner": "airflow"},
+ start_date=days_ago(2),
+ schedule_interval=None,
+ tags=['example'],
+) as dag:
+
+ run_this = PythonOperator(task_id="run_this", python_callable=run_this_func)
-bash_task = BashOperator(
- task_id="bash_task",
- bash_command='echo "Here is the message: $message"',
- env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
- dag=dag,
-)
+ bash_task = BashOperator(
+ task_id="bash_task",
+ bash_command='echo "Here is the message: $message"',
+ env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
+ )
diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py
index 779e392..03f85d9 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -21,14 +21,6 @@ from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
-dag = DAG(
- 'example_xcom',
- schedule_interval="@once",
- start_date=days_ago(2),
- default_args={'owner': 'airflow'},
- tags=['example'],
-)
-
value_1 = [1, 2, 3]
value_2 = {'a': 'b'}
@@ -65,22 +57,27 @@ def puller(**kwargs):
raise ValueError(f'The two values differ {pulled_value_2} and {value_2}')
-push1 = PythonOperator(
- task_id='push',
- dag=dag,
- python_callable=push,
-)
+with DAG(
+ 'example_xcom',
+ schedule_interval="@once",
+ start_date=days_ago(2),
+ default_args={'owner': 'airflow'},
+ tags=['example'],
+) as dag:
+
+ push1 = PythonOperator(
+ task_id='push',
+ python_callable=push,
+ )
-push2 = PythonOperator(
- task_id='push_by_returning',
- dag=dag,
- python_callable=push_by_returning,
-)
+ push2 = PythonOperator(
+ task_id='push_by_returning',
+ python_callable=push_by_returning,
+ )
-pull = PythonOperator(
- task_id='puller',
- dag=dag,
- python_callable=puller,
-)
+ pull = PythonOperator(
+ task_id='puller',
+ python_callable=puller,
+ )
-pull << [push1, push2]
+ pull << [push1, push2]
diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py
index a1a2ed0..0211dfb 100644
--- a/airflow/example_dags/test_utils.py
+++ b/airflow/example_dags/test_utils.py
@@ -20,12 +20,11 @@ from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
-dag = DAG(dag_id='test_utils', schedule_interval=None, tags=['example'])
+with DAG(dag_id='test_utils', schedule_interval=None, tags=['example']) as dag:
-task = BashOperator(
- task_id='sleeps_forever',
- dag=dag,
- bash_command="sleep 10000000000",
- start_date=days_ago(2),
- owner='airflow',
-)
+ task = BashOperator(
+ task_id='sleeps_forever',
+ bash_command="sleep 10000000000",
+ start_date=days_ago(2),
+ owner='airflow',
+ )
diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py
index a00051c..518c801 100644
--- a/airflow/example_dags/tutorial.py
+++ b/airflow/example_dags/tutorial.py
@@ -24,6 +24,7 @@ Documentation that goes along with the Airflow tutorial located
# [START tutorial]
# [START import_module]
from datetime import timedelta
+from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
@@ -62,62 +63,63 @@ default_args = {
# [END default_args]
# [START instantiate_dag]
-dag = DAG(
+with DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=['example'],
-)
-# [END instantiate_dag]
+) as dag:
+ # [END instantiate_dag]
-# t1, t2 and t3 are examples of tasks created by instantiating operators
-# [START basic_task]
-t1 = BashOperator(
- task_id='print_date',
- bash_command='date',
- dag=dag,
-)
+ # t1, t2 and t3 are examples of tasks created by instantiating operators
+ # [START basic_task]
+ t1 = BashOperator(
+ task_id='print_date',
+ bash_command='date',
+ )
-t2 = BashOperator(
- task_id='sleep',
- depends_on_past=False,
- bash_command='sleep 5',
- retries=3,
- dag=dag,
-)
-# [END basic_task]
+ t2 = BashOperator(
+ task_id='sleep',
+ depends_on_past=False,
+ bash_command='sleep 5',
+ retries=3,
+ )
+ # [END basic_task]
-# [START documentation]
-dag.doc_md = __doc__
+ # [START documentation]
+ dag.doc_md = __doc__
-t1.doc_md = """\
-#### Task Documentation
-You can document your task using the attributes `doc_md` (markdown),
-`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
-rendered in the UI's Task Instance Details page.
-![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
-"""
-# [END documentation]
+ t1.doc_md = dedent(
+ """\
+ #### Task Documentation
+ You can document your task using the attributes `doc_md` (markdown),
+ `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
+ rendered in the UI's Task Instance Details page.
+ ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
+ """
+ )
+ # [END documentation]
-# [START jinja_template]
-templated_command = """
-{% for i in range(5) %}
- echo "{{ ds }}"
- echo "{{ macros.ds_add(ds, 7)}}"
- echo "{{ params.my_param }}"
-{% endfor %}
-"""
+ # [START jinja_template]
+ templated_command = dedent(
+ """
+ {% for i in range(5) %}
+ echo "{{ ds }}"
+ echo "{{ macros.ds_add(ds, 7)}}"
+ echo "{{ params.my_param }}"
+ {% endfor %}
+ """
+ )
-t3 = BashOperator(
- task_id='templated',
- depends_on_past=False,
- bash_command=templated_command,
- params={'my_param': 'Parameter I passed in'},
- dag=dag,
-)
-# [END jinja_template]
+ t3 = BashOperator(
+ task_id='templated',
+ depends_on_past=False,
+ bash_command=templated_command,
+ params={'my_param': 'Parameter I passed in'},
+ )
+ # [END jinja_template]
-t1 >> [t2, t3]
+ t1 >> [t2, t3]
# [END tutorial]
diff --git a/airflow/example_dags/tutorial_etl_dag.py b/airflow/example_dags/tutorial_etl_dag.py
index 48b519b..8b45600 100644
--- a/airflow/example_dags/tutorial_etl_dag.py
+++ b/airflow/example_dags/tutorial_etl_dag.py
@@ -27,6 +27,7 @@ as part of the documentation that goes along with the Airflow Functional DAG tut
# [START tutorial]
# [START import_module]
import json
+from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
@@ -98,33 +99,39 @@ with DAG(
task_id='extract',
python_callable=extract,
)
- extract_task.doc_md = """\
-#### Extract task
-A simple Extract task to get data ready for the rest of the data pipeline.
-In this case, getting data is simulated by reading from a hardcoded JSON string.
-This data is then put into xcom, so that it can be processed by the next task.
-"""
+ extract_task.doc_md = dedent(
+ """\
+ #### Extract task
+ A simple Extract task to get data ready for the rest of the data pipeline.
+ In this case, getting data is simulated by reading from a hardcoded JSON string.
+ This data is then put into xcom, so that it can be processed by the next task.
+ """
+ )
transform_task = PythonOperator(
task_id='transform',
python_callable=transform,
)
- transform_task.doc_md = """\
-#### Transform task
-A simple Transform task which takes in the collection of order data from xcom
-and computes the total order value.
-This computed value is then put into xcom, so that it can be processed by the next task.
-"""
+ transform_task.doc_md = dedent(
+ """\
+ #### Transform task
+ A simple Transform task which takes in the collection of order data from xcom
+ and computes the total order value.
+ This computed value is then put into xcom, so that it can be processed by the next task.
+ """
+ )
load_task = PythonOperator(
task_id='load',
python_callable=load,
)
- load_task.doc_md = """\
-#### Load task
-A simple Load task which takes in the result of the Transform task, by reading it
-from xcom and instead of saving it to end user review, just prints it out.
-"""
+ load_task.doc_md = dedent(
+ """\
+ #### Load task
+ A simple Load task which takes in the result of the Transform task, by reading it
+ from xcom and instead of saving it to end user review, just prints it out.
+ """
+ )
extract_task >> transform_task >> load_task
diff --git a/docs/apache-airflow/executor/kubernetes.rst b/docs/apache-airflow/executor/kubernetes.rst
index 9b774cf..a0df9db 100644
--- a/docs/apache-airflow/executor/kubernetes.rst
+++ b/docs/apache-airflow/executor/kubernetes.rst
@@ -120,6 +120,7 @@ name ``base`` and a second container containing your desired sidecar.
.. exampleinclude:: /../../airflow/example_dags/example_kubernetes_executor_config.py
:language: python
+ :dedent: 8
:start-after: [START task_with_sidecar]
:end-before: [END task_with_sidecar]
@@ -130,6 +131,7 @@ Here is an example of a task with both features:
.. exampleinclude:: /../../airflow/example_dags/example_kubernetes_executor_config.py
:language: python
+ :dedent: 8
:start-after: [START task_with_template]
:end-before: [END task_with_template]
diff --git a/docs/apache-airflow/howto/operator/bash.rst b/docs/apache-airflow/howto/operator/bash.rst
index c8a923f..3d2195f 100644
--- a/docs/apache-airflow/howto/operator/bash.rst
+++ b/docs/apache-airflow/howto/operator/bash.rst
@@ -27,6 +27,7 @@ commands in a `Bash <https://www.gnu.org/software/bash/>`__ shell.
.. exampleinclude:: /../../airflow/example_dags/example_bash_operator.py
:language: python
+ :dedent: 4
:start-after: [START howto_operator_bash]
:end-before: [END howto_operator_bash]
@@ -38,6 +39,7 @@ You can use :ref:`Jinja templates <jinja-templating>` to parameterize the
.. exampleinclude:: /../../airflow/example_dags/example_bash_operator.py
:language: python
+ :dedent: 4
:start-after: [START howto_operator_bash_template]
:end-before: [END howto_operator_bash_template]
diff --git a/docs/apache-airflow/howto/operator/external_task_sensor.rst b/docs/apache-airflow/howto/operator/external_task_sensor.rst
index eec8074..420bd13 100644
--- a/docs/apache-airflow/howto/operator/external_task_sensor.rst
+++ b/docs/apache-airflow/howto/operator/external_task_sensor.rst
@@ -46,6 +46,7 @@ via ``allowed_states`` and ``failed_states`` parameters.
.. exampleinclude:: /../../airflow/example_dags/example_external_task_marker_dag.py
:language: python
+ :dedent: 4
:start-after: [START howto_operator_external_task_sensor]
:end-before: [END howto_operator_external_task_sensor]
@@ -60,5 +61,6 @@ user clears ``parent_task``.
.. exampleinclude:: /../../airflow/example_dags/example_external_task_marker_dag.py
:language: python
+ :dedent: 4
:start-after: [START howto_operator_external_task_marker]
:end-before: [END howto_operator_external_task_marker]
diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst
index 7f4d2b8..4a59df6 100644
--- a/docs/apache-airflow/howto/operator/python.rst
+++ b/docs/apache-airflow/howto/operator/python.rst
@@ -27,6 +27,7 @@ Python callables.
.. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
:language: python
+ :dedent: 4
:start-after: [START howto_operator_python]
:end-before: [END howto_operator_python]
@@ -38,6 +39,7 @@ to the Python callable.
.. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
:language: python
+ :dedent: 4
:start-after: [START howto_operator_python_kwargs]
:end-before: [END howto_operator_python_kwargs]
@@ -63,6 +65,7 @@ Python callables inside a new Python virtual environment.
.. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
:language: python
+ :dedent: 4
:start-after: [START howto_operator_python_venv]
:end-before: [END howto_operator_python_venv]
diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index 9324014..3a6b7ce9 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -109,6 +109,7 @@ instantiated from an operator is called a task. The first argument
.. exampleinclude:: /../../airflow/example_dags/tutorial.py
:language: python
+ :dedent: 4
:start-after: [START basic_task]
:end-before: [END basic_task]
@@ -144,6 +145,7 @@ stamp").
.. exampleinclude:: /../../airflow/example_dags/tutorial.py
:language: python
+ :dedent: 4
:start-after: [START jinja_template]
:end-before: [END jinja_template]
@@ -186,6 +188,7 @@ json, yaml.
.. exampleinclude:: /../../airflow/example_dags/tutorial.py
:language: python
+ :dedent: 4
:start-after: [START documentation]
:end-before: [END documentation]
diff --git a/docs/apache-airflow/tutorial_taskflow_api.rst b/docs/apache-airflow/tutorial_taskflow_api.rst
index 6c51f16..e389a4c 100644
--- a/docs/apache-airflow/tutorial_taskflow_api.rst
+++ b/docs/apache-airflow/tutorial_taskflow_api.rst
@@ -69,6 +69,7 @@ as shown below. The function name acts as a unique identifier for the task.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
:language: python
+ :dedent: 4
:start-after: [START extract]
:end-before: [END extract]
@@ -83,6 +84,7 @@ we can move to the main part of the DAG.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
:language: python
+ :dedent: 4
:start-after: [START main_flow]
:end-before: [END main_flow]
@@ -119,6 +121,7 @@ in the middle of the data pipeline. In Airflow 1.x, this task is defined as show
.. exampleinclude:: /../../airflow/example_dags/tutorial_etl_dag.py
:language: python
+ :dedent: 4
:start-after: [START transform_function]
:end-before: [END transform_function]
@@ -130,6 +133,7 @@ Contrasting that with Taskflow API in Airflow 2.0 as shown below.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
:language: python
+ :dedent: 4
:start-after: [START transform]
:end-before: [END transform]
@@ -143,6 +147,7 @@ dependencies specified as shown below.
.. exampleinclude:: /../../airflow/example_dags/tutorial_etl_dag.py
:language: python
+ :dedent: 4
:start-after: [START main_flow]
:end-before: [END main_flow]
@@ -151,6 +156,7 @@ the dependencies as shown below.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
:language: python
+ :dedent: 4
:start-after: [START main_flow]
:end-before: [END main_flow]