You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/29 11:14:39 UTC
[airflow] 02/02: Scheduler: Remove TIs from starved pools from the critical path. (#14476)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8a2a33ae548db60cff46f9a0618d5489c7a83509
Author: Benoit Person <be...@gmail.com>
AuthorDate: Mon Mar 29 11:13:27 2021 +0000
Scheduler: Remove TIs from starved pools from the critical path. (#14476)
Co-authored-by: Ash Berlin-Taylor <as...@apache.org>
---
airflow/jobs/scheduler_job.py | 6 +++-
tests/jobs/test_scheduler_job.py | 75 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 80 insertions(+), 1 deletion(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 3970df9..e380512 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -920,8 +920,12 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
.filter(not_(DM.is_paused))
.filter(TI.state == State.SCHEDULED)
.options(selectinload('dag_model'))
- .limit(max_tis)
)
+ starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0]
+ if starved_pools:
+ query = query.filter(not_(TI.pool.in_(starved_pools)))
+
+ query = query.limit(max_tis)
task_instances_to_examine: List[TI] = with_row_locks(
query,
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index f01c139..9347fa4 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2670,6 +2670,81 @@ class TestSchedulerJob(unittest.TestCase):
# As tasks require 2 slots, only 3 can fit into 6 available
assert len(task_instances_list) == 3
+ def test_scheduler_keeps_scheduling_pool_full(self):
+ """
+ Test task instances in a pool that isn't full keep getting scheduled even when a pool is full.
+ """
+ dag_d1 = DAG(dag_id='test_scheduler_keeps_scheduling_pool_full_d1', start_date=DEFAULT_DATE)
+ BashOperator(
+ task_id='test_scheduler_keeps_scheduling_pool_full_t1',
+ dag=dag_d1,
+ owner='airflow',
+ pool='test_scheduler_keeps_scheduling_pool_full_p1',
+ bash_command='echo hi',
+ )
+
+ dag_d2 = DAG(dag_id='test_scheduler_keeps_scheduling_pool_full_d2', start_date=DEFAULT_DATE)
+ BashOperator(
+ task_id='test_scheduler_keeps_scheduling_pool_full_t2',
+ dag=dag_d2,
+ owner='airflow',
+ pool='test_scheduler_keeps_scheduling_pool_full_p2',
+ bash_command='echo hi',
+ )
+ dagbag = DagBag(
+ dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+ include_examples=False,
+ read_dags_from_db=True,
+ )
+ dagbag.bag_dag(dag=dag_d1, root_dag=dag_d1)
+ dagbag.bag_dag(dag=dag_d2, root_dag=dag_d2)
+ dagbag.sync_to_db()
+
+ session = settings.Session()
+ pool_p1 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p1', slots=1)
+ pool_p2 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p2', slots=10)
+ session.add(pool_p1)
+ session.add(pool_p2)
+ session.commit()
+
+ dag_d1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_d1))
+
+ scheduler = SchedulerJob(executor=self.null_exec)
+ scheduler.processor_agent = mock.MagicMock()
+
+ # Create 5 dagruns for each DAG.
+ # To increase the chances the TIs from the "full" pool will get retrieved first, we schedule all
+ # TIs from the first dag first.
+ date = DEFAULT_DATE
+ for _ in range(5):
+ dr = dag_d1.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=date,
+ state=State.RUNNING,
+ )
+ scheduler._schedule_dag_run(dr, {}, session)
+ date = dag_d1.following_schedule(date)
+
+ date = DEFAULT_DATE
+ for _ in range(5):
+ dr = dag_d2.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=date,
+ state=State.RUNNING,
+ )
+ scheduler._schedule_dag_run(dr, {}, session)
+ date = dag_d2.following_schedule(date)
+
+ scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
+ task_instances_list2 = scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
+
+ # Make sure we get TIs from a non-full pool in the 2nd list
+ assert len(task_instances_list2) > 0
+ assert all(
+ task_instance.pool != 'test_scheduler_keeps_scheduling_pool_full_p1'
+ for task_instance in task_instances_list2
+ )
+
def test_scheduler_verify_priority_and_slots(self):
"""
Test task instances with higher priority are not queued