Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/27 10:02:50 UTC

[airflow] branch main updated: Refactor test DAGs to unclutter example DAGs (#25327)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 20fe314de3 Refactor test DAGs to unclutter example DAGs (#25327)
20fe314de3 is described below

commit 20fe314de36c28cc17d47cfe374cc05bd8606f16
Author: blag <bl...@users.noreply.github.com>
AuthorDate: Wed Jul 27 03:02:36 2022 -0700

    Refactor test DAGs to unclutter example DAGs (#25327)
---
 airflow/example_dags/example_datasets.py | 71 -------------------------
 tests/dags/test_datasets.py              | 89 ++++++++++++++++++++++++++++++++
 tests/models/test_taskinstance.py        | 20 +++----
 3 files changed, 99 insertions(+), 81 deletions(-)
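
For readers skimming the diff: the DAGs being relocated exercise Airflow's dataset-driven scheduling, where a producer task lists a Dataset in its outlets and a consumer DAG is scheduled on that dataset, running only after the producer task succeeds. A minimal sketch of the pattern, using illustrative names and the schedule_on spelling that this commit's code uses (the parameter name on this development branch, not necessarily the released API):

    from datetime import datetime

    from airflow.models import DAG, Dataset
    from airflow.operators.bash import BashOperator

    # The URI is an identifier for scheduling purposes; nothing is read from S3.
    example_dataset = Dataset('s3://hypothetical/output.txt')

    with DAG(
        dag_id='producer',
        start_date=datetime(2020, 1, 1),
        schedule_interval='@daily',
        catchup=False,
    ):
        # A successful run of this task records a DatasetEvent for its outlets.
        BashOperator(task_id='produce', outlets=[example_dataset], bash_command='sleep 1')

    with DAG(
        dag_id='consumer',
        start_date=datetime(2020, 1, 1),
        schedule_on=[example_dataset],  # queued only when the producer task succeeds
        catchup=False,
    ):
        BashOperator(task_id='consume', bash_command='sleep 1')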

diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py
index 66a20e4371..2068d66a5c 100644
--- a/airflow/example_dags/example_datasets.py
+++ b/airflow/example_dags/example_datasets.py
@@ -35,25 +35,16 @@ example_dataset_dag4_req_dag1_dag2 should run.
 
 Dags example_dataset_dag5_req_dag1_D and example_dataset_dag6_req_DD should not run because they depend on
 datasets that never get updated.
-
-DAG example_dataset_dag7 should skip its only task and never trigger example_dataset_dag8_req_dag7
-
-DAG example_dataset_dag9 should fail its only task and never trigger example_dataset_dag10_req_dag9
-
 """
 from datetime import datetime
 
-from airflow.exceptions import AirflowFailException, AirflowSkipException
 from airflow.models import DAG, Dataset
 from airflow.operators.bash import BashOperator
-from airflow.operators.python import PythonOperator
 
 # [START dataset_def]
 dag1_dataset = Dataset('s3://dag1/output_1.txt', extra={'hi': 'bye'})
 # [END dataset_def]
 dag2_dataset = Dataset('s3://dag2/output_1.txt', extra={'hi': 'bye'})
-dag7_dataset = Dataset('s3://dag7/output_1.txt', extra={'hi': 'bye'})
-dag9_dataset = Dataset('s3://dag9/output_1.txt', extra={'hi': 'bye'})
 
 dag1 = DAG(
     dag_id='example_dataset_dag1',
@@ -141,65 +132,3 @@ with DAG(
         outlets=[Dataset('s3://unrelated_task/dataset_other_unknown.txt')],
         bash_command="sleep 5",
     )
-
-
-def raise_skip_exc():
-    raise AirflowSkipException
-
-
-dag7 = DAG(
-    dag_id='example_dataset_dag7',
-    catchup=False,
-    start_date=datetime(2020, 1, 1),
-    schedule_interval='@daily',
-    tags=['upstream-skipping'],
-)
-PythonOperator(
-    task_id='skip_task',
-    outlets=[dag7_dataset],
-    python_callable=raise_skip_exc,
-    dag=dag7,
-)
-
-with DAG(
-    dag_id='example_dataset_dag8_req_dag7',
-    catchup=False,
-    start_date=datetime(2020, 1, 1),
-    schedule_on=[dag7_dataset],
-    tags=['downstream-skipped'],
-) as dag8:
-    BashOperator(
-        task_id='dag8_task',
-        bash_command="sleep 5",
-    )
-
-
-def raise_assertionerror():
-    raise AirflowFailException
-
-
-dag9 = DAG(
-    dag_id='example_dataset_dag9',
-    catchup=False,
-    start_date=datetime(2020, 1, 1),
-    schedule_interval='@daily',
-    tags=['upstream-skipping'],
-)
-PythonOperator(
-    task_id='fail_task',
-    outlets=[dag9_dataset],
-    python_callable=raise_assertionerror,
-    dag=dag9,
-)
-
-with DAG(
-    dag_id='example_dataset_dag10_req_dag9',
-    catchup=False,
-    start_date=datetime(2020, 1, 1),
-    schedule_on=[dag9_dataset],
-    tags=['downstream-failed'],
-) as dag10:
-    BashOperator(
-        task_id='dag10_task',
-        bash_command="sleep 5",
-    )
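
The blocks removed above depended on one scheduler rule that this commit moves into the test suite: a DatasetEvent is recorded only when the producing task succeeds, so a task that raises AirflowSkipException (or AirflowFailException) never triggers DAGs scheduled on its outlet dataset. A self-contained sketch of the skipping case, with hypothetical names:

    from datetime import datetime

    from airflow.exceptions import AirflowSkipException
    from airflow.models import DAG, Dataset
    from airflow.operators.python import PythonOperator

    skipped_dataset = Dataset('s3://hypothetical/skip.txt')


    def raise_skip_exc():
        raise AirflowSkipException


    with DAG(
        dag_id='skipping_producer',
        start_date=datetime(2020, 1, 1),
        schedule_interval='@daily',
        catchup=False,
    ):
        # The task ends in the 'skipped' state, so no DatasetEvent is emitted
        # and any DAG scheduled on skipped_dataset stays idle.
        PythonOperator(
            task_id='skip_task',
            outlets=[skipped_dataset],
            python_callable=raise_skip_exc,
        )
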
diff --git a/tests/dags/test_datasets.py b/tests/dags/test_datasets.py
new file mode 100644
index 0000000000..f228270b91
--- /dev/null
+++ b/tests/dags/test_datasets.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime
+
+from airflow.exceptions import AirflowFailException, AirflowSkipException
+from airflow.models import DAG, Dataset
+from airflow.operators.bash import BashOperator
+from airflow.operators.python import PythonOperator
+
+skip_task_dag_dataset = Dataset('s3://dag_with_skip_task/output_1.txt', extra={'hi': 'bye'})
+fail_task_dag_dataset = Dataset('s3://dag_with_fail_task/output_1.txt', extra={'hi': 'bye'})
+
+
+def raise_skip_exc():
+    raise AirflowSkipException
+
+
+dag_with_skip_task = DAG(
+    dag_id='dag_with_skip_task',
+    catchup=False,
+    start_date=datetime(2020, 1, 1),
+    schedule_interval='@daily',
+    tags=['upstream-skipping'],
+)
+PythonOperator(
+    task_id='skip_task',
+    outlets=[skip_task_dag_dataset],
+    python_callable=raise_skip_exc,
+    dag=dag_with_skip_task,
+)
+
+with DAG(
+    dag_id='dag_that_follows_dag_with_skip',
+    catchup=False,
+    start_date=datetime(2020, 1, 1),
+    schedule_on=[skip_task_dag_dataset],
+    tags=['downstream-skipped'],
+) as dag_that_follows_dag_with_skip:
+    BashOperator(
+        task_id='dag_that_follows_dag_with_skip_task',
+        bash_command="sleep 5",
+    )
+
+
+def raise_fail_exc():
+    raise AirflowFailException
+
+
+dag_with_fail_task = DAG(
+    dag_id='dag_with_fail_task',
+    catchup=False,
+    start_date=datetime(2020, 1, 1),
+    schedule_interval='@daily',
+    tags=['upstream-skipping'],
+)
+PythonOperator(
+    task_id='fail_task',
+    outlets=[fail_task_dag_dataset],
+    python_callable=raise_fail_exc,
+    dag=dag_with_fail_task,
+)
+
+with DAG(
+    dag_id='dag_that_follows_dag_with_fail',
+    catchup=False,
+    start_date=datetime(2020, 1, 1),
+    schedule_on=[fail_task_dag_dataset],
+    tags=['downstream-failed'],
+) as dag_that_follows_dag_with_fail:
+    BashOperator(
+        task_id='dag_that_follows_dag_with_fail_task',
+        bash_command="sleep 5",
+    )
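
As a quick way to confirm the relocated DAGs still parse after the move, one could point a DagBag at just this file; a sketch, assuming it is run from the repository root with Airflow importable:

    from airflow.models import DagBag

    from tests.dags import test_datasets

    # Parse only tests/dags/test_datasets.py, skipping the bundled example DAGs.
    dagbag = DagBag(dag_folder=test_datasets.__file__, include_examples=False)
    assert not dagbag.import_errors
    assert {
        'dag_with_skip_task',
        'dag_that_follows_dag_with_skip',
        'dag_with_fail_task',
        'dag_that_follows_dag_with_fail',
    } <= set(dagbag.dags)
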
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 87ac9b69cc..21badfe8cb 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1573,17 +1573,17 @@ class TestTaskInstance:
         failed, a DatasetDagRunQueue is not logged, and a DatasetEvent is
         not generated
         """
-        from airflow.example_dags import example_datasets
-        from airflow.example_dags.example_datasets import dag9
+        from tests.dags import test_datasets
+        from tests.dags.test_datasets import dag_with_fail_task
 
         session = settings.Session()
-        dagbag = DagBag(dag_folder=example_datasets.__file__)
+        dagbag = DagBag(dag_folder=test_datasets.__file__)
         dagbag.collect_dags(only_if_updated=False, safe_mode=False)
         dagbag.sync_to_db(session=session)
         run_id = str(uuid4())
-        dr = DagRun(dag9.dag_id, run_id=run_id, run_type='anything')
+        dr = DagRun(dag_with_fail_task.dag_id, run_id=run_id, run_type='anything')
         session.merge(dr)
-        task = dag9.get_task('fail_task')
+        task = dag_with_fail_task.get_task('fail_task')
         ti = TaskInstance(task, run_id=run_id)
         session.merge(ti)
         session.commit()
@@ -1604,17 +1604,17 @@ class TestTaskInstance:
         is skipped, a DatasetDagRunQueue is not logged, and a DatasetEvent is
         not generated
         """
-        from airflow.example_dags import example_datasets
-        from airflow.example_dags.example_datasets import dag7
+        from tests.dags import test_datasets
+        from tests.dags.test_datasets import dag_with_skip_task
 
         session = settings.Session()
-        dagbag = DagBag(dag_folder=example_datasets.__file__)
+        dagbag = DagBag(dag_folder=test_datasets.__file__)
         dagbag.collect_dags(only_if_updated=False, safe_mode=False)
         dagbag.sync_to_db(session=session)
         run_id = str(uuid4())
-        dr = DagRun(dag7.dag_id, run_id=run_id, run_type='anything')
+        dr = DagRun(dag_with_skip_task.dag_id, run_id=run_id, run_type='anything')
         session.merge(dr)
-        task = dag7.get_task('skip_task')
+        task = dag_with_skip_task.get_task('skip_task')
         ti = TaskInstance(task, run_id=run_id)
         session.merge(ti)
         session.commit()
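
Condensed, the test-side setup that both updated tests share looks like the sketch below (mirroring the diff above; the run_type='anything' value is just what the test passes, not a meaningful run type):

    from uuid import uuid4

    from airflow import settings
    from airflow.models import DagBag, DagRun, TaskInstance

    from tests.dags import test_datasets
    from tests.dags.test_datasets import dag_with_fail_task

    session = settings.Session()
    # Parse only the relocated test DAG file and register its DAGs in the metadata DB.
    dagbag = DagBag(dag_folder=test_datasets.__file__)
    dagbag.collect_dags(only_if_updated=False, safe_mode=False)
    dagbag.sync_to_db(session=session)

    # Create a run and a task instance for the failing producer task.
    run_id = str(uuid4())
    dr = DagRun(dag_with_fail_task.dag_id, run_id=run_id, run_type='anything')
    session.merge(dr)
    ti = TaskInstance(dag_with_fail_task.get_task('fail_task'), run_id=run_id)
    session.merge(ti)
    session.commit()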