You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/07/20 15:50:20 UTC

[airflow] branch main updated: Prefix the dags in dataset example dags with example_dataset_ (#25181)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f0112458e7 Prefix the dags in dataset example dags with example_dataset_ (#25181)
f0112458e7 is described below

commit f0112458e78a9d55323b77b2c444982488e3494b
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Jul 20 16:49:54 2022 +0100

    Prefix the dags in dataset example dags with example_dataset_ (#25181)
    
    Having dag1, dag2, etc. did not say much about what the different dataset dags do
---
 airflow/example_dags/example_datasets.py | 42 +++++++++++++++++---------------
 tests/models/test_taskinstance.py        |  6 ++++-
 2 files changed, 27 insertions(+), 21 deletions(-)

diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py
index ee06e3a760..66a20e4371 100644
--- a/airflow/example_dags/example_datasets.py
+++ b/airflow/example_dags/example_datasets.py
@@ -21,22 +21,24 @@ Notes on usage:
 
 Turn on all the dags.
 
-DAG dag1 should run because it's on a schedule.
+DAG example_dataset_dag1 should run because it's on a schedule.
 
-After dag1 runs, dag3 should be triggered immediately because its only
-dataset dependency is managed by dag1.
+After example_dataset_dag1 runs, example_dataset_dag3_req_dag1 should be triggered immediately
+because its only dataset dependency is managed by example_dataset_dag1.
 
-No other dags should be triggered.  Note that even though dag4 depends on
-the dataset in dag1, it will not be triggered until dag2 runs (and dag2 is
-left with no schedule so that we can trigger it manually).
+No other dags should be triggered.  Note that even though example_dataset_dag4_req_dag1_dag2 depends on
+the dataset in example_dataset_dag1, it will not be triggered until example_dataset_dag2 runs
+(and example_dataset_dag2 is left with no schedule so that we can trigger it manually).
 
-Next, trigger dag2.  After dag2 finishes, dag4 should run.
+Next, trigger example_dataset_dag2.  After example_dataset_dag2 finishes,
+example_dataset_dag4_req_dag1_dag2 should run.
 
-Dags 5 and 6 should not run because they depend on datasets that never get updated.
+Dags example_dataset_dag5_req_dag1_D and example_dataset_dag6_req_DD should not run because they depend on
+datasets that never get updated.
 
-DAG dag7 should skip its only task and never trigger dag8
+DAG example_dataset_dag7 should skip its only task and never trigger example_dataset_dag8_req_dag7
 
-DAG dag9 should fail its only task and never trigger dag10
+DAG example_dataset_dag9 should fail its only task and never trigger example_dataset_dag10_req_dag9
 
 """
 from datetime import datetime
@@ -54,7 +56,7 @@ dag7_dataset = Dataset('s3://dag7/output_1.txt', extra={'hi': 'bye'})
 dag9_dataset = Dataset('s3://dag9/output_1.txt', extra={'hi': 'bye'})
 
 dag1 = DAG(
-    dag_id='dag1',
+    dag_id='example_dataset_dag1',
     catchup=False,
     start_date=datetime(2020, 1, 1),
     schedule_interval='@daily',
@@ -66,7 +68,7 @@ BashOperator(outlets=[dag1_dataset], task_id='upstream_task_1', bash_command="sl
 # [END task_outlet]
 
 with DAG(
-    dag_id='dag2',
+    dag_id='example_dataset_dag2',
     catchup=False,
     start_date=datetime(2020, 1, 1),
     schedule_interval=None,
@@ -80,7 +82,7 @@ with DAG(
 
 # [START dag_dep]
 dag3 = DAG(
-    dag_id='dag3',
+    dag_id='example_dataset_dag3_req_dag1',
     catchup=False,
     start_date=datetime(2020, 1, 1),
     schedule_on=[dag1_dataset],
@@ -96,7 +98,7 @@ BashOperator(
 )
 
 with DAG(
-    dag_id='dag4',
+    dag_id='example_dataset_dag4_req_dag1_dag2',
     catchup=False,
     start_date=datetime(2020, 1, 1),
     schedule_on=[dag1_dataset, dag2_dataset],
@@ -109,7 +111,7 @@ with DAG(
     )
 
 with DAG(
-    dag_id='dag5',
+    dag_id='example_dataset_dag5_req_dag1_D',
     catchup=False,
     start_date=datetime(2020, 1, 1),
     schedule_on=[
@@ -125,7 +127,7 @@ with DAG(
     )
 
 with DAG(
-    dag_id='dag6',
+    dag_id='example_dataset_dag6_req_DD',
     catchup=False,
     start_date=datetime(2020, 1, 1),
     schedule_on=[
@@ -146,7 +148,7 @@ def raise_skip_exc():
 
 
 dag7 = DAG(
-    dag_id='dag7',
+    dag_id='example_dataset_dag7',
     catchup=False,
     start_date=datetime(2020, 1, 1),
     schedule_interval='@daily',
@@ -160,7 +162,7 @@ PythonOperator(
 )
 
 with DAG(
-    dag_id='dag8',
+    dag_id='example_dataset_dag8_req_dag7',
     catchup=False,
     start_date=datetime(2020, 1, 1),
     schedule_on=[dag7_dataset],
@@ -177,7 +179,7 @@ def raise_assertionerror():
 
 
 dag9 = DAG(
-    dag_id='dag9',
+    dag_id='example_dataset_dag9',
     catchup=False,
     start_date=datetime(2020, 1, 1),
     schedule_interval='@daily',
@@ -191,7 +193,7 @@ PythonOperator(
 )
 
 with DAG(
-    dag_id='dag10',
+    dag_id='example_dataset_dag10_req_dag9',
     catchup=False,
     start_date=datetime(2020, 1, 1),
     schedule_on=[dag9_dataset],
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 0594c639ac..a8cbc8a432 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1540,7 +1540,11 @@ class TestTaskInstance:
         # check that one queue record created for each dag that depends on dataset 1
         assert session.query(DatasetDagRunQueue.target_dag_id).filter(
             DatasetTaskRef.dag_id == dag1.dag_id, DatasetTaskRef.task_id == 'upstream_task_1'
-        ).all() == [('dag3',), ('dag4',), ('dag5',)]
+        ).all() == [
+            ('example_dataset_dag3_req_dag1',),
+            ('example_dataset_dag4_req_dag1_dag2',),
+            ('example_dataset_dag5_req_dag1_D',),
+        ]
 
         # check that one event record created for dataset1 and this TI
         assert session.query(Dataset.uri).join(DatasetEvent.dataset).filter(