You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/07/19 03:43:55 UTC
[airflow] branch main updated: Don't queue dagruns or create dataset events on skipped task instances (#25086)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f0c9ac9da6 Don't queue dagruns or create dataset events on skipped task instances (#25086)
f0c9ac9da6 is described below
commit f0c9ac9da6db3a00668743adc9b55329ec567066
Author: blag <bl...@users.noreply.github.com>
AuthorDate: Mon Jul 18 20:43:42 2022 -0700
Don't queue dagruns or create dataset events on skipped task instances (#25086)
---
airflow/example_dags/example_datasets.py | 70 ++++++++++++++++++++++++++++++++
airflow/models/taskinstance.py | 3 +-
tests/models/test_taskinstance.py | 66 +++++++++++++++++++++++++++++-
3 files changed, 137 insertions(+), 2 deletions(-)
diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py
index e04479b0f5..ee06e3a760 100644
--- a/airflow/example_dags/example_datasets.py
+++ b/airflow/example_dags/example_datasets.py
@@ -34,16 +34,24 @@ Next, trigger dag2. After dag2 finishes, dag4 should run.
Dags 5 and 6 should not run because they depend on datasets that never get updated.
+DAG dag7 should skip its only task and never trigger dag8
+
+DAG dag9 should fail its only task and never trigger dag10
+
"""
from datetime import datetime
+from airflow.exceptions import AirflowFailException, AirflowSkipException
from airflow.models import DAG, Dataset
from airflow.operators.bash import BashOperator
+from airflow.operators.python import PythonOperator
# [START dataset_def]
dag1_dataset = Dataset('s3://dag1/output_1.txt', extra={'hi': 'bye'})
# [END dataset_def]
dag2_dataset = Dataset('s3://dag2/output_1.txt', extra={'hi': 'bye'})
+dag7_dataset = Dataset('s3://dag7/output_1.txt', extra={'hi': 'bye'})
+dag9_dataset = Dataset('s3://dag9/output_1.txt', extra={'hi': 'bye'})
dag1 = DAG(
dag_id='dag1',
@@ -131,3 +139,65 @@ with DAG(
outlets=[Dataset('s3://unrelated_task/dataset_other_unknown.txt')],
bash_command="sleep 5",
)
+
+
+def raise_skip_exc():
+ raise AirflowSkipException
+
+
+dag7 = DAG(
+ dag_id='dag7',
+ catchup=False,
+ start_date=datetime(2020, 1, 1),
+ schedule_interval='@daily',
+ tags=['upstream-skipping'],
+)
+PythonOperator(
+ task_id='skip_task',
+ outlets=[dag7_dataset],
+ python_callable=raise_skip_exc,
+ dag=dag7,
+)
+
+with DAG(
+ dag_id='dag8',
+ catchup=False,
+ start_date=datetime(2020, 1, 1),
+ schedule_on=[dag7_dataset],
+ tags=['downstream-skipped'],
+) as dag8:
+ BashOperator(
+ task_id='dag8_task',
+ bash_command="sleep 5",
+ )
+
+
+def raise_assertionerror():
+ raise AirflowFailException
+
+
+dag9 = DAG(
+ dag_id='dag9',
+ catchup=False,
+ start_date=datetime(2020, 1, 1),
+ schedule_interval='@daily',
+ tags=['upstream-skipping'],
+)
+PythonOperator(
+ task_id='fail_task',
+ outlets=[dag9_dataset],
+ python_callable=raise_assertionerror,
+ dag=dag9,
+)
+
+with DAG(
+ dag_id='dag10',
+ catchup=False,
+ start_date=datetime(2020, 1, 1),
+ schedule_on=[dag9_dataset],
+ tags=['downstream-failed'],
+) as dag10:
+ BashOperator(
+ task_id='dag10_task',
+ bash_command="sleep 5",
+ )
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 2f48a276fb..bb4eff6faf 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1518,7 +1518,8 @@ class TaskInstance(Base, LoggingMixin):
if not test_mode:
session.add(Log(self.state, self))
session.merge(self)
- self._create_dataset_dag_run_queue_records(session=session)
+ if self.state == TaskInstanceState.SUCCESS:
+ self._create_dataset_dag_run_queue_records(session=session)
session.commit()
def _create_dataset_dag_run_queue_records(self, *, session: Session) -> None:
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index b1db5dc937..0594c639ac 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1535,7 +1535,7 @@ class TestTaskInstance:
session.commit()
ti._run_raw_task()
ti.refresh_from_db()
- assert ti.state == State.SUCCESS
+ assert ti.state == TaskInstanceState.SUCCESS
# check that one queue record created for each dag that depends on dataset 1
assert session.query(DatasetDagRunQueue.target_dag_id).filter(
@@ -1555,6 +1555,70 @@ class TestTaskInstance:
.count()
) == 1
+ # Clean up after ourselves
+ db.clear_db_datasets()
+
+ def test_outlet_datasets_failed(self, create_task_instance):
+ """
+ Verify that when we have an outlet dataset on a task, and the task
+ failed, a DatasetDagRunQueue is not logged, and a DatasetEvent is
+ not generated
+ """
+ from airflow.example_dags import example_datasets
+ from airflow.example_dags.example_datasets import dag9
+
+ session = settings.Session()
+ dagbag = DagBag(dag_folder=example_datasets.__file__)
+ dagbag.collect_dags(only_if_updated=False, safe_mode=False)
+ dagbag.sync_to_db(session=session)
+ run_id = str(uuid4())
+ dr = DagRun(dag9.dag_id, run_id=run_id, run_type='anything')
+ session.merge(dr)
+ task = dag9.get_task('fail_task')
+ ti = TaskInstance(task, run_id=run_id)
+ session.merge(ti)
+ session.commit()
+ with pytest.raises(AirflowFailException):
+ ti._run_raw_task()
+ ti.refresh_from_db()
+ assert ti.state == TaskInstanceState.FAILED
+
+ # check that no dagruns were queued
+ assert session.query(DatasetDagRunQueue).count() == 0
+
+ # check that no dataset events were generated
+ assert session.query(DatasetEvent).count() == 0
+
+ def test_outlet_datasets_skipped(self, create_task_instance):
+ """
+ Verify that when we have an outlet dataset on a task, and the task
+ is skipped, a DatasetDagRunQueue is not logged, and a DatasetEvent is
+ not generated
+ """
+ from airflow.example_dags import example_datasets
+ from airflow.example_dags.example_datasets import dag7
+
+ session = settings.Session()
+ dagbag = DagBag(dag_folder=example_datasets.__file__)
+ dagbag.collect_dags(only_if_updated=False, safe_mode=False)
+ dagbag.sync_to_db(session=session)
+ run_id = str(uuid4())
+ dr = DagRun(dag7.dag_id, run_id=run_id, run_type='anything')
+ session.merge(dr)
+ task = dag7.get_task('skip_task')
+ ti = TaskInstance(task, run_id=run_id)
+ session.merge(ti)
+ session.commit()
+ ti._run_raw_task()
+ ti.refresh_from_db()
+ assert ti.state == TaskInstanceState.SKIPPED
+
+ # check that no dagruns were queued
+ assert session.query(DatasetDagRunQueue).count() == 0
+
+ # check that no dataset events were generated
+ assert session.query(DatasetEvent).count() == 0
+
@staticmethod
def _test_previous_dates_setup(
schedule_interval: Union[str, datetime.timedelta, None],