Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/15 14:33:53 UTC

[GitHub] [airflow] jedcunningham commented on a diff in pull request #25086: Don't queue dagruns or create dataset events on skipped task instances

jedcunningham commented on code in PR #25086:
URL: https://github.com/apache/airflow/pull/25086#discussion_r922222894


##########
airflow/models/taskinstance.py:
##########
@@ -1518,7 +1518,8 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
-            self._create_dataset_dag_run_queue_records(session=session)
+            if self.state != State.SKIPPED:

Review Comment:
   Wait, should we check for `State.SUCCESS` instead?
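
   To illustrate the difference between the two checks, here is a minimal sketch. `DemoState` is a hypothetical, simplified stand-in for Airflow's state enum, not the real class, and the sketch does not establish which states can actually reach this line in `_run_raw_task`. It only shows that a "not skipped" test accepts more states than an "is success" test.

```python
from enum import Enum


class DemoState(str, Enum):
    """Hypothetical, simplified stand-in for Airflow's task instance states."""

    SUCCESS = "success"
    SKIPPED = "skipped"
    FAILED = "failed"
    UP_FOR_RETRY = "up_for_retry"


def passes_not_skipped(state: DemoState) -> bool:
    # Shape of the check in the diff: anything except SKIPPED passes.
    return state != DemoState.SKIPPED


def passes_is_success(state: DemoState) -> bool:
    # Shape of the check proposed in the review: only SUCCESS passes.
    return state == DemoState.SUCCESS


# Collect the states each check would let through.
not_skipped = [s for s in DemoState if passes_not_skipped(s)]
only_success = [s for s in DemoState if passes_is_success(s)]

print("!= SKIPPED lets through:", [s.value for s in not_skipped])
print("== SUCCESS lets through:", [s.value for s in only_success])
```

   If only successful task instances should produce dataset events, the equality check expresses that intent directly.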



##########
airflow/models/taskinstance.py:
##########
@@ -1518,7 +1518,8 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
-            self._create_dataset_dag_run_queue_records(session=session)
+            if self.state != State.SKIPPED:

Review Comment:
   Also, you should be using `TaskInstanceState` instead of `State`.



##########
tests/models/test_taskinstance.py:
##########
@@ -1521,6 +1521,46 @@ def test_outlet_datasets(self, create_task_instance):
             .count()
         ) == 1
 
+    def test_outlet_datasets_skipped(self, create_task_instance):
+        """
+        Verify that when we have an outlet dataset on a task, and the task
+        is skipped, a DatasetDagRunQueue is not logged, and a DatasetEvent is
+        not generated
+        """
+        from airflow.example_dags import example_datasets
+        from airflow.example_dags.example_datasets import dag7, dag8
+
+        session = settings.Session()
+        dagbag = DagBag(dag_folder=example_datasets.__file__)
+        dagbag.collect_dags(only_if_updated=False, safe_mode=False)
+        dagbag.sync_to_db(session=session)
+        run_id = str(uuid4())
+        dr = DagRun(dag7.dag_id, run_id=run_id, run_type='anything')
+        session.merge(dr)
+        task = dag7.get_task('skip_task')
+        ti = TaskInstance(task, run_id=run_id)
+        session.merge(ti)
+        session.commit()
+        ti._run_raw_task()
+        ti.refresh_from_db()
+        assert ti.state == State.SKIPPED
+
+        # check that no queue records exist for dataset 8
+        assert (

Review Comment:
   We probably want to assert that the table is empty instead. If we need to be this selective in the queries, we should bring [clear_db_datasets](https://github.com/apache/airflow/blob/4efb1884e5a86ec65a58890e290fd89822e65c05/tests/test_utils/db.py#L56) into the mix (soon it'll purge the events too).
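
   A rough sketch of why the whole-table check is stricter, using an in-memory SQLite database rather than Airflow's test session. The table layout, `clear_queue`, and `queue_count` are illustrative assumptions, not Airflow's schema or test utilities. `clear_queue` only plays the role a cleanup helper would play before the test runs.

```python
import sqlite3

# Illustrative table; the columns are assumptions, not Airflow's real schema.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dataset_dag_run_queue (dataset_id INTEGER, target_dag_id TEXT)"
)


def clear_queue(connection):
    """Hypothetical cleanup helper: empty the table before a test runs."""
    connection.execute("DELETE FROM dataset_dag_run_queue")
    connection.commit()


def queue_count(connection, dataset_id=None):
    """Count queue rows, optionally restricted to a single dataset id."""
    if dataset_id is None:
        row = connection.execute(
            "SELECT COUNT(*) FROM dataset_dag_run_queue"
        ).fetchone()
    else:
        row = connection.execute(
            "SELECT COUNT(*) FROM dataset_dag_run_queue WHERE dataset_id = ?",
            (dataset_id,),
        ).fetchone()
    return row[0]


# Simulate a stray row left behind by some other test.
conn.execute("INSERT INTO dataset_dag_run_queue VALUES (?, ?)", (99, "other_dag"))
conn.commit()

# A filtered assertion passes even though the table is not clean.
filtered_before = queue_count(conn, dataset_id=8)
total_before = queue_count(conn)

# With cleanup in place, the simpler whole-table assertion becomes reliable.
clear_queue(conn)
total_after = queue_count(conn)
assert total_after == 0
```

   With the table cleared up front, the test can assert on the total row count and does not need to filter by dataset.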


