You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/07/20 15:50:20 UTC
[airflow] branch main updated: Prefix the dags in dataset example dags with example_dataset_ (#25181)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f0112458e7 Prefix the dags in dataset example dags with example_dataset_ (#25181)
f0112458e7 is described below
commit f0112458e78a9d55323b77b2c444982488e3494b
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Jul 20 16:49:54 2022 +0100
Prefix the dags in dataset example dags with example_dataset_ (#25181)
Having dag1, dag2, etc. was not conveying much about the different dataset DAGs
---
airflow/example_dags/example_datasets.py | 42 +++++++++++++++++---------------
tests/models/test_taskinstance.py | 6 ++++-
2 files changed, 27 insertions(+), 21 deletions(-)
diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py
index ee06e3a760..66a20e4371 100644
--- a/airflow/example_dags/example_datasets.py
+++ b/airflow/example_dags/example_datasets.py
@@ -21,22 +21,24 @@ Notes on usage:
Turn on all the dags.
-DAG dag1 should run because it's on a schedule.
+DAG example_dataset_dag1 should run because it's on a schedule.
-After dag1 runs, dag3 should be triggered immediately because its only
-dataset dependency is managed by dag1.
+After example_dataset_dag1 runs, example_dataset_dag3_req_dag1 should be triggered immediately
+because its only dataset dependency is managed by example_dataset_dag1.
-No other dags should be triggered. Note that even though dag4 depends on
-the dataset in dag1, it will not be triggered until dag2 runs (and dag2 is
-left with no schedule so that we can trigger it manually).
+No other dags should be triggered. Note that even though example_dataset_dag4_req_dag1_dag2 depends on
+the dataset in example_dataset_dag1, it will not be triggered until example_dataset_dag2 runs
+(and example_dataset_dag2 is left with no schedule so that we can trigger it manually).
-Next, trigger dag2. After dag2 finishes, dag4 should run.
+Next, trigger example_dataset_dag2. After example_dataset_dag2 finishes,
+example_dataset_dag4_req_dag1_dag2 should run.
-Dags 5 and 6 should not run because they depend on datasets that never get updated.
+Dags example_dataset_dag5_req_dag1_D and example_dataset_dag6_req_DD should not run because they depend on
+datasets that never get updated.
-DAG dag7 should skip its only task and never trigger dag8
+DAG example_dataset_dag7 should skip its only task and never trigger example_dataset_dag8_req_dag7
-DAG dag9 should fail its only task and never trigger dag10
+DAG example_dataset_dag9 should fail its only task and never trigger example_dataset_dag10_req_dag9
"""
from datetime import datetime
@@ -54,7 +56,7 @@ dag7_dataset = Dataset('s3://dag7/output_1.txt', extra={'hi': 'bye'})
dag9_dataset = Dataset('s3://dag9/output_1.txt', extra={'hi': 'bye'})
dag1 = DAG(
- dag_id='dag1',
+ dag_id='example_dataset_dag1',
catchup=False,
start_date=datetime(2020, 1, 1),
schedule_interval='@daily',
@@ -66,7 +68,7 @@ BashOperator(outlets=[dag1_dataset], task_id='upstream_task_1', bash_command="sl
# [END task_outlet]
with DAG(
- dag_id='dag2',
+ dag_id='example_dataset_dag2',
catchup=False,
start_date=datetime(2020, 1, 1),
schedule_interval=None,
@@ -80,7 +82,7 @@ with DAG(
# [START dag_dep]
dag3 = DAG(
- dag_id='dag3',
+ dag_id='example_dataset_dag3_req_dag1',
catchup=False,
start_date=datetime(2020, 1, 1),
schedule_on=[dag1_dataset],
@@ -96,7 +98,7 @@ BashOperator(
)
with DAG(
- dag_id='dag4',
+ dag_id='example_dataset_dag4_req_dag1_dag2',
catchup=False,
start_date=datetime(2020, 1, 1),
schedule_on=[dag1_dataset, dag2_dataset],
@@ -109,7 +111,7 @@ with DAG(
)
with DAG(
- dag_id='dag5',
+ dag_id='example_dataset_dag5_req_dag1_D',
catchup=False,
start_date=datetime(2020, 1, 1),
schedule_on=[
@@ -125,7 +127,7 @@ with DAG(
)
with DAG(
- dag_id='dag6',
+ dag_id='example_dataset_dag6_req_DD',
catchup=False,
start_date=datetime(2020, 1, 1),
schedule_on=[
@@ -146,7 +148,7 @@ def raise_skip_exc():
dag7 = DAG(
- dag_id='dag7',
+ dag_id='example_dataset_dag7',
catchup=False,
start_date=datetime(2020, 1, 1),
schedule_interval='@daily',
@@ -160,7 +162,7 @@ PythonOperator(
)
with DAG(
- dag_id='dag8',
+ dag_id='example_dataset_dag8_req_dag7',
catchup=False,
start_date=datetime(2020, 1, 1),
schedule_on=[dag7_dataset],
@@ -177,7 +179,7 @@ def raise_assertionerror():
dag9 = DAG(
- dag_id='dag9',
+ dag_id='example_dataset_dag9',
catchup=False,
start_date=datetime(2020, 1, 1),
schedule_interval='@daily',
@@ -191,7 +193,7 @@ PythonOperator(
)
with DAG(
- dag_id='dag10',
+ dag_id='example_dataset_dag10_req_dag9',
catchup=False,
start_date=datetime(2020, 1, 1),
schedule_on=[dag9_dataset],
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 0594c639ac..a8cbc8a432 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1540,7 +1540,11 @@ class TestTaskInstance:
# check that one queue record created for each dag that depends on dataset 1
assert session.query(DatasetDagRunQueue.target_dag_id).filter(
DatasetTaskRef.dag_id == dag1.dag_id, DatasetTaskRef.task_id == 'upstream_task_1'
- ).all() == [('dag3',), ('dag4',), ('dag5',)]
+ ).all() == [
+ ('example_dataset_dag3_req_dag1',),
+ ('example_dataset_dag4_req_dag1_dag2',),
+ ('example_dataset_dag5_req_dag1_D',),
+ ]
# check that one event record created for dataset1 and this TI
assert session.query(Dataset.uri).join(DatasetEvent.dataset).filter(