You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2021/12/10 13:01:39 UTC
[airflow] branch main updated: Only execute TIs of running DagRuns (#20182)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 534df1e Only execute TIs of running DagRuns (#20182)
534df1e is described below
commit 534df1ee5c5c2fbb82e8b88af3bf0d42f7ee13a6
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Fri Dec 10 14:01:03 2021 +0100
Only execute TIs of running DagRuns (#20182)
Since we can no longer have TIs without a DagRun, we can
also stop executing TIs if the DagRun is not in a RUNNING state.
This means less work for the Scheduler.
---
airflow/jobs/scheduler_job.py | 2 +-
tests/jobs/test_scheduler_job.py | 34 ++++++++++++++++++++++++++++++++++
2 files changed, 35 insertions(+), 1 deletion(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 9cad91f..51d7b41 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -273,7 +273,7 @@ class SchedulerJob(BaseJob):
query = (
session.query(TI)
.join(TI.dag_run)
- .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state != DagRunState.QUEUED)
+ .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
.join(TI.dag_model)
.filter(not_(DM.is_paused))
.filter(TI.state == State.SCHEDULED)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 5f4d854..5e72fee 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -473,6 +473,40 @@ class TestSchedulerJob:
assert tis[3].key in res_keys
session.rollback()
+ @pytest.mark.parametrize(
+ "state, total_executed_ti",
+ [
+ (DagRunState.SUCCESS, 0),
+ (DagRunState.FAILED, 0),
+ (DagRunState.RUNNING, 2),
+ (DagRunState.QUEUED, 0),
+ ],
+ )
+ def test_find_executable_task_instances_only_running_dagruns(
+ self, state, total_executed_ti, dag_maker, session
+ ):
+ """Test that only task instances of 'running' dagruns are executed"""
+ dag_id = 'SchedulerJobTest.test_find_executable_task_instances_only_running_dagruns'
+ task_id_1 = 'dummy'
+ task_id_2 = 'dummydummy'
+
+ with dag_maker(dag_id=dag_id, session=session):
+ DummyOperator(task_id=task_id_1)
+ DummyOperator(task_id=task_id_2)
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+
+ dr = dag_maker.create_dagrun(state=state)
+
+ tis = dr.task_instances
+ for ti in tis:
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.flush()
+ res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+ session.flush()
+ assert total_executed_ti == len(res)
+
def test_find_executable_task_instances_order_execution_date(self, dag_maker):
"""
Test that task instances follow execution_date order priority. If two dagruns with