You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/04/19 22:14:55 UTC
[airflow] branch master updated: Fix incorrect slots stats when TI ``pool_slots > 1`` (#15426)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new d7c27b8 Fix incorrect slots stats when TI ``pool_slots > 1`` (#15426)
d7c27b8 is described below
commit d7c27b85055010377b6f971c3c604ce9821d6f46
Author: wongelz <wo...@gmail.com>
AuthorDate: Tue Apr 20 08:14:40 2021 +1000
Fix incorrect slots stats when TI ``pool_slots > 1`` (#15426)
Fixes the incorrect number of queued and running slots (and therefore open slots) reported when there are task instances that occupy more than one pool slot (``pool_slots > 1``). This was causing the scheduler to over-commit to a given pool, which then led to a state where no further tasks could be scheduled because slots could not be freed.
closes: #15399
---
airflow/models/pool.py | 2 +-
tests/models/test_pool.py | 22 ++++++++++++++++++++++
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/airflow/models/pool.py b/airflow/models/pool.py
index 61d48c3..131559d 100644
--- a/airflow/models/pool.py
+++ b/airflow/models/pool.py
@@ -109,7 +109,7 @@ class Pool(Base):
pools[pool_name] = PoolStats(total=total_slots, running=0, queued=0, open=0)
state_count_by_pool = (
- session.query(TaskInstance.pool, TaskInstance.state, func.count())
+ session.query(TaskInstance.pool, TaskInstance.state, func.sum(TaskInstance.pool_slots))
.filter(TaskInstance.state.in_(list(EXECUTION_STATES)))
.group_by(TaskInstance.pool, TaskInstance.state)
).all()
diff --git a/tests/models/test_pool.py b/tests/models/test_pool.py
index 915d01d..f4c7626 100644
--- a/tests/models/test_pool.py
+++ b/tests/models/test_pool.py
@@ -102,6 +102,20 @@ class TestPool(unittest.TestCase):
assert 1 == pool.running_slots() # pylint: disable=no-value-for-parameter
assert 1 == pool.queued_slots() # pylint: disable=no-value-for-parameter
assert 2 == pool.occupied_slots() # pylint: disable=no-value-for-parameter
+ assert {
+ "default_pool": {
+ "open": 128,
+ "queued": 0,
+ "total": 128,
+ "running": 0,
+ },
+ "test_pool": {
+ "open": -1,
+ "queued": 1,
+ "running": 1,
+ "total": -1,
+ },
+ } == pool.slots_stats()
def test_default_pool_open_slots(self):
set_default_pool_slots(5)
@@ -125,3 +139,11 @@ class TestPool(unittest.TestCase):
session.close()
assert 2 == Pool.get_default_pool().open_slots()
+ assert {
+ "default_pool": {
+ "open": 2,
+ "queued": 2,
+ "total": 5,
+ "running": 1,
+ }
+ } == Pool.slots_stats()