You are viewing a plain-text version of this content. (The canonical link that was referenced here was not preserved in this copy.)
Posted to commits@airflow.apache.org by as...@apache.org on 2021/06/22 13:46:04 UTC

[airflow] 14/38: Fix tasks in an infinite-slots pool never being scheduled (#15247)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1ca495c3d7a34181a2c8f9e743cbef78dc63c490
Author: Benoit Person <be...@gmail.com>
AuthorDate: Tue Jun 22 08:31:04 2021 +0000

    Fix tasks in an infinite-slots pool never being scheduled (#15247)
    
    Infinite pools: Make their `total_slots` be `inf` instead of `-1`
    
    (cherry picked from commit 96f764389eded9f1ea908e899b54bf00635ec787)
---
 airflow/models/pool.py           |  6 +++++-
 tests/jobs/test_scheduler_job.py | 35 +++++++++++++++++++++++++++++++++++
 tests/models/test_pool.py        |  4 ++--
 3 files changed, 42 insertions(+), 3 deletions(-)

diff --git a/airflow/models/pool.py b/airflow/models/pool.py
index feade77..3d152ee 100644
--- a/airflow/models/pool.py
+++ b/airflow/models/pool.py
@@ -106,6 +106,8 @@ class Pool(Base):
 
         pool_rows: Iterable[Tuple[str, int]] = query.all()
         for (pool_name, total_slots) in pool_rows:
+            if total_slots == -1:
+                total_slots = float('inf')  # type: ignore
             pools[pool_name] = PoolStats(total=total_slots, running=0, queued=0, open=0)
 
         state_count_by_pool = (
@@ -115,8 +117,10 @@ class Pool(Base):
         ).all()
 
         # calculate queued and running metrics
-        count: int
         for (pool_name, state, count) in state_count_by_pool:
+            # Some databases return decimal.Decimal here.
+            count = int(count)
+
             stats_dict: Optional[PoolStats] = pools.get(pool_name)
             if not stats_dict:
                 continue
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 954b395..faf19d9 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1264,6 +1264,41 @@ class TestSchedulerJob(unittest.TestCase):
         assert 0 == len(res)
         session.rollback()
 
+    def test_infinite_pool(self):
+        dag_id = 'SchedulerJobTest.test_infinite_pool'
+        task_id = 'dummy'
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
+        task = DummyOperator(dag=dag, task_id=task_id, pool="infinite_pool")
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model = DagModel(
+            dag_id=dag_id,
+            is_paused=False,
+            concurrency=dag.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model)
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        ti = TaskInstance(task, dr.execution_date)
+        ti.state = State.SCHEDULED
+        session.merge(ti)
+        infinite_pool = Pool(pool='infinite_pool', slots=-1, description='infinite pool')
+        session.add(infinite_pool)
+        session.commit()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        session.flush()
+        assert 1 == len(res)
+        session.rollback()
+
     def test_find_executable_task_instances_none(self):
         dag_id = 'SchedulerJobTest.test_find_executable_task_instances_none'
         task_id_1 = 'dummy'
diff --git a/tests/models/test_pool.py b/tests/models/test_pool.py
index f4c7626..7981e23 100644
--- a/tests/models/test_pool.py
+++ b/tests/models/test_pool.py
@@ -110,10 +110,10 @@ class TestPool(unittest.TestCase):
                 "running": 0,
             },
             "test_pool": {
-                "open": -1,
+                "open": float('inf'),
                 "queued": 1,
                 "running": 1,
-                "total": -1,
+                "total": float('inf'),
             },
         } == pool.slots_stats()