You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/06/22 13:46:04 UTC
[airflow] 14/38: Fix tasks in an infinite slots pool were never
scheduled (#15247)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 1ca495c3d7a34181a2c8f9e743cbef78dc63c490
Author: Benoit Person <be...@gmail.com>
AuthorDate: Tue Jun 22 08:31:04 2021 +0000
Fix tasks in an infinite slots pool were never scheduled (#15247)
Infinite pools: Make their `total_slots` be `inf` instead of `-1`
(cherry picked from commit 96f764389eded9f1ea908e899b54bf00635ec787)
---
airflow/models/pool.py | 6 +++++-
tests/jobs/test_scheduler_job.py | 35 +++++++++++++++++++++++++++++++++++
tests/models/test_pool.py | 4 ++--
3 files changed, 42 insertions(+), 3 deletions(-)
diff --git a/airflow/models/pool.py b/airflow/models/pool.py
index feade77..3d152ee 100644
--- a/airflow/models/pool.py
+++ b/airflow/models/pool.py
@@ -106,6 +106,8 @@ class Pool(Base):
pool_rows: Iterable[Tuple[str, int]] = query.all()
for (pool_name, total_slots) in pool_rows:
+ if total_slots == -1:
+ total_slots = float('inf') # type: ignore
pools[pool_name] = PoolStats(total=total_slots, running=0, queued=0, open=0)
state_count_by_pool = (
@@ -115,8 +117,10 @@ class Pool(Base):
).all()
# calculate queued and running metrics
- count: int
for (pool_name, state, count) in state_count_by_pool:
+ # Some databases return decimal.Decimal here.
+ count = int(count)
+
stats_dict: Optional[PoolStats] = pools.get(pool_name)
if not stats_dict:
continue
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 954b395..faf19d9 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1264,6 +1264,41 @@ class TestSchedulerJob(unittest.TestCase):
assert 0 == len(res)
session.rollback()
+ def test_infinite_pool(self):
+ dag_id = 'SchedulerJobTest.test_infinite_pool'
+ task_id = 'dummy'
+ dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
+ task = DummyOperator(dag=dag, task_id=task_id, pool="infinite_pool")
+ dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ session = settings.Session()
+
+ dag_model = DagModel(
+ dag_id=dag_id,
+ is_paused=False,
+ concurrency=dag.concurrency,
+ has_task_concurrency_limits=False,
+ )
+ session.add(dag_model)
+ dr = dag.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ )
+
+ ti = TaskInstance(task, dr.execution_date)
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ infinite_pool = Pool(pool='infinite_pool', slots=-1, description='infinite pool')
+ session.add(infinite_pool)
+ session.commit()
+
+ res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+ session.flush()
+ assert 1 == len(res)
+ session.rollback()
+
def test_find_executable_task_instances_none(self):
dag_id = 'SchedulerJobTest.test_find_executable_task_instances_none'
task_id_1 = 'dummy'
diff --git a/tests/models/test_pool.py b/tests/models/test_pool.py
index f4c7626..7981e23 100644
--- a/tests/models/test_pool.py
+++ b/tests/models/test_pool.py
@@ -110,10 +110,10 @@ class TestPool(unittest.TestCase):
"running": 0,
},
"test_pool": {
- "open": -1,
+ "open": float('inf'),
"queued": 1,
"running": 1,
- "total": -1,
+ "total": float('inf'),
},
} == pool.slots_stats()