Posted to commits@airflow.apache.org by je...@apache.org on 2022/07/19 03:43:55 UTC

[airflow] branch main updated: Don't queue dagruns or create dataset events on skipped task instances (#25086)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f0c9ac9da6 Don't queue dagruns or create dataset events on skipped task instances (#25086)
f0c9ac9da6 is described below

commit f0c9ac9da6db3a00668743adc9b55329ec567066
Author: blag <bl...@users.noreply.github.com>
AuthorDate: Mon Jul 18 20:43:42 2022 -0700

    Don't queue dagruns or create dataset events on skipped task instances (#25086)
---
 airflow/example_dags/example_datasets.py | 70 ++++++++++++++++++++++++++++++++
 airflow/models/taskinstance.py           |  3 +-
 tests/models/test_taskinstance.py        | 66 +++++++++++++++++++++++++++++-
 3 files changed, 137 insertions(+), 2 deletions(-)

diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py
index e04479b0f5..ee06e3a760 100644
--- a/airflow/example_dags/example_datasets.py
+++ b/airflow/example_dags/example_datasets.py
@@ -34,16 +34,24 @@ Next, trigger dag2.  After dag2 finishes, dag4 should run.
 
 Dags 5 and 6 should not run because they depend on datasets that never get updated.
 
+DAG dag7 should skip its only task and never trigger dag8
+
+DAG dag9 should fail its only task and never trigger dag10
+
 """
 from datetime import datetime
 
+from airflow.exceptions import AirflowFailException, AirflowSkipException
 from airflow.models import DAG, Dataset
 from airflow.operators.bash import BashOperator
+from airflow.operators.python import PythonOperator
 
 # [START dataset_def]
 dag1_dataset = Dataset('s3://dag1/output_1.txt', extra={'hi': 'bye'})
 # [END dataset_def]
 dag2_dataset = Dataset('s3://dag2/output_1.txt', extra={'hi': 'bye'})
+dag7_dataset = Dataset('s3://dag7/output_1.txt', extra={'hi': 'bye'})
+dag9_dataset = Dataset('s3://dag9/output_1.txt', extra={'hi': 'bye'})
 
 dag1 = DAG(
     dag_id='dag1',
@@ -131,3 +139,65 @@ with DAG(
         outlets=[Dataset('s3://unrelated_task/dataset_other_unknown.txt')],
         bash_command="sleep 5",
     )
+
+
+def raise_skip_exc():
+    raise AirflowSkipException
+
+
+dag7 = DAG(
+    dag_id='dag7',
+    catchup=False,
+    start_date=datetime(2020, 1, 1),
+    schedule_interval='@daily',
+    tags=['upstream-skipping'],
+)
+PythonOperator(
+    task_id='skip_task',
+    outlets=[dag7_dataset],
+    python_callable=raise_skip_exc,
+    dag=dag7,
+)
+
+with DAG(
+    dag_id='dag8',
+    catchup=False,
+    start_date=datetime(2020, 1, 1),
+    schedule_on=[dag7_dataset],
+    tags=['downstream-skipped'],
+) as dag8:
+    BashOperator(
+        task_id='dag8_task',
+        bash_command="sleep 5",
+    )
+
+
+def raise_assertionerror():
+    raise AirflowFailException
+
+
+dag9 = DAG(
+    dag_id='dag9',
+    catchup=False,
+    start_date=datetime(2020, 1, 1),
+    schedule_interval='@daily',
+    tags=['upstream-skipping'],
+)
+PythonOperator(
+    task_id='fail_task',
+    outlets=[dag9_dataset],
+    python_callable=raise_assertionerror,
+    dag=dag9,
+)
+
+with DAG(
+    dag_id='dag10',
+    catchup=False,
+    start_date=datetime(2020, 1, 1),
+    schedule_on=[dag9_dataset],
+    tags=['downstream-failed'],
+) as dag10:
+    BashOperator(
+        task_id='dag10_task',
+        bash_command="sleep 5",
+    )
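
The two helper callables added above lean on Airflow's exception semantics rather than on return
values: raising AirflowSkipException ends the task instance in the SKIPPED state, while
AirflowFailException fails it immediately and bypasses any configured retries. A minimal sketch of
that mapping, for orientation only (terminal_state_for is an illustrative helper, not part of the
Airflow API):

    from airflow.exceptions import AirflowFailException, AirflowSkipException
    from airflow.utils.state import TaskInstanceState

    def terminal_state_for(exc: BaseException) -> TaskInstanceState:
        # Illustrative mapping from the exception a python_callable raises to the
        # terminal state of the task instance that ran it.
        if isinstance(exc, AirflowSkipException):
            return TaskInstanceState.SKIPPED  # skipped: no dataset events after this commit
        if isinstance(exc, AirflowFailException):
            return TaskInstanceState.FAILED   # fails immediately; retries are not attempted
        return TaskInstanceState.FAILED       # other exceptions also end in failure, possibly after retries
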
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 2f48a276fb..bb4eff6faf 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1518,7 +1518,8 @@ class TaskInstance(Base, LoggingMixin):
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
-            self._create_dataset_dag_run_queue_records(session=session)
+            if self.state == TaskInstanceState.SUCCESS:
+                self._create_dataset_dag_run_queue_records(session=session)
             session.commit()
 
     def _create_dataset_dag_run_queue_records(self, *, session: Session) -> None:
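
As the surrounding context shows, the changed call already sits inside an "if not test_mode" block,
so test-mode runs never touch the dataset tables; the new condition narrows real runs to successful
task instances only. The combined condition can be summarized as follows (the helper name is
illustrative, not part of the Airflow API):

    from airflow.utils.state import TaskInstanceState

    def emits_dataset_events(test_mode: bool, state: TaskInstanceState) -> bool:
        # Mirrors the guarded block above: only a real (non test-mode) run that
        # finished in the SUCCESS state queues dataset-triggered dag runs or
        # creates DatasetEvent rows; skipped and failed runs no longer do.
        return not test_mode and state == TaskInstanceState.SUCCESS
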
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index b1db5dc937..0594c639ac 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1535,7 +1535,7 @@ class TestTaskInstance:
         session.commit()
         ti._run_raw_task()
         ti.refresh_from_db()
-        assert ti.state == State.SUCCESS
+        assert ti.state == TaskInstanceState.SUCCESS
 
         # check that one queue record created for each dag that depends on dataset 1
         assert session.query(DatasetDagRunQueue.target_dag_id).filter(
@@ -1555,6 +1555,70 @@ class TestTaskInstance:
             .count()
         ) == 1
 
+        # Clean up after ourselves
+        db.clear_db_datasets()
+
+    def test_outlet_datasets_failed(self, create_task_instance):
+        """
+        Verify that when we have an outlet dataset on a task, and the task
+        failed, a DatasetDagRunQueue is not logged, and a DatasetEvent is
+        not generated
+        """
+        from airflow.example_dags import example_datasets
+        from airflow.example_dags.example_datasets import dag9
+
+        session = settings.Session()
+        dagbag = DagBag(dag_folder=example_datasets.__file__)
+        dagbag.collect_dags(only_if_updated=False, safe_mode=False)
+        dagbag.sync_to_db(session=session)
+        run_id = str(uuid4())
+        dr = DagRun(dag9.dag_id, run_id=run_id, run_type='anything')
+        session.merge(dr)
+        task = dag9.get_task('fail_task')
+        ti = TaskInstance(task, run_id=run_id)
+        session.merge(ti)
+        session.commit()
+        with pytest.raises(AirflowFailException):
+            ti._run_raw_task()
+        ti.refresh_from_db()
+        assert ti.state == TaskInstanceState.FAILED
+
+        # check that no dagruns were queued
+        assert session.query(DatasetDagRunQueue).count() == 0
+
+        # check that no dataset events were generated
+        assert session.query(DatasetEvent).count() == 0
+
+    def test_outlet_datasets_skipped(self, create_task_instance):
+        """
+        Verify that when we have an outlet dataset on a task, and the task
+        is skipped, a DatasetDagRunQueue is not logged, and a DatasetEvent is
+        not generated
+        """
+        from airflow.example_dags import example_datasets
+        from airflow.example_dags.example_datasets import dag7
+
+        session = settings.Session()
+        dagbag = DagBag(dag_folder=example_datasets.__file__)
+        dagbag.collect_dags(only_if_updated=False, safe_mode=False)
+        dagbag.sync_to_db(session=session)
+        run_id = str(uuid4())
+        dr = DagRun(dag7.dag_id, run_id=run_id, run_type='anything')
+        session.merge(dr)
+        task = dag7.get_task('skip_task')
+        ti = TaskInstance(task, run_id=run_id)
+        session.merge(ti)
+        session.commit()
+        ti._run_raw_task()
+        ti.refresh_from_db()
+        assert ti.state == TaskInstanceState.SKIPPED
+
+        # check that no dagruns were queued
+        assert session.query(DatasetDagRunQueue).count() == 0
+
+        # check that no dataset events were generated
+        assert session.query(DatasetEvent).count() == 0
+
     @staticmethod
     def _test_previous_dates_setup(
         schedule_interval: Union[str, datetime.timedelta, None],
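
The two new tests can be run in isolation with pytest's keyword filter, assuming a working Airflow
development environment (for example the Breeze container):

    pytest tests/models/test_taskinstance.py -k "outlet_datasets_failed or outlet_datasets_skipped"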