You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2021/12/10 13:01:39 UTC

[airflow] branch main updated: Only execute TIs of running DagRuns (#20182)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 534df1e  Only execute TIs of running DagRuns (#20182)
534df1e is described below

commit 534df1ee5c5c2fbb82e8b88af3bf0d42f7ee13a6
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Fri Dec 10 14:01:03 2021 +0100

    Only execute TIs of running DagRuns (#20182)
    
    Since we can no longer have TIs without a DagRun, we can
    also stop executing TIs whose DagRun is not in the RUNNING state.
    
    This means less work for the scheduler.
---
 airflow/jobs/scheduler_job.py    |  2 +-
 tests/jobs/test_scheduler_job.py | 34 ++++++++++++++++++++++++++++++++++
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 9cad91f..51d7b41 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -273,7 +273,7 @@ class SchedulerJob(BaseJob):
         query = (
             session.query(TI)
             .join(TI.dag_run)
-            .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state != DagRunState.QUEUED)
+            .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
             .join(TI.dag_model)
             .filter(not_(DM.is_paused))
             .filter(TI.state == State.SCHEDULED)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 5f4d854..5e72fee 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -473,6 +473,40 @@ class TestSchedulerJob:
         assert tis[3].key in res_keys
         session.rollback()
 
+    @pytest.mark.parametrize(
+        "state, total_executed_ti",
+        [
+            (DagRunState.SUCCESS, 0),
+            (DagRunState.FAILED, 0),
+            (DagRunState.RUNNING, 2),
+            (DagRunState.QUEUED, 0),
+        ],
+    )
+    def test_find_executable_task_instances_only_running_dagruns(
+        self, state, total_executed_ti, dag_maker, session
+    ):
+        """Test that only task instances of 'running' dagruns are executed"""
+        dag_id = 'SchedulerJobTest.test_find_executable_task_instances_only_running_dagruns'
+        task_id_1 = 'dummy'
+        task_id_2 = 'dummydummy'
+
+        with dag_maker(dag_id=dag_id, session=session):
+            DummyOperator(task_id=task_id_1)
+            DummyOperator(task_id=task_id_2)
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+
+        dr = dag_maker.create_dagrun(state=state)
+
+        tis = dr.task_instances
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        session.flush()
+        assert total_executed_ti == len(res)
+
     def test_find_executable_task_instances_order_execution_date(self, dag_maker):
         """
         Test that task instances follow execution_date order priority. If two dagruns with