Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/03/13 19:43:06 UTC

[GitHub] [airflow] houqp commented on a change in pull request #6342: [AIRFLOW-5662] fix incorrect naming and batch db call for scheduler metrics

houqp commented on a change in pull request #6342: [AIRFLOW-5662] fix incorrect naming and batch db call for scheduler metrics
URL: https://github.com/apache/airflow/pull/6342#discussion_r392433697
 
 

 ##########
 File path: airflow/models/pool.py
 ##########
 @@ -48,6 +50,45 @@ def get_pool(pool_name, session=None):
     def get_default_pool(session=None):
         return Pool.get_pool(Pool.DEFAULT_POOL_NAME, session=session)
 
+    @staticmethod
+    @provide_session
+    def slots_stats(session=None) -> Dict[str, Dict[str, int]]:
+        from airflow.models.taskinstance import TaskInstance  # Avoid circular import
+
+        pools = {}
+
+        pool_rows = session.query(Pool.pool, Pool.slots).all()
+        for (pool_name, total_slots) in pool_rows:
+            pools[pool_name] = {
+                "total": total_slots,
+                "used": 0,
+                "queued": 0,
+            }
+
+        state_count = (
+            session.query(TaskInstance.pool, TaskInstance.state, func.count())
+            .filter(TaskInstance.state.in_(STATES_TO_COUNT_AS_RUNNING))
+            .filter(TaskInstance.pool.in_(pools.keys()))
 
 Review comment:
   Yes, a pool can be deleted before the task state is updated. However, I think we can handle this case better by dropping the `TaskInstance.pool.in_(pools.keys())` filter from the query and doing the validation in Python in the subsequent for loop.
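
   A minimal sketch of the Python-side validation, assuming the grouped query yields `(pool_name, state, count)` rows. The helper name `aggregate_slots_stats` and the plain-string state constants are hypothetical and only for illustration; the real code would use the states in `STATES_TO_COUNT_AS_RUNNING` and whatever queued state the PR settles on.

```python
from typing import Dict, Iterable, Tuple

# Assumed state names, used here only for illustration.
RUNNING = "running"
QUEUED = "queued"


def aggregate_slots_stats(
    pool_rows: Iterable[Tuple[str, int]],
    state_count_rows: Iterable[Tuple[str, str, int]],
) -> Dict[str, Dict[str, int]]:
    """Combine pool definitions with per-pool task state counts."""
    # Start every known pool with zeroed counters.
    pools = {
        name: {"total": total, "used": 0, "queued": 0}
        for name, total in pool_rows
    }

    for pool_name, state, count in state_count_rows:
        stats = pools.get(pool_name)
        if stats is None:
            # The pool was deleted while task instances still reference it:
            # skip the row here instead of filtering in SQL.
            continue
        if state == RUNNING:
            stats["used"] += count
        elif state == QUEUED:
            stats["queued"] += count

    return pools


if __name__ == "__main__":
    print(
        aggregate_slots_stats(
            [("default_pool", 128)],
            [("default_pool", RUNNING, 3), ("deleted_pool", QUEUED, 5)],
        )
    )
```

   With this shape, the query no longer needs to send the full list of pool names as an `IN` clause, and rows for pools that no longer exist are simply ignored in the loop.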

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services