You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/14 18:52:52 UTC
[airflow] 01/04: Fixes treatment of open slots in scheduler (#9316) (#9505)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c5a941b9b590eaa528e0b41efa62b63027327664
Author: pulsar314 <pu...@gmail.com>
AuthorDate: Thu Jun 25 22:42:03 2020 +0300
Fixes treatment of open slots in scheduler (#9316) (#9505)
Makes the scheduler account for the number of pool slots required by each task.
If there are fewer open slots than a task requires, that task is not queued.
(cherry picked from commit 0e31f186d38b776710080ba07be50eedf42c48a7)
---
airflow/jobs/scheduler_job.py | 29 +++++---
tests/jobs/test_scheduler_job.py | 144 +++++++++++++++++++++++++++++++++++++++
2 files changed, 165 insertions(+), 8 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 97f3929..685b57f 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -969,6 +969,9 @@ class SchedulerJob(BaseJob):
dag_concurrency_map, task_concurrency_map = self.__get_concurrency_maps(
states=STATES_TO_COUNT_AS_RUNNING, session=session)
+ num_tasks_in_executor = 0
+ num_starving_tasks_total = 0
+
# Go through each pool, and queue up a task for execution if there are
# any open slots in the pool.
for pool, task_instances in pool_to_task_instances.items():
@@ -992,9 +995,7 @@ class SchedulerJob(BaseJob):
priority_sorted_task_instances = sorted(
task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date))
- # Number of tasks that cannot be scheduled because of no open slot in pool
num_starving_tasks = 0
- num_tasks_in_executor = 0
for current_index, task_instance in enumerate(priority_sorted_task_instances):
if open_slots <= 0:
self.log.info(
@@ -1002,7 +1003,9 @@ class SchedulerJob(BaseJob):
open_slots, pool
)
# Can't schedule any more since there are no more open slots.
- num_starving_tasks = len(priority_sorted_task_instances) - current_index
+ num_unhandled = len(priority_sorted_task_instances) - current_index
+ num_starving_tasks += num_unhandled
+ num_starving_tasks_total += num_unhandled
break
# Check to make sure that the task concurrency of the DAG hasn't been
@@ -1045,8 +1048,17 @@ class SchedulerJob(BaseJob):
num_tasks_in_executor += 1
continue
+ if task_instance.pool_slots > open_slots:
+ self.log.info("Not executing %s since it requires %s slots "
+ "but there are %s open slots in the pool %s.",
+ task_instance, task_instance.pool_slots, open_slots, pool)
+ num_starving_tasks += 1
+ num_starving_tasks_total += 1
+ # Though we can execute tasks with lower priority if there's enough room
+ continue
+
executable_tis.append(task_instance)
- open_slots -= 1
+ open_slots -= task_instance.pool_slots
dag_concurrency_map[dag_id] += 1
task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1
@@ -1056,10 +1068,11 @@ class SchedulerJob(BaseJob):
pools[pool_name].open_slots())
Stats.gauge('pool.used_slots.{pool_name}'.format(pool_name=pool_name),
pools[pool_name].occupied_slots())
- Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
- Stats.gauge('scheduler.tasks.running', num_tasks_in_executor)
- Stats.gauge('scheduler.tasks.starving', num_starving_tasks)
- Stats.gauge('scheduler.tasks.executable', len(executable_tis))
+
+ Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
+ Stats.gauge('scheduler.tasks.running', num_tasks_in_executor)
+ Stats.gauge('scheduler.tasks.starving', num_starving_tasks_total)
+ Stats.gauge('scheduler.tasks.executable', len(executable_tis))
task_instance_str = "\n\t".join(
[repr(x) for x in executable_tis])
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 161e479..2188d8b 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -48,6 +48,7 @@ from airflow.models import DAG, DagBag, DagModel, DagRun, Pool, SlaMiss, \
TaskInstance as TI, errors
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
+from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils import timezone
from airflow.utils.dag_processing import SimpleDag, SimpleDagBag, list_py_file_paths
from airflow.utils.dates import days_ago
@@ -1901,6 +1902,149 @@ class SchedulerJobTest(unittest.TestCase):
self.assertIsNotNone(dr)
self.assertEqual(dr.execution_date, timezone.datetime(2016, 1, 1, 10, 10))
+ def test_scheduler_verify_pool_full_2_slots_per_task(self):
+ """
+ Test that task instances are not queued once the pool is full.
+
+ Variation with non-default pool_slots: each task needs 2 of the pool's 6 slots.
+ """
+ dag = DAG(
+ dag_id='test_scheduler_verify_pool_full_2_slots_per_task',
+ start_date=DEFAULT_DATE)
+
+ DummyOperator(
+ task_id='dummy',
+ dag=dag,
+ owner='airflow',
+ pool='test_scheduler_verify_pool_full_2_slots_per_task',
+ pool_slots=2,
+ )
+
+ session = settings.Session()
+ pool = Pool(pool='test_scheduler_verify_pool_full_2_slots_per_task', slots=6)
+ session.add(pool)
+ orm_dag = DagModel(dag_id=dag.dag_id)
+ orm_dag.is_paused = False
+ session.merge(orm_dag)
+ session.commit()
+
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
+ scheduler = SchedulerJob(executor=self.null_exec)
+
+ # Create 5 dag runs; with one task in the DAG this yields 5 task instances.
+ for _ in range(5):
+ scheduler.create_dag_run(dag)
+ task_instances_list = []
+ scheduler._process_task_instances(dag, task_instances_list=task_instances_list)
+ self.assertEqual(len(task_instances_list), 5)
+ dagbag = self._make_simple_dag_bag([dag])
+
+ # Recreate part of the scheduler loop here to hand the tasks to the executor
+ for ti_key in task_instances_list:
+ task = dag.get_task(ti_key[1])
+ ti = TaskInstance(task, ti_key[2])
+ # Put the task in the SCHEDULED state, the state passed to
+ # _execute_task_instances below as the one to pick up.
+ ti.state = State.SCHEDULED
+
+ # Also save this task instance to the DB.
+ session.merge(ti)
+ session.commit()
+
+ self.assertEqual(len(scheduler.executor.queued_tasks), 0, "Check test pre-condition")
+ scheduler._execute_task_instances(dagbag, (State.SCHEDULED,), session=session)
+
+ # Each task needs 2 slots, so only 3 of the 5 fit into the 6 available slots
+ self.assertEqual(len(scheduler.executor.queued_tasks), 3)
+
+ def test_scheduler_verify_priority_and_slots(self):
+ """
+ Test that a higher-priority task instance is not queued
+ when the pool does not have enough open slots for it.
+
+ Lower-priority task instances that do fit may still be queued.
+ """
+ dag = DAG(
+ dag_id='test_scheduler_verify_priority_and_slots',
+ start_date=DEFAULT_DATE)
+
+ # Medium priority, needs 2 slots; only 1 is left after t1 takes one
+ DummyOperator(
+ task_id='test_scheduler_verify_priority_and_slots_t0',
+ dag=dag,
+ owner='airflow',
+ pool='test_scheduler_verify_priority_and_slots',
+ pool_slots=2,
+ priority_weight=2,
+ )
+ # High priority, considered first and occupies the first slot
+ DummyOperator(
+ task_id='test_scheduler_verify_priority_and_slots_t1',
+ dag=dag,
+ owner='airflow',
+ pool='test_scheduler_verify_priority_and_slots',
+ pool_slots=1,
+ priority_weight=3,
+ )
+ # Low priority, still fits and occupies the second slot
+ DummyOperator(
+ task_id='test_scheduler_verify_priority_and_slots_t2',
+ dag=dag,
+ owner='airflow',
+ pool='test_scheduler_verify_priority_and_slots',
+ pool_slots=1,
+ priority_weight=1,
+ )
+
+ session = settings.Session()
+ pool = Pool(pool='test_scheduler_verify_priority_and_slots', slots=2)
+ session.add(pool)
+ orm_dag = DagModel(dag_id=dag.dag_id)
+ orm_dag.is_paused = False
+ session.merge(orm_dag)
+ session.commit()
+
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
+ scheduler = SchedulerJob(executor=self.null_exec)
+
+ scheduler.create_dag_run(dag)
+ task_instances_list = []
+ scheduler._process_task_instances(dag, task_instances_list=task_instances_list)
+ self.assertEqual(len(task_instances_list), 3)
+ dagbag = self._make_simple_dag_bag([dag])
+
+ # Recreate part of the scheduler loop here to hand the tasks to the executor
+ for ti_key in task_instances_list:
+ task = dag.get_task(ti_key[1])
+ ti = TaskInstance(task, ti_key[2])
+ # Put the task in the SCHEDULED state, the state passed to
+ # _execute_task_instances below as the one to pick up.
+ ti.state = State.SCHEDULED
+
+ # Also save this task instance to the DB.
+ session.merge(ti)
+ session.commit()
+
+ self.assertEqual(len(scheduler.executor.queued_tasks), 0, "Check test pre-condition")
+ scheduler._execute_task_instances(dagbag, (State.SCHEDULED, ), session=session)
+
+ # Only t1 and t2 are queued; t0 needs 2 slots but only 1 remains after t1
+ self.assertEqual(len(scheduler.executor.queued_tasks), 2)
+
+ ti0 = session.query(TaskInstance)\
+ .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t0').first()
+ self.assertEqual(ti0.state, State.SCHEDULED)
+
+ ti1 = session.query(TaskInstance)\
+ .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t1').first()
+ self.assertEqual(ti1.state, State.QUEUED)
+
+ ti2 = session.query(TaskInstance)\
+ .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t2').first()
+ self.assertEqual(ti2.state, State.QUEUED)
+
def test_scheduler_reschedule(self):
"""
Checks if tasks that are not taken up by the executor