You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/08/14 13:54:45 UTC

[airflow] branch v1-10-test updated: Fixes treatment of open slots in scheduler (#9316) (#9505)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 488855b  Fixes treatment of open slots in scheduler (#9316) (#9505)
488855b is described below

commit 488855b75ea454aeb92f2ddd55be1ed44bb05bb6
Author: pulsar314 <pu...@gmail.com>
AuthorDate: Thu Jun 25 22:42:03 2020 +0300

    Fixes treatment of open slots in scheduler (#9316) (#9505)
    
    Makes the scheduler take into account the number of pool slots each task requires.
    If there are fewer open slots than a task requires, that task is not queued.
    
    (cherry picked from commit 0e31f186d38b776710080ba07be50eedf42c48a7)
---
 airflow/jobs/scheduler_job.py    |  25 +++++--
 tests/jobs/test_scheduler_job.py | 144 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 164 insertions(+), 5 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 97f3929..16536f4 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -969,6 +969,9 @@ class SchedulerJob(BaseJob):
         dag_concurrency_map, task_concurrency_map = self.__get_concurrency_maps(
             states=STATES_TO_COUNT_AS_RUNNING, session=session)
 
+        num_tasks_in_executor = 0
+        num_starving_tasks_total = 0
+
         # Go through each pool, and queue up a task for execution if there are
         # any open slots in the pool.
         for pool, task_instances in pool_to_task_instances.items():
@@ -1002,7 +1005,9 @@ class SchedulerJob(BaseJob):
                         open_slots, pool
                     )
                     # Can't schedule any more since there are no more open slots.
-                    num_starving_tasks = len(priority_sorted_task_instances) - current_index
+                    num_unhandled = len(priority_sorted_task_instances) - current_index
+                    num_starving_tasks += num_unhandled
+                    num_starving_tasks_total += num_unhandled
                     break
 
                 # Check to make sure that the task concurrency of the DAG hasn't been
@@ -1045,8 +1050,17 @@ class SchedulerJob(BaseJob):
                     num_tasks_in_executor += 1
                     continue
 
+                if task_instance.pool_slots > open_slots:
+                    self.log.info("Not executing %s since it requires %s slots "
+                                  "but there are %s open slots in the pool %s.",
+                                  task_instance, task_instance.pool_slots, open_slots, pool)
+                    num_starving_tasks += 1
+                    num_starving_tasks_total += 1
+                    # Though we can execute tasks with lower priority if there's enough room
+                    continue
+
                 executable_tis.append(task_instance)
-                open_slots -= 1
+                open_slots -= task_instance.pool_slots
                 dag_concurrency_map[dag_id] += 1
                 task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1
 
@@ -1057,9 +1071,10 @@ class SchedulerJob(BaseJob):
             Stats.gauge('pool.used_slots.{pool_name}'.format(pool_name=pool_name),
                         pools[pool_name].occupied_slots())
             Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
-            Stats.gauge('scheduler.tasks.running', num_tasks_in_executor)
-            Stats.gauge('scheduler.tasks.starving', num_starving_tasks)
-            Stats.gauge('scheduler.tasks.executable', len(executable_tis))
+
+        Stats.gauge('scheduler.tasks.running', num_tasks_in_executor)
+        Stats.gauge('scheduler.tasks.starving', num_starving_tasks_total)
+        Stats.gauge('scheduler.tasks.executable', len(executable_tis))
 
         task_instance_str = "\n\t".join(
             [repr(x) for x in executable_tis])
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 161e479..2188d8b 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -48,6 +48,7 @@ from airflow.models import DAG, DagBag, DagModel, DagRun, Pool, SlaMiss, \
     TaskInstance as TI, errors
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.utils import timezone
 from airflow.utils.dag_processing import SimpleDag, SimpleDagBag, list_py_file_paths
 from airflow.utils.dates import days_ago
@@ -1901,6 +1902,149 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertIsNotNone(dr)
         self.assertEqual(dr.execution_date, timezone.datetime(2016, 1, 1, 10, 10))
 
+    def test_scheduler_verify_pool_full_2_slots_per_task(self):
+        """
+        Test task instances not queued when pool is full.
+
+        Variation with non-default pool_slots
+        """
+        dag = DAG(
+            dag_id='test_scheduler_verify_pool_full_2_slots_per_task',
+            start_date=DEFAULT_DATE)
+
+        DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow',
+            pool='test_scheduler_verify_pool_full_2_slots_per_task',
+            pool_slots=2,
+        )
+
+        session = settings.Session()
+        pool = Pool(pool='test_scheduler_verify_pool_full_2_slots_per_task', slots=6)
+        session.add(pool)
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        orm_dag.is_paused = False
+        session.merge(orm_dag)
+        session.commit()
+
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
+        scheduler = SchedulerJob(executor=self.null_exec)
+
+        # Create 5 dagruns, which will create 5 task instances.
+        for _ in range(5):
+            scheduler.create_dag_run(dag)
+        task_instances_list = []
+        scheduler._process_task_instances(dag, task_instances_list=task_instances_list)
+        self.assertEqual(len(task_instances_list), 5)
+        dagbag = self._make_simple_dag_bag([dag])
+
+        # Recreated part of the scheduler here, to kick off tasks -> executor
+        for ti_key in task_instances_list:
+            task = dag.get_task(ti_key[1])
+            ti = TaskInstance(task, ti_key[2])
+            # Task starts out in the scheduled state. All tasks in the
+            # scheduled state will be sent to the executor
+            ti.state = State.SCHEDULED
+
+            # Also save this task instance to the DB.
+            session.merge(ti)
+        session.commit()
+
+        self.assertEqual(len(scheduler.executor.queued_tasks), 0, "Check test pre-condition")
+        scheduler._execute_task_instances(dagbag, (State.SCHEDULED,), session=session)
+
+        # As tasks require 2 slots, only 3 can fit into 6 available
+        self.assertEqual(len(scheduler.executor.queued_tasks), 3)
+
+    def test_scheduler_verify_priority_and_slots(self):
+        """
+        Test task instances with higher priority are not queued
+        when pool does not have enough slots.
+
+        Though tasks with lower priority might be executed.
+        """
+        dag = DAG(
+            dag_id='test_scheduler_verify_priority_and_slots',
+            start_date=DEFAULT_DATE)
+
+        # Medium priority, not enough slots
+        DummyOperator(
+            task_id='test_scheduler_verify_priority_and_slots_t0',
+            dag=dag,
+            owner='airflow',
+            pool='test_scheduler_verify_priority_and_slots',
+            pool_slots=2,
+            priority_weight=2,
+        )
+        # High priority, occupies first slot
+        DummyOperator(
+            task_id='test_scheduler_verify_priority_and_slots_t1',
+            dag=dag,
+            owner='airflow',
+            pool='test_scheduler_verify_priority_and_slots',
+            pool_slots=1,
+            priority_weight=3,
+        )
+        # Low priority, occupies second slot
+        DummyOperator(
+            task_id='test_scheduler_verify_priority_and_slots_t2',
+            dag=dag,
+            owner='airflow',
+            pool='test_scheduler_verify_priority_and_slots',
+            pool_slots=1,
+            priority_weight=1,
+        )
+
+        session = settings.Session()
+        pool = Pool(pool='test_scheduler_verify_priority_and_slots', slots=2)
+        session.add(pool)
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        orm_dag.is_paused = False
+        session.merge(orm_dag)
+        session.commit()
+
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
+        scheduler = SchedulerJob(executor=self.null_exec)
+
+        scheduler.create_dag_run(dag)
+        task_instances_list = []
+        scheduler._process_task_instances(dag, task_instances_list=task_instances_list)
+        self.assertEqual(len(task_instances_list), 3)
+        dagbag = self._make_simple_dag_bag([dag])
+
+        # Recreated part of the scheduler here, to kick off tasks -> executor
+        for ti_key in task_instances_list:
+            task = dag.get_task(ti_key[1])
+            ti = TaskInstance(task, ti_key[2])
+            # Task starts out in the scheduled state. All tasks in the
+            # scheduled state will be sent to the executor
+            ti.state = State.SCHEDULED
+
+            # Also save this task instance to the DB.
+            session.merge(ti)
+        session.commit()
+
+        self.assertEqual(len(scheduler.executor.queued_tasks), 0, "Check test pre-condition")
+        scheduler._execute_task_instances(dagbag, (State.SCHEDULED, ), session=session)
+
+        # Only second and third
+        self.assertEqual(len(scheduler.executor.queued_tasks), 2)
+
+        ti0 = session.query(TaskInstance)\
+            .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t0').first()
+        self.assertEqual(ti0.state, State.SCHEDULED)
+
+        ti1 = session.query(TaskInstance)\
+            .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t1').first()
+        self.assertEqual(ti1.state, State.QUEUED)
+
+        ti2 = session.query(TaskInstance)\
+            .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t2').first()
+        self.assertEqual(ti2.state, State.QUEUED)
+
     def test_scheduler_reschedule(self):
         """
         Checks if tasks that are not taken up by the executor