You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/29 11:14:39 UTC

[airflow] 02/02: Scheduler: Remove TIs from starved pools from the critical path. (#14476)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8a2a33ae548db60cff46f9a0618d5489c7a83509
Author: Benoit Person <be...@gmail.com>
AuthorDate: Mon Mar 29 11:13:27 2021 +0000

    Scheduler: Remove TIs from starved pools from the critical path. (#14476)
    
    Co-authored-by: Ash Berlin-Taylor <as...@apache.org>
---
 airflow/jobs/scheduler_job.py    |  6 +++-
 tests/jobs/test_scheduler_job.py | 75 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 3970df9..e380512 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -920,8 +920,12 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
             .filter(not_(DM.is_paused))
             .filter(TI.state == State.SCHEDULED)
             .options(selectinload('dag_model'))
-            .limit(max_tis)
         )
+        starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0]
+        if starved_pools:
+            query = query.filter(not_(TI.pool.in_(starved_pools)))
+
+        query = query.limit(max_tis)
 
         task_instances_to_examine: List[TI] = with_row_locks(
             query,
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index f01c139..9347fa4 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2670,6 +2670,81 @@ class TestSchedulerJob(unittest.TestCase):
         # As tasks require 2 slots, only 3 can fit into 6 available
         assert len(task_instances_list) == 3
 
+    def test_scheduler_keeps_scheduling_pool_full(self):
+        """
+        Test that TIs in a non-full pool keep getting scheduled even when another pool is full.
+        """
+        dag_d1 = DAG(dag_id='test_scheduler_keeps_scheduling_pool_full_d1', start_date=DEFAULT_DATE)
+        BashOperator(
+            task_id='test_scheduler_keeps_scheduling_pool_full_t1',
+            dag=dag_d1,
+            owner='airflow',
+            pool='test_scheduler_keeps_scheduling_pool_full_p1',
+            bash_command='echo hi',
+        )
+
+        dag_d2 = DAG(dag_id='test_scheduler_keeps_scheduling_pool_full_d2', start_date=DEFAULT_DATE)
+        BashOperator(
+            task_id='test_scheduler_keeps_scheduling_pool_full_t2',
+            dag=dag_d2,
+            owner='airflow',
+            pool='test_scheduler_keeps_scheduling_pool_full_p2',
+            bash_command='echo hi',
+        )
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+        dagbag.bag_dag(dag=dag_d1, root_dag=dag_d1)
+        dagbag.bag_dag(dag=dag_d2, root_dag=dag_d2)
+        dagbag.sync_to_db()
+
+        session = settings.Session()
+        pool_p1 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p1', slots=1)
+        pool_p2 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p2', slots=10)
+        session.add(pool_p1)
+        session.add(pool_p2)
+        session.commit()
+
+        dag_d1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_d1))
+
+        scheduler = SchedulerJob(executor=self.null_exec)
+        scheduler.processor_agent = mock.MagicMock()
+
+        # Create 5 dagruns for each DAG.
+        # To increase the chances the TIs from the "full" pool will get retrieved first, we schedule all
+        # TIs from the first dag first.
+        date = DEFAULT_DATE
+        for _ in range(5):
+            dr = dag_d1.create_dagrun(
+                run_type=DagRunType.SCHEDULED,
+                execution_date=date,
+                state=State.RUNNING,
+            )
+            scheduler._schedule_dag_run(dr, {}, session)
+            date = dag_d1.following_schedule(date)
+
+        date = DEFAULT_DATE
+        for _ in range(5):
+            dr = dag_d2.create_dagrun(
+                run_type=DagRunType.SCHEDULED,
+                execution_date=date,
+                state=State.RUNNING,
+            )
+            scheduler._schedule_dag_run(dr, {}, session)
+            date = dag_d2.following_schedule(date)
+
+        scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
+        task_instances_list2 = scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
+
+        # Make sure we get TIs from a non-full pool in the 2nd list
+        assert len(task_instances_list2) > 0
+        assert all(
+            task_instance.pool != 'test_scheduler_keeps_scheduling_pool_full_p1'
+            for task_instance in task_instances_list2
+        )
+
     def test_scheduler_verify_priority_and_slots(self):
         """
         Test task instances with higher priority are not queued