You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/27 10:02:50 UTC
[airflow] branch main updated: Refactor test DAGs to unclutter example DAGs (#25327)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 20fe314de3 Refactor test DAGs to unclutter example DAGs (#25327)
20fe314de3 is described below
commit 20fe314de36c28cc17d47cfe374cc05bd8606f16
Author: blag <bl...@users.noreply.github.com>
AuthorDate: Wed Jul 27 03:02:36 2022 -0700
Refactor test DAGs to unclutter example DAGs (#25327)
---
airflow/example_dags/example_datasets.py | 71 -------------------------
tests/dags/test_datasets.py | 89 ++++++++++++++++++++++++++++++++
tests/models/test_taskinstance.py | 20 +++----
3 files changed, 99 insertions(+), 81 deletions(-)
diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py
index 66a20e4371..2068d66a5c 100644
--- a/airflow/example_dags/example_datasets.py
+++ b/airflow/example_dags/example_datasets.py
@@ -35,25 +35,16 @@ example_dataset_dag4_req_dag1_dag2 should run.
Dags example_dataset_dag5_req_dag1_D and example_dataset_dag6_req_DD should not run because they depend on
datasets that never get updated.
-
-DAG example_dataset_dag7 should skip its only task and never trigger example_dataset_dag8_req_dag7
-
-DAG example_dataset_dag9 should fail its only task and never trigger example_dataset_dag10_req_dag9
-
"""
from datetime import datetime
-from airflow.exceptions import AirflowFailException, AirflowSkipException
from airflow.models import DAG, Dataset
from airflow.operators.bash import BashOperator
-from airflow.operators.python import PythonOperator
# [START dataset_def]
dag1_dataset = Dataset('s3://dag1/output_1.txt', extra={'hi': 'bye'})
# [END dataset_def]
dag2_dataset = Dataset('s3://dag2/output_1.txt', extra={'hi': 'bye'})
-dag7_dataset = Dataset('s3://dag7/output_1.txt', extra={'hi': 'bye'})
-dag9_dataset = Dataset('s3://dag9/output_1.txt', extra={'hi': 'bye'})
dag1 = DAG(
dag_id='example_dataset_dag1',
@@ -141,65 +132,3 @@ with DAG(
outlets=[Dataset('s3://unrelated_task/dataset_other_unknown.txt')],
bash_command="sleep 5",
)
-
-
-def raise_skip_exc():
- raise AirflowSkipException
-
-
-dag7 = DAG(
- dag_id='example_dataset_dag7',
- catchup=False,
- start_date=datetime(2020, 1, 1),
- schedule_interval='@daily',
- tags=['upstream-skipping'],
-)
-PythonOperator(
- task_id='skip_task',
- outlets=[dag7_dataset],
- python_callable=raise_skip_exc,
- dag=dag7,
-)
-
-with DAG(
- dag_id='example_dataset_dag8_req_dag7',
- catchup=False,
- start_date=datetime(2020, 1, 1),
- schedule_on=[dag7_dataset],
- tags=['downstream-skipped'],
-) as dag8:
- BashOperator(
- task_id='dag8_task',
- bash_command="sleep 5",
- )
-
-
-def raise_assertionerror():
- raise AirflowFailException
-
-
-dag9 = DAG(
- dag_id='example_dataset_dag9',
- catchup=False,
- start_date=datetime(2020, 1, 1),
- schedule_interval='@daily',
- tags=['upstream-skipping'],
-)
-PythonOperator(
- task_id='fail_task',
- outlets=[dag9_dataset],
- python_callable=raise_assertionerror,
- dag=dag9,
-)
-
-with DAG(
- dag_id='example_dataset_dag10_req_dag9',
- catchup=False,
- start_date=datetime(2020, 1, 1),
- schedule_on=[dag9_dataset],
- tags=['downstream-failed'],
-) as dag10:
- BashOperator(
- task_id='dag10_task',
- bash_command="sleep 5",
- )
diff --git a/tests/dags/test_datasets.py b/tests/dags/test_datasets.py
new file mode 100644
index 0000000000..f228270b91
--- /dev/null
+++ b/tests/dags/test_datasets.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime
+
+from airflow.exceptions import AirflowFailException, AirflowSkipException
+from airflow.models import DAG, Dataset
+from airflow.operators.bash import BashOperator
+from airflow.operators.python import PythonOperator
+
+skip_task_dag_dataset = Dataset('s3://dag_with_skip_task/output_1.txt', extra={'hi': 'bye'})
+fail_task_dag_dataset = Dataset('s3://dag_with_fail_task/output_1.txt', extra={'hi': 'bye'})
+
+
+def raise_skip_exc():
+ raise AirflowSkipException
+
+
+dag_with_skip_task = DAG(
+ dag_id='dag_with_skip_task',
+ catchup=False,
+ start_date=datetime(2020, 1, 1),
+ schedule_interval='@daily',
+ tags=['upstream-skipping'],
+)
+PythonOperator(
+ task_id='skip_task',
+ outlets=[skip_task_dag_dataset],
+ python_callable=raise_skip_exc,
+ dag=dag_with_skip_task,
+)
+
+with DAG(
+ dag_id='dag_that_follows_dag_with_skip',
+ catchup=False,
+ start_date=datetime(2020, 1, 1),
+ schedule_on=[skip_task_dag_dataset],
+ tags=['downstream-skipped'],
+) as dag_that_follows_dag_with_skip:
+ BashOperator(
+ task_id='dag_that_follows_dag_with_skip_task',
+ bash_command="sleep 5",
+ )
+
+
+def raise_fail_exc():
+ raise AirflowFailException
+
+
+dag_with_fail_task = DAG(
+ dag_id='dag_with_fail_task',
+ catchup=False,
+ start_date=datetime(2020, 1, 1),
+ schedule_interval='@daily',
+ tags=['upstream-skipping'],
+)
+PythonOperator(
+ task_id='fail_task',
+ outlets=[fail_task_dag_dataset],
+ python_callable=raise_fail_exc,
+ dag=dag_with_fail_task,
+)
+
+with DAG(
+ dag_id='dag_that_follows_dag_with_fail',
+ catchup=False,
+ start_date=datetime(2020, 1, 1),
+ schedule_on=[fail_task_dag_dataset],
+ tags=['downstream-failed'],
+) as dag_that_follows_dag_with_fail:
+ BashOperator(
+ task_id='dag_that_follows_dag_with_fail_task',
+ bash_command="sleep 5",
+ )
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 87ac9b69cc..21badfe8cb 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1573,17 +1573,17 @@ class TestTaskInstance:
failed, a DatasetDagRunQueue is not logged, and a DatasetEvent is
not generated
"""
- from airflow.example_dags import example_datasets
- from airflow.example_dags.example_datasets import dag9
+ from tests.dags import test_datasets
+ from tests.dags.test_datasets import dag_with_fail_task
session = settings.Session()
- dagbag = DagBag(dag_folder=example_datasets.__file__)
+ dagbag = DagBag(dag_folder=test_datasets.__file__)
dagbag.collect_dags(only_if_updated=False, safe_mode=False)
dagbag.sync_to_db(session=session)
run_id = str(uuid4())
- dr = DagRun(dag9.dag_id, run_id=run_id, run_type='anything')
+ dr = DagRun(dag_with_fail_task.dag_id, run_id=run_id, run_type='anything')
session.merge(dr)
- task = dag9.get_task('fail_task')
+ task = dag_with_fail_task.get_task('fail_task')
ti = TaskInstance(task, run_id=run_id)
session.merge(ti)
session.commit()
@@ -1604,17 +1604,17 @@ class TestTaskInstance:
is skipped, a DatasetDagRunQueue is not logged, and a DatasetEvent is
not generated
"""
- from airflow.example_dags import example_datasets
- from airflow.example_dags.example_datasets import dag7
+ from tests.dags import test_datasets
+ from tests.dags.test_datasets import dag_with_skip_task
session = settings.Session()
- dagbag = DagBag(dag_folder=example_datasets.__file__)
+ dagbag = DagBag(dag_folder=test_datasets.__file__)
dagbag.collect_dags(only_if_updated=False, safe_mode=False)
dagbag.sync_to_db(session=session)
run_id = str(uuid4())
- dr = DagRun(dag7.dag_id, run_id=run_id, run_type='anything')
+ dr = DagRun(dag_with_skip_task.dag_id, run_id=run_id, run_type='anything')
session.merge(dr)
- task = dag7.get_task('skip_task')
+ task = dag_with_skip_task.get_task('skip_task')
ti = TaskInstance(task, run_id=run_id)
session.merge(ti)
session.commit()