You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/04/19 22:14:55 UTC

[airflow] branch master updated: Fix incorrect slots stats when TI ``pool_slots > 1`` (#15426)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new d7c27b8  Fix incorrect slots stats when TI ``pool_slots > 1`` (#15426)
d7c27b8 is described below

commit d7c27b85055010377b6f971c3c604ce9821d6f46
Author: wongelz <wo...@gmail.com>
AuthorDate: Tue Apr 20 08:14:40 2021 +1000

    Fix incorrect slots stats when TI ``pool_slots > 1`` (#15426)
    
    Fixes the incorrect count of queued and running slots (and therefore of open slots) when task instances occupy more than one pool slot (``pool_slots > 1``). This caused the scheduler to over-commit a given pool, which then led to a state where no further tasks could be scheduled because slots could not be freed.
    
    closes: #15399
---
 airflow/models/pool.py    |  2 +-
 tests/models/test_pool.py | 22 ++++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/airflow/models/pool.py b/airflow/models/pool.py
index 61d48c3..131559d 100644
--- a/airflow/models/pool.py
+++ b/airflow/models/pool.py
@@ -109,7 +109,7 @@ class Pool(Base):
             pools[pool_name] = PoolStats(total=total_slots, running=0, queued=0, open=0)
 
         state_count_by_pool = (
-            session.query(TaskInstance.pool, TaskInstance.state, func.count())
+            session.query(TaskInstance.pool, TaskInstance.state, func.sum(TaskInstance.pool_slots))
             .filter(TaskInstance.state.in_(list(EXECUTION_STATES)))
             .group_by(TaskInstance.pool, TaskInstance.state)
         ).all()
diff --git a/tests/models/test_pool.py b/tests/models/test_pool.py
index 915d01d..f4c7626 100644
--- a/tests/models/test_pool.py
+++ b/tests/models/test_pool.py
@@ -102,6 +102,20 @@ class TestPool(unittest.TestCase):
         assert 1 == pool.running_slots()  # pylint: disable=no-value-for-parameter
         assert 1 == pool.queued_slots()  # pylint: disable=no-value-for-parameter
         assert 2 == pool.occupied_slots()  # pylint: disable=no-value-for-parameter
+        assert {
+            "default_pool": {
+                "open": 128,
+                "queued": 0,
+                "total": 128,
+                "running": 0,
+            },
+            "test_pool": {
+                "open": -1,
+                "queued": 1,
+                "running": 1,
+                "total": -1,
+            },
+        } == pool.slots_stats()
 
     def test_default_pool_open_slots(self):
         set_default_pool_slots(5)
@@ -125,3 +139,11 @@ class TestPool(unittest.TestCase):
         session.close()
 
         assert 2 == Pool.get_default_pool().open_slots()
+        assert {
+            "default_pool": {
+                "open": 2,
+                "queued": 2,
+                "total": 5,
+                "running": 1,
+            }
+        } == Pool.slots_stats()