You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/08/09 12:34:44 UTC

[airflow] branch main updated: Enable pools to consider deferred tasks (#32709)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 70a050b1c3 Enable pools to consider deferred tasks (#32709)
70a050b1c3 is described below

commit 70a050b1c3a46e3eabae7f6052c95718800dc604
Author: Usiel Riedl <us...@gmail.com>
AuthorDate: Wed Aug 9 20:34:37 2023 +0800

    Enable pools to consider deferred tasks (#32709)
    
    * Makes pools respect deferrable tasks (with extra setting)
    
    See https://github.com/apache/airflow/discussions/21243
    
    This commit makes pools consider deferred tasks if the `include_deferred` flag is set. By default a pool will not consider deferred tasks as occupied slots, but still show the number of deferred tasks in its stats.
    
    ---------
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
---
 airflow/api/client/api_client.py                   |   3 +-
 airflow/api/client/json_client.py                  |   9 +-
 airflow/api/client/local_client.py                 |  10 +-
 airflow/api/common/experimental/pool.py            |   2 +-
 airflow/api_connexion/endpoints/pool_endpoint.py   |   4 +-
 airflow/api_connexion/openapi/v1.yaml              |  15 ++-
 airflow/api_connexion/schemas/pool_schema.py       |   8 ++
 airflow/cli/cli_config.py                          |  16 ++-
 airflow/cli/commands/pool_command.py               |  18 +++-
 airflow/jobs/scheduler_job_runner.py               |   2 +
 ...28_2_7_0_add_include_deferred_column_to_pool.py |  50 +++++++++
 airflow/models/pool.py                             |  58 ++++++++--
 airflow/ti_deps/deps/pool_slots_available_dep.py   |  11 +-
 airflow/utils/db.py                                |   1 +
 airflow/www/static/js/types/api-generated.ts       |  14 ++-
 airflow/www/views.py                               |  29 ++++-
 .../logging-monitoring/metrics.rst                 |   1 +
 .../administration-and-deployment/pools.rst        |   2 +-
 .../authoring-and-scheduling/deferring.rst         |   2 +-
 docs/apache-airflow/img/airflow_erd.sha256         |   2 +-
 docs/apache-airflow/img/airflow_erd.svg            |  86 ++++++++-------
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 .../api_connexion/endpoints/test_pool_endpoint.py  | 120 ++++++++++++++++-----
 tests/api_connexion/schemas/test_pool_schemas.py   |  14 ++-
 tests/api_connexion/test_auth.py                   |   4 +
 tests/api_experimental/client/test_local_client.py |  20 ++--
 .../common/experimental/test_pool.py               |   7 +-
 tests/cli/commands/test_pool_command.py            |  67 ++++++++----
 tests/cli/commands/test_task_command.py            |   2 +-
 tests/jobs/test_backfill_job.py                    |   3 +-
 tests/jobs/test_scheduler_job.py                   |  31 +++---
 tests/models/test_pool.py                          |  70 +++++++++++-
 tests/models/test_taskinstance.py                  |   2 +-
 tests/operators/test_subdag_operator.py            |   8 +-
 .../ti_deps/deps/test_pool_slots_available_dep.py  |  13 ++-
 tests/www/api/experimental/test_endpoints.py       |   2 +
 tests/www/views/test_views_pool.py                 |   5 +
 37 files changed, 549 insertions(+), 166 deletions(-)

diff --git a/airflow/api/client/api_client.py b/airflow/api/client/api_client.py
index 1fdbc7027b..d4a6d2357a 100644
--- a/airflow/api/client/api_client.py
+++ b/airflow/api/client/api_client.py
@@ -60,12 +60,13 @@ class Client:
         """Get all pools."""
         raise NotImplementedError()
 
-    def create_pool(self, name, slots, description):
+    def create_pool(self, name, slots, description, include_deferred):
         """Create a pool.
 
         :param name: pool name
         :param slots: pool slots amount
         :param description: pool description
+        :param include_deferred: include deferred tasks in pool calculations
         """
         raise NotImplementedError()
 
diff --git a/airflow/api/client/json_client.py b/airflow/api/client/json_client.py
index 2eb91b3a42..ac73813136 100644
--- a/airflow/api/client/json_client.py
+++ b/airflow/api/client/json_client.py
@@ -108,23 +108,26 @@ class Client(api_client.Client):
         pools = self._request(url)
         return [(p["pool"], p["slots"], p["description"]) for p in pools]
 
-    def create_pool(self, name: str, slots: int, description: str):
+    def create_pool(self, name: str, slots: int, description: str, include_deferred: bool):
         """Create a new pool.
 
         :param name: The name of the pool to create.
         :param slots: The number of slots in the pool.
         :param description: A description of the pool.
+        :param include_deferred: include deferred tasks in pool calculations
+
         :return: A tuple containing the name of the pool, the number of slots in the pool,
-            and a description of the pool.
+            a description of the pool and the include_deferred flag.
         """
         endpoint = "/api/experimental/pools"
         data = {
             "name": name,
             "slots": slots,
             "description": description,
+            "include_deferred": include_deferred,
         }
         response = self._request(urljoin(self._api_base_url, endpoint), method="POST", json=data)
-        return response["pool"], response["slots"], response["description"]
+        return response["pool"], response["slots"], response["description"], response["include_deferred"]
 
     def delete_pool(self, name: str):
         """Delete a pool.
diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py
index b90039608f..d8b9565099 100644
--- a/airflow/api/client/local_client.py
+++ b/airflow/api/client/local_client.py
@@ -63,12 +63,12 @@ class Client(api_client.Client):
         pool = Pool.get_pool(pool_name=name)
         if not pool:
             raise PoolNotFound(f"Pool {name} not found")
-        return pool.pool, pool.slots, pool.description
+        return pool.pool, pool.slots, pool.description, pool.include_deferred
 
     def get_pools(self):
-        return [(p.pool, p.slots, p.description) for p in Pool.get_pools()]
+        return [(p.pool, p.slots, p.description, p.include_deferred) for p in Pool.get_pools()]
 
-    def create_pool(self, name, slots, description):
+    def create_pool(self, name, slots, description, include_deferred):
         if not (name and name.strip()):
             raise AirflowBadRequest("Pool name shouldn't be empty")
         pool_name_length = Pool.pool.property.columns[0].type.length
@@ -78,7 +78,9 @@ class Client(api_client.Client):
             slots = int(slots)
         except ValueError:
             raise AirflowBadRequest(f"Bad value for `slots`: {slots}")
-        pool = Pool.create_or_update_pool(name=name, slots=slots, description=description)
+        pool = Pool.create_or_update_pool(
+            name=name, slots=slots, description=description, include_deferred=include_deferred
+        )
         return pool.pool, pool.slots, pool.description
 
     def delete_pool(self, name):
diff --git a/airflow/api/common/experimental/pool.py b/airflow/api/common/experimental/pool.py
index cff21d4d56..34e35cd435 100644
--- a/airflow/api/common/experimental/pool.py
+++ b/airflow/api/common/experimental/pool.py
@@ -68,7 +68,7 @@ def create_pool(name, slots, description, session: Session = NEW_SESSION):
     session.expire_on_commit = False
     pool = session.scalar(select(Pool).filter_by(pool=name).limit(1))
     if pool is None:
-        pool = Pool(pool=name, slots=slots, description=description)
+        pool = Pool(pool=name, slots=slots, description=description, include_deferred=False)
         session.add(pool)
     else:
         pool.slots = slots
diff --git a/airflow/api_connexion/endpoints/pool_endpoint.py b/airflow/api_connexion/endpoints/pool_endpoint.py
index 6503705d85..fb59ab5ea2 100644
--- a/airflow/api_connexion/endpoints/pool_endpoint.py
+++ b/airflow/api_connexion/endpoints/pool_endpoint.py
@@ -88,10 +88,10 @@ def patch_pool(
 ) -> APIResponse:
     """Update a pool."""
     request_dict = get_json_request_dict()
-    # Only slots can be modified in 'default_pool'
+    # Only slots and include_deferred can be modified in 'default_pool'
     try:
         if pool_name == Pool.DEFAULT_POOL_NAME and request_dict["name"] != Pool.DEFAULT_POOL_NAME:
-            if update_mask and len(update_mask) == 1 and update_mask[0].strip() == "slots":
+            if update_mask and all(mask.strip() in {"slots", "include_deferred"} for mask in update_mask):
                 pass
             else:
                 raise BadRequest(detail="Default Pool's name can't be modified")
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index da7dca04f6..72f65438b0 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -3158,7 +3158,7 @@ components:
         occupied_slots:
           type: integer
           readOnly: true
-          description: The number of slots used by running/queued tasks at the moment.
+          description: The number of slots used by running/queued tasks at the moment. May include deferred tasks if 'include_deferred' is set to true.
         running_slots:
           type: integer
           readOnly: true
@@ -3175,6 +3175,13 @@ components:
           type: integer
           readOnly: true
           description: The number of slots used by scheduled tasks at the moment.
+        deferred_slots:
+          type: integer
+          readOnly: true
+          description: |
+            The number of slots used by deferred tasks at the moment. Relevant if 'include_deferred' is set to true.
+
+            *New in version 2.7.0*
         description:
           type: string
           description: |
@@ -3182,6 +3189,12 @@ components:
 
             *New in version 2.3.0*
           nullable: true
+        include_deferred:
+          type: boolean
+          description: |
+            If set to true, deferred tasks are considered when calculating open pool slots.
+
+            *New in version 2.7.0*
 
     PoolCollection:
       type: object
diff --git a/airflow/api_connexion/schemas/pool_schema.py b/airflow/api_connexion/schemas/pool_schema.py
index 4e25287d1d..e18548cee8 100644
--- a/airflow/api_connexion/schemas/pool_schema.py
+++ b/airflow/api_connexion/schemas/pool_schema.py
@@ -39,7 +39,10 @@ class PoolSchema(SQLAlchemySchema):
     queued_slots = fields.Method("get_queued_slots", dump_only=True)
     scheduled_slots = fields.Method("get_scheduled_slots", dump_only=True)
     open_slots = fields.Method("get_open_slots", dump_only=True)
+    deferred_slots = fields.Method("get_deferred_slots", dump_only=True)
     description = auto_field()
+    # we skip auto_field() here to be compatible with the manual validation in the pool_endpoint module
+    include_deferred = fields.Boolean(load_default=False)
 
     @staticmethod
     def get_occupied_slots(obj: Pool) -> int:
@@ -61,6 +64,11 @@ class PoolSchema(SQLAlchemySchema):
         """Returns the scheduled slots of the pool."""
         return obj.scheduled_slots()
 
+    @staticmethod
+    def get_deferred_slots(obj: Pool) -> int:
+        """Returns the deferred slots of the pool."""
+        return obj.deferred_slots()
+
     @staticmethod
     def get_open_slots(obj: Pool) -> float:
         """Returns the open slots of the pool."""
diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index 7569e2ff58..f2248efe5e 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -513,6 +513,9 @@ ARG_DB_RETRY_DELAY = Arg(
 ARG_POOL_NAME = Arg(("pool",), metavar="NAME", help="Pool name")
 ARG_POOL_SLOTS = Arg(("slots",), type=int, help="Pool slots")
 ARG_POOL_DESCRIPTION = Arg(("description",), help="Pool description")
+ARG_POOL_INCLUDE_DEFERRED = Arg(
+    ("--include-deferred",), help="Include deferred tasks in calculations for Pool", action="store_true"
+)
 ARG_POOL_IMPORT = Arg(
     ("file",),
     metavar="FILEPATH",
@@ -521,8 +524,8 @@ ARG_POOL_IMPORT = Arg(
         textwrap.dedent(
             """
             {
-                "pool_1": {"slots": 5, "description": ""},
-                "pool_2": {"slots": 10, "description": "test"}
+                "pool_1": {"slots": 5, "description": "", "include_deferred": true},
+                "pool_2": {"slots": 10, "description": "test", "include_deferred": false}
             }"""
         ),
         " " * 4,
@@ -1456,7 +1459,14 @@ POOLS_COMMANDS = (
         name="set",
         help="Configure pool",
         func=lazy_load_command("airflow.cli.commands.pool_command.pool_set"),
-        args=(ARG_POOL_NAME, ARG_POOL_SLOTS, ARG_POOL_DESCRIPTION, ARG_OUTPUT, ARG_VERBOSE),
+        args=(
+            ARG_POOL_NAME,
+            ARG_POOL_SLOTS,
+            ARG_POOL_DESCRIPTION,
+            ARG_POOL_INCLUDE_DEFERRED,
+            ARG_OUTPUT,
+            ARG_VERBOSE,
+        ),
     ),
     ActionCommand(
         name="delete",
diff --git a/airflow/cli/commands/pool_command.py b/airflow/cli/commands/pool_command.py
index 8d9e206f1b..7e7c63640c 100644
--- a/airflow/cli/commands/pool_command.py
+++ b/airflow/cli/commands/pool_command.py
@@ -38,6 +38,7 @@ def _show_pools(pools, output):
             "pool": x[0],
             "slots": x[1],
             "description": x[2],
+            "include_deferred": x[3],
         },
     )
 
@@ -69,7 +70,9 @@ def pool_get(args):
 def pool_set(args):
     """Creates new pool with a given name and slots."""
     api_client = get_current_api_client()
-    api_client.create_pool(name=args.pool, slots=args.slots, description=args.description)
+    api_client.create_pool(
+        name=args.pool, slots=args.slots, description=args.description, include_deferred=args.include_deferred
+    )
     print(f"Pool {args.pool} created")
 
 
@@ -119,8 +122,15 @@ def pool_import_helper(filepath):
     pools = []
     failed = []
     for k, v in pools_json.items():
-        if isinstance(v, dict) and len(v) == 2:
-            pools.append(api_client.create_pool(name=k, slots=v["slots"], description=v["description"]))
+        if isinstance(v, dict) and "slots" in v and "description" in v:
+            pools.append(
+                api_client.create_pool(
+                    name=k,
+                    slots=v["slots"],
+                    description=v["description"],
+                    include_deferred=v.get("include_deferred", False),
+                )
+            )
         else:
             failed.append(k)
     return pools, failed
@@ -132,7 +142,7 @@ def pool_export_helper(filepath):
     pool_dict = {}
     pools = api_client.get_pools()
     for pool in pools:
-        pool_dict[pool[0]] = {"slots": pool[1], "description": pool[2]}
+        pool_dict[pool[0]] = {"slots": pool[1], "description": pool[2], "include_deferred": pool[3]}
     with open(filepath, "w") as poolfile:
         poolfile.write(json.dumps(pool_dict, sort_keys=True, indent=4))
     return pools
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index e47fc7bd86..0ed7aae5ce 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1571,10 +1571,12 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
             Stats.gauge(f"pool.open_slots.{pool_name}", slot_stats["open"])
             Stats.gauge(f"pool.queued_slots.{pool_name}", slot_stats["queued"])
             Stats.gauge(f"pool.running_slots.{pool_name}", slot_stats["running"])
+            Stats.gauge(f"pool.deferred_slots.{pool_name}", slot_stats["deferred"])
             # Same metrics with tagging
             Stats.gauge("pool.open_slots", slot_stats["open"], tags={"pool_name": pool_name})
             Stats.gauge("pool.queued_slots", slot_stats["queued"], tags={"pool_name": pool_name})
             Stats.gauge("pool.running_slots", slot_stats["running"], tags={"pool_name": pool_name})
+            Stats.gauge("pool.deferred_slots", slot_stats["deferred"], tags={"pool_name": pool_name})
 
     @provide_session
     def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int:
diff --git a/airflow/migrations/versions/0128_2_7_0_add_include_deferred_column_to_pool.py b/airflow/migrations/versions/0128_2_7_0_add_include_deferred_column_to_pool.py
new file mode 100644
index 0000000000..ea10320480
--- /dev/null
+++ b/airflow/migrations/versions/0128_2_7_0_add_include_deferred_column_to_pool.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add include_deferred column to pool
+
+Revision ID: 405de8318b3a
+Revises: 788397e78828
+Create Date: 2023-07-20 04:22:21.007342
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+
+# revision identifiers, used by Alembic.
+revision = "405de8318b3a"
+down_revision = "788397e78828"
+branch_labels = None
+depends_on = None
+airflow_version = "2.7.0"
+
+
+def upgrade():
+    """Apply add include_deferred column to pool"""
+    with op.batch_alter_table("slot_pool") as batch_op:
+        batch_op.add_column(
+            sa.Column("include_deferred", sa.Boolean, nullable=False, server_default=sa.false())
+        )
+
+
+def downgrade():
+    """Unapply add include_deferred column to pool"""
+    with op.batch_alter_table("slot_pool") as batch_op:
+        batch_op.drop_column("include_deferred")
diff --git a/airflow/models/pool.py b/airflow/models/pool.py
index 3b5708b121..13a2e15e66 100644
--- a/airflow/models/pool.py
+++ b/airflow/models/pool.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 
 from typing import Any
 
-from sqlalchemy import Column, Integer, String, Text, func, select
+from sqlalchemy import Boolean, Column, Integer, String, Text, func, select
 from sqlalchemy.orm.session import Session
 
 from airflow.exceptions import AirflowException, PoolNotFound
@@ -37,6 +37,7 @@ class PoolStats(TypedDict):
 
     total: int
     running: int
+    deferred: int
     queued: int
     open: int
 
@@ -51,6 +52,7 @@ class Pool(Base):
     # -1 for infinite
     slots = Column(Integer, default=0)
     description = Column(Text)
+    include_deferred = Column(Boolean, nullable=False)
 
     DEFAULT_POOL_NAME = "default_pool"
 
@@ -108,6 +110,7 @@ class Pool(Base):
         name: str,
         slots: int,
         description: str,
+        include_deferred: bool,
         session: Session = NEW_SESSION,
     ) -> Pool:
         """Create a pool with given parameters or update it if it already exists."""
@@ -116,11 +119,12 @@ class Pool(Base):
 
         pool = session.scalar(select(Pool).filter_by(pool=name))
         if pool is None:
-            pool = Pool(pool=name, slots=slots, description=description)
+            pool = Pool(pool=name, slots=slots, description=description, include_deferred=include_deferred)
             session.add(pool)
         else:
             pool.slots = slots
             pool.description = description
+            pool.include_deferred = include_deferred
 
         session.commit()
         return pool
@@ -161,21 +165,26 @@ class Pool(Base):
         from airflow.models.taskinstance import TaskInstance  # Avoid circular import
 
         pools: dict[str, PoolStats] = {}
+        pool_includes_deferred: dict[str, bool] = {}
 
-        query = select(Pool.pool, Pool.slots)
+        query = select(Pool.pool, Pool.slots, Pool.include_deferred)
 
         if lock_rows:
             query = with_row_locks(query, session=session, **nowait(session))
 
         pool_rows = session.execute(query)
-        for (pool_name, total_slots) in pool_rows:
+        for (pool_name, total_slots, include_deferred) in pool_rows:
             if total_slots == -1:
                 total_slots = float("inf")  # type: ignore
-            pools[pool_name] = PoolStats(total=total_slots, running=0, queued=0, open=0)
+            pools[pool_name] = PoolStats(total=total_slots, running=0, queued=0, open=0, deferred=0)
+            pool_includes_deferred[pool_name] = include_deferred
 
+        allowed_execution_states = EXECUTION_STATES | {
+            TaskInstanceState.DEFERRED,
+        }
         state_count_by_pool = session.execute(
             select(TaskInstance.pool, TaskInstance.state, func.sum(TaskInstance.pool_slots))
-            .filter(TaskInstance.state.in_(EXECUTION_STATES))
+            .filter(TaskInstance.state.in_(allowed_execution_states))
             .group_by(TaskInstance.pool, TaskInstance.state)
         )
 
@@ -192,12 +201,16 @@ class Pool(Base):
                 stats_dict["running"] = count
             elif state == TaskInstanceState.QUEUED:
                 stats_dict["queued"] = count
+            elif state == TaskInstanceState.DEFERRED:
+                stats_dict["deferred"] = count
             else:
-                raise AirflowException(f"Unexpected state. Expected values: {EXECUTION_STATES}.")
+                raise AirflowException(f"Unexpected state. Expected values: {allowed_execution_states}.")
 
         # calculate open metric
         for pool_name, stats_dict in pools.items():
             stats_dict["open"] = stats_dict["total"] - stats_dict["running"] - stats_dict["queued"]
+            if pool_includes_deferred[pool_name]:
+                stats_dict["open"] -= stats_dict["deferred"]
 
         return pools
 
@@ -212,6 +225,7 @@ class Pool(Base):
             "pool": self.pool,
             "slots": self.slots,
             "description": self.description,
+            "include_deferred": self.include_deferred,
         }
 
     @provide_session
@@ -224,15 +238,24 @@ class Pool(Base):
         """
         from airflow.models.taskinstance import TaskInstance  # Avoid circular import
 
+        occupied_states = self.get_occupied_states()
+
         return int(
             session.scalar(
                 select(func.sum(TaskInstance.pool_slots))
                 .filter(TaskInstance.pool == self.pool)
-                .filter(TaskInstance.state.in_(EXECUTION_STATES))
+                .filter(TaskInstance.state.in_(occupied_states))
             )
             or 0
         )
 
+    def get_occupied_states(self):
+        if self.include_deferred:
+            return EXECUTION_STATES | {
+                TaskInstanceState.DEFERRED,
+            }
+        return EXECUTION_STATES
+
     @provide_session
     def running_slots(self, session: Session = NEW_SESSION) -> int:
         """
@@ -290,6 +313,25 @@ class Pool(Base):
             or 0
         )
 
+    @provide_session
+    def deferred_slots(self, session: Session = NEW_SESSION) -> int:
+        """
+        Get the number of slots deferred at the moment.
+
+        :param session: SQLAlchemy ORM Session
+        :return: the number of deferred slots
+        """
+        from airflow.models.taskinstance import TaskInstance  # Avoid circular import
+
+        return int(
+            session.scalar(
+                select(func.sum(TaskInstance.pool_slots)).where(
+                    TaskInstance.pool == self.pool, TaskInstance.state == TaskInstanceState.DEFERRED
+                )
+            )
+            or 0
+        )
+
     @provide_session
     def open_slots(self, session: Session = NEW_SESSION) -> float:
         """
diff --git a/airflow/ti_deps/deps/pool_slots_available_dep.py b/airflow/ti_deps/deps/pool_slots_available_dep.py
index 38bfe31936..e095cfb68e 100644
--- a/airflow/ti_deps/deps/pool_slots_available_dep.py
+++ b/airflow/ti_deps/deps/pool_slots_available_dep.py
@@ -18,7 +18,6 @@
 """This module defines dep for pool slots availability."""
 from __future__ import annotations
 
-from airflow.ti_deps.dependencies_states import EXECUTION_STATES
 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
 from airflow.utils.session import provide_session
 
@@ -49,12 +48,12 @@ class PoolSlotsAvailableDep(BaseTIDep):
                 reason=f"Tasks using non-existent pool '{pool_name}' will not be scheduled"
             )
             return
-        else:
-            # Controlled by UNIQUE key in slot_pool table,
-            # only one result can be returned.
-            open_slots = pools[0].open_slots(session=session)
+        # Controlled by UNIQUE key in slot_pool table,
+        # only one result can be returned.
+        open_slots = pools[0].open_slots(session=session)
 
-        if ti.state in EXECUTION_STATES:
+        occupied_states = pools[0].get_occupied_states()
+        if ti.state in occupied_states:
             open_slots += ti.pool_slots
 
         if open_slots <= (ti.pool_slots - 1):
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 1a4a5db6ae..92ad989800 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -124,6 +124,7 @@ def add_default_pool_if_not_exists(session: Session = NEW_SESSION):
             pool=Pool.DEFAULT_POOL_NAME,
             slots=conf.getint(section="core", key="default_pool_task_slot_count"),
             description="Default pool",
+            include_deferred=False,
         )
         session.add(default_pool)
         session.commit()
diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts
index e2d38d0725..8dead88fc7 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -1239,7 +1239,7 @@ export interface components {
       name?: string;
       /** @description The maximum number of slots that can be assigned to tasks. One job may occupy one or more slots. */
       slots?: number;
-      /** @description The number of slots used by running/queued tasks at the moment. */
+      /** @description The number of slots used by running/queued tasks at the moment. May include deferred tasks if 'include_deferred' is set to true. */
       occupied_slots?: number;
       /** @description The number of slots used by running tasks at the moment. */
       running_slots?: number;
@@ -1249,12 +1249,24 @@ export interface components {
       open_slots?: number;
       /** @description The number of slots used by scheduled tasks at the moment. */
       scheduled_slots?: number;
+      /**
+       * @description The number of slots used by deferred tasks at the moment. Relevant if 'include_deferred' is set to true.
+       *
+       * *New in version 2.7.0*
+       */
+      deferred_slots?: number;
       /**
        * @description The description of the pool.
        *
        * *New in version 2.3.0*
        */
       description?: string | null;
+      /**
+       * @description If set to true, deferred tasks are considered when calculating open pool slots.
+       *
+       * *New in version 2.7.0*
+       */
+      include_deferred?: boolean;
     };
     /**
      * @description Collection of pools.
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 3faedd3d8b..5f12a94969 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -70,7 +70,7 @@ from pendulum.parsing.exceptions import ParserError
 from sqlalchemy import Date, and_, case, desc, func, inspect, or_, select, union_all
 from sqlalchemy.exc import IntegrityError
 from sqlalchemy.orm import Session, joinedload
-from wtforms import validators
+from wtforms import BooleanField, validators
 
 import airflow
 from airflow import models, plugins_manager, settings
@@ -4874,9 +4874,17 @@ class PoolModelView(AirflowModelView):
         permissions.ACTION_CAN_ACCESS_MENU,
     ]
 
-    list_columns = ["pool", "slots", "running_slots", "queued_slots", "scheduled_slots"]
-    add_columns = ["pool", "slots", "description"]
-    edit_columns = ["pool", "slots", "description"]
+    list_columns = ["pool", "slots", "running_slots", "queued_slots", "scheduled_slots", "deferred_slots"]
+    add_columns = ["pool", "slots", "description", "include_deferred"]
+    edit_columns = ["pool", "slots", "description", "include_deferred"]
+
+    # include_deferred is non-nullable, but as a checkbox in the resulting form we want to allow it unchecked
+    include_deferred_field = BooleanField(
+        validators=[validators.Optional()],
+        description="Check to include deferred tasks when calculating open pool slots.",
+    )
+    edit_form_extra_fields = {"include_deferred": include_deferred_field}
+    add_form_extra_fields = {"include_deferred": include_deferred_field}
 
     base_order = ("pool", "asc")
 
@@ -4943,11 +4951,24 @@ class PoolModelView(AirflowModelView):
         else:
             return Markup('<span class="label label-danger">Invalid</span>')
 
+    def fdeferred_slots(self):
+        """Deferred slots rendering."""
+        pool_id = self.get("pool")
+        deferred_slots = self.get("deferred_slots")
+        if pool_id is not None and deferred_slots is not None:
+            url = url_for("TaskInstanceModelView.list", _flt_3_pool=pool_id, _flt_3_state="deferred")
+            return Markup("<a href='{url}'>{deferred_slots}</a>").format(
+                url=url, deferred_slots=deferred_slots
+            )
+        else:
+            return Markup('<span class="label label-danger">Invalid</span>')
+
     formatters_columns = {
         "pool": pool_link,
         "running_slots": frunning_slots,
         "queued_slots": fqueued_slots,
         "scheduled_slots": fscheduled_slots,
+        "deferred_slots": fdeferred_slots,
     }
 
     validators_columns = {"pool": [validators.DataRequired()], "slots": [validators.NumberRange(min=-1)]}
diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
index e4bb1da68a..b18d05a877 100644
--- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
+++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
@@ -214,6 +214,7 @@ Name                                                Description
 ``pool.open_slots.<pool_name>``                     Number of open slots in the pool
 ``pool.queued_slots.<pool_name>``                   Number of queued slots in the pool
 ``pool.running_slots.<pool_name>``                  Number of running slots in the pool
+``pool.deferred_slots.<pool_name>``                 Number of deferred slots in the pool
 ``pool.starving_tasks.<pool_name>``                 Number of starving tasks in the pool
 ``triggers.running.<hostname>``                     Number of triggers currently running for a triggerer (described by hostname)
 =================================================== ========================================================================
diff --git a/docs/apache-airflow/administration-and-deployment/pools.rst b/docs/apache-airflow/administration-and-deployment/pools.rst
index 5395ccfe38..5a906d3f6f 100644
--- a/docs/apache-airflow/administration-and-deployment/pools.rst
+++ b/docs/apache-airflow/administration-and-deployment/pools.rst
@@ -22,7 +22,7 @@ Pools
 
 Some systems can get overwhelmed when too many processes hit them at the same time. Airflow pools can be used to
 **limit the execution parallelism** on arbitrary sets of tasks. The list of pools is managed in the UI
-(``Menu -> Admin -> Pools``) by giving the pools a name and assigning it a number of worker slots.
+(``Menu -> Admin -> Pools``) by giving the pools a name and assigning it a number of worker slots. There you can also decide whether the pool should include :doc:`deferred tasks </authoring-and-scheduling/deferring>` in its calculation of occupied slots.
 
 Tasks can then be associated with one of the existing pools by using the ``pool`` parameter when creating tasks:
 
diff --git a/docs/apache-airflow/authoring-and-scheduling/deferring.rst b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
index 6b5f124e8f..c44ac2c1dd 100644
--- a/docs/apache-airflow/authoring-and-scheduling/deferring.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
@@ -20,7 +20,7 @@ Deferrable Operators & Triggers
 
 Standard :doc:`Operators </core-concepts/operators>` and :doc:`Sensors <../core-concepts/sensors>` take up a full *worker slot* for the entire time they are running, even if they are idle; for example, if you only have 100 worker slots available to run Tasks, and you have 100 DAGs waiting on a Sensor that's currently running but idle, then you *cannot run anything else* - even though your entire Airflow cluster is essentially idle. ``reschedule`` mode for Sensors solves some of this, all [...]
 
-This is where *Deferrable Operators* come in. A deferrable operator is one that is written with the ability to suspend itself and free up the worker when it knows it has to wait, and hand off the job of resuming it to something called a *Trigger*. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors.
+This is where *Deferrable Operators* come in. A deferrable operator is one that is written with the ability to suspend itself and free up the worker when it knows it has to wait, and hand off the job of resuming it to something called a *Trigger*. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors. Note that by default deferred tasks will not use up pool slots, if you would l [...]
 
 *Triggers* are small, asynchronous pieces of Python code designed to be run all together in a single Python process; because they are asynchronous, they are able to all co-exist efficiently. As an overview of how this process works:
 
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index f10af90f5b..5f9515fc19 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-7ff18b1eafa528dbdfb62d75151a526de280f73e1edf7fea18da7c64644f0da9
\ No newline at end of file
+3420f98a9dfa230d76be4da57bc4a3641cb929b33b9eafe7ffa41fb4fcc4d283
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index 18e5f83b11..ba548c60fd 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -4,11 +4,11 @@
 <!-- Generated by graphviz version 2.43.0 (0)
  -->
 <!-- Title: %3 Pages: 1 -->
-<svg width="1559pt" height="5959pt"
- viewBox="0.00 0.00 1559.00 5958.50" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
-<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 5954.5)">
+<svg width="1559pt" height="5983pt"
+ viewBox="0.00 0.00 1559.00 5982.50" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
+<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 5978.5)">
 <title>%3</title>
-<polygon fill="white" stroke="transparent" points="-4,4 -4,-5954.5 1555,-5954.5 1555,4 -4,4"/>
+<polygon fill="white" stroke="transparent" points="-4,4 -4,-5978.5 1555,-5978.5 1555,4 -4,4"/>
 <!-- ab_permission -->
 <g id="node1" class="node">
 <title>ab_permission</title>
@@ -1228,28 +1228,28 @@
 <g id="edge48" class="edge">
 <title>task_instance&#45;&#45;xcom</title>
 <path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" d="M1166.17,-482.51C1190.54,-462.01 1215.43,-441.43 1239,-422 1245.31,-416.8 1251.76,-411.48 1258.29,-406.14"/>
-<text text-anchor="start" x="1248.29" y="-394.94" font-family="Times,serif" font-size="14.00">1</text>
+<text text-anchor="start" x="1227.29" y="-394.94" font-family="Times,serif" font-size="14.00">0..N</text>
 <text text-anchor="start" x="1166.17" y="-471.31" font-family="Times,serif" font-size="14.00">1</text>
 </g>
 <!-- task_instance&#45;&#45;xcom -->
 <g id="edge49" class="edge">
 <title>task_instance&#45;&#45;xcom</title>
 <path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" d="M1166.17,-500.3C1190.54,-480.01 1215.43,-459.43 1239,-440 1245.31,-434.8 1251.76,-429.48 1258.29,-424.12"/>
-<text text-anchor="start" x="1227.29" y="-412.92" font-family="Times,serif" font-size="14.00">0..N</text>
+<text text-anchor="start" x="1248.29" y="-412.92" font-family="Times,serif" font-size="14.00">1</text>
 <text text-anchor="start" x="1166.17" y="-489.1" font-family="Times,serif" font-size="14.00">1</text>
 </g>
 <!-- task_instance&#45;&#45;xcom -->
 <g id="edge50" class="edge">
 <title>task_instance&#45;&#45;xcom</title>
 <path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" d="M1166.17,-518.08C1190.54,-498.01 1215.43,-477.43 1239,-458 1246.11,-452.14 1253.42,-446.12 1260.79,-440.05"/>
-<text text-anchor="start" x="1250.79" y="-443.85" font-family="Times,serif" font-size="14.00">1</text>
+<text text-anchor="start" x="1229.79" y="-443.85" font-family="Times,serif" font-size="14.00">0..N</text>
 <text text-anchor="start" x="1166.17" y="-506.88" font-family="Times,serif" font-size="14.00">1</text>
 </g>
 <!-- task_instance&#45;&#45;xcom -->
 <g id="edge51" class="edge">
 <title>task_instance&#45;&#45;xcom</title>
 <path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" d="M1166.17,-535.87C1190.54,-516.01 1215.43,-495.43 1239,-476 1252.96,-464.49 1267.64,-452.42 1282.18,-440.26"/>
-<text text-anchor="start" x="1282.18" y="-444.06" font-family="Times,serif" font-size="14.00">0..N</text>
+<text text-anchor="start" x="1272.18" y="-444.06" font-family="Times,serif" font-size="14.00">1</text>
 <text text-anchor="start" x="1166.17" y="-524.67" font-family="Times,serif" font-size="14.00">1</text>
 </g>
 <!-- log_template -->
@@ -1582,43 +1582,47 @@
 <!-- slot_pool -->
 <g id="node41" class="node">
 <title>slot_pool</title>
-<polygon fill="none" stroke="black" points="103.5,-5739 103.5,-5767 279.5,-5767 279.5,-5739 103.5,-5739"/>
-<text text-anchor="start" x="150.5" y="-5750.2" font-family="Helvetica,sans-Serif" font-weight="bold" font-size="16.00">slot_pool</text>
-<polygon fill="none" stroke="black" points="103.5,-5714 103.5,-5739 279.5,-5739 279.5,-5714 103.5,-5714"/>
-<text text-anchor="start" x="108.5" y="-5723.8" font-family="Helvetica,sans-Serif" text-decoration="underline" font-size="14.00">id</text>
-<text text-anchor="start" x="121.5" y="-5723.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="198.5" y="-5723.8" font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="103.5,-5689 103.5,-5714 279.5,-5714 279.5,-5689 103.5,-5689"/>
-<text text-anchor="start" x="108.5" y="-5698.8" font-family="Helvetica,sans-Serif" font-size="14.00">description</text>
-<text text-anchor="start" x="186.5" y="-5698.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
-<polygon fill="none" stroke="black" points="103.5,-5664 103.5,-5689 279.5,-5689 279.5,-5664 103.5,-5664"/>
-<text text-anchor="start" x="108.5" y="-5673.8" font-family="Helvetica,sans-Serif" font-size="14.00">pool</text>
-<text text-anchor="start" x="138.5" y="-5673.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(256)]</text>
-<polygon fill="none" stroke="black" points="103.5,-5639 103.5,-5664 279.5,-5664 279.5,-5639 103.5,-5639"/>
-<text text-anchor="start" x="108.5" y="-5648.8" font-family="Helvetica,sans-Serif" font-size="14.00">slots</text>
-<text text-anchor="start" x="141.5" y="-5648.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<polygon fill="none" stroke="black" points="48.5,-5763 48.5,-5791 334.5,-5791 334.5,-5763 48.5,-5763"/>
+<text text-anchor="start" x="150.5" y="-5774.2" font-family="Helvetica,sans-Serif" font-weight="bold" font-size="16.00">slot_pool</text>
+<polygon fill="none" stroke="black" points="48.5,-5738 48.5,-5763 334.5,-5763 334.5,-5738 48.5,-5738"/>
+<text text-anchor="start" x="53.5" y="-5747.8" font-family="Helvetica,sans-Serif" text-decoration="underline" font-size="14.00">id</text>
+<text text-anchor="start" x="66.5" y="-5747.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="143.5" y="-5747.8" font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="48.5,-5713 48.5,-5738 334.5,-5738 334.5,-5713 48.5,-5713"/>
+<text text-anchor="start" x="53.5" y="-5722.8" font-family="Helvetica,sans-Serif" font-size="14.00">description</text>
+<text text-anchor="start" x="131.5" y="-5722.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
+<polygon fill="none" stroke="black" points="48.5,-5688 48.5,-5713 334.5,-5713 334.5,-5688 48.5,-5688"/>
+<text text-anchor="start" x="53.5" y="-5697.8" font-family="Helvetica,sans-Serif" font-size="14.00">include_deferred</text>
+<text text-anchor="start" x="169.5" y="-5697.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [BOOLEAN]</text>
+<text text-anchor="start" x="253.5" y="-5697.8" font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="48.5,-5663 48.5,-5688 334.5,-5688 334.5,-5663 48.5,-5663"/>
+<text text-anchor="start" x="53.5" y="-5672.8" font-family="Helvetica,sans-Serif" font-size="14.00">pool</text>
+<text text-anchor="start" x="83.5" y="-5672.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(256)]</text>
+<polygon fill="none" stroke="black" points="48.5,-5638 48.5,-5663 334.5,-5663 334.5,-5638 48.5,-5638"/>
+<text text-anchor="start" x="53.5" y="-5647.8" font-family="Helvetica,sans-Serif" font-size="14.00">slots</text>
+<text text-anchor="start" x="86.5" y="-5647.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
 </g>
 <!-- variable -->
 <g id="node42" class="node">
 <title>variable</title>
-<polygon fill="none" stroke="black" points="100.5,-5918 100.5,-5946 283.5,-5946 283.5,-5918 100.5,-5918"/>
-<text text-anchor="start" x="155.5" y="-5929.2" font-family="Helvetica,sans-Serif" font-weight="bold" font-size="16.00">variable</text>
-<polygon fill="none" stroke="black" points="100.5,-5893 100.5,-5918 283.5,-5918 283.5,-5893 100.5,-5893"/>
-<text text-anchor="start" x="105.5" y="-5902.8" font-family="Helvetica,sans-Serif" text-decoration="underline" font-size="14.00">id</text>
-<text text-anchor="start" x="118.5" y="-5902.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="195.5" y="-5902.8" font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="100.5,-5868 100.5,-5893 283.5,-5893 283.5,-5868 100.5,-5868"/>
-<text text-anchor="start" x="105.5" y="-5877.8" font-family="Helvetica,sans-Serif" font-size="14.00">description</text>
-<text text-anchor="start" x="183.5" y="-5877.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
-<polygon fill="none" stroke="black" points="100.5,-5843 100.5,-5868 283.5,-5868 283.5,-5843 100.5,-5843"/>
-<text text-anchor="start" x="105.5" y="-5852.8" font-family="Helvetica,sans-Serif" font-size="14.00">is_encrypted</text>
-<text text-anchor="start" x="194.5" y="-5852.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [BOOLEAN]</text>
-<polygon fill="none" stroke="black" points="100.5,-5818 100.5,-5843 283.5,-5843 283.5,-5818 100.5,-5818"/>
-<text text-anchor="start" x="105.5" y="-5827.8" font-family="Helvetica,sans-Serif" font-size="14.00">key</text>
-<text text-anchor="start" x="130.5" y="-5827.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<polygon fill="none" stroke="black" points="100.5,-5793 100.5,-5818 283.5,-5818 283.5,-5793 100.5,-5793"/>
-<text text-anchor="start" x="105.5" y="-5802.8" font-family="Helvetica,sans-Serif" font-size="14.00">val</text>
-<text text-anchor="start" x="126.5" y="-5802.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
+<polygon fill="none" stroke="black" points="100.5,-5942 100.5,-5970 283.5,-5970 283.5,-5942 100.5,-5942"/>
+<text text-anchor="start" x="155.5" y="-5953.2" font-family="Helvetica,sans-Serif" font-weight="bold" font-size="16.00">variable</text>
+<polygon fill="none" stroke="black" points="100.5,-5917 100.5,-5942 283.5,-5942 283.5,-5917 100.5,-5917"/>
+<text text-anchor="start" x="105.5" y="-5926.8" font-family="Helvetica,sans-Serif" text-decoration="underline" font-size="14.00">id</text>
+<text text-anchor="start" x="118.5" y="-5926.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="195.5" y="-5926.8" font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="100.5,-5892 100.5,-5917 283.5,-5917 283.5,-5892 100.5,-5892"/>
+<text text-anchor="start" x="105.5" y="-5901.8" font-family="Helvetica,sans-Serif" font-size="14.00">description</text>
+<text text-anchor="start" x="183.5" y="-5901.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
+<polygon fill="none" stroke="black" points="100.5,-5867 100.5,-5892 283.5,-5892 283.5,-5867 100.5,-5867"/>
+<text text-anchor="start" x="105.5" y="-5876.8" font-family="Helvetica,sans-Serif" font-size="14.00">is_encrypted</text>
+<text text-anchor="start" x="194.5" y="-5876.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [BOOLEAN]</text>
+<polygon fill="none" stroke="black" points="100.5,-5842 100.5,-5867 283.5,-5867 283.5,-5842 100.5,-5842"/>
+<text text-anchor="start" x="105.5" y="-5851.8" font-family="Helvetica,sans-Serif" font-size="14.00">key</text>
+<text text-anchor="start" x="130.5" y="-5851.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<polygon fill="none" stroke="black" points="100.5,-5817 100.5,-5842 283.5,-5842 283.5,-5817 100.5,-5817"/>
+<text text-anchor="start" x="105.5" y="-5826.8" font-family="Helvetica,sans-Serif" font-size="14.00">val</text>
+<text text-anchor="start" x="126.5" y="-5826.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
 </g>
 </g>
 </svg>
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index be796354da..a3a42654f6 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | Revision ID                     | Revises ID        | Airflow Version   | Description                                                  |
 +=================================+===================+===================+==============================================================+
-| ``788397e78828`` (head)         | ``937cbd173ca1``  | ``2.7.0``         | Add custom_operator_name column                              |
+| ``405de8318b3a`` (head)         | ``788397e78828``  | ``2.7.0``         | add include_deferred column to pool                          |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``788397e78828``                | ``937cbd173ca1``  | ``2.7.0``         | Add custom_operator_name column                              |
 +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | ``937cbd173ca1``                | ``c804e5c76e3e``  | ``2.7.0``         | Add index to task_instance table                             |
 +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
diff --git a/tests/api_connexion/endpoints/test_pool_endpoint.py b/tests/api_connexion/endpoints/test_pool_endpoint.py
index c12e2473f2..a0121fccd1 100644
--- a/tests/api_connexion/endpoints/test_pool_endpoint.py
+++ b/tests/api_connexion/endpoints/test_pool_endpoint.py
@@ -63,7 +63,7 @@ class TestBasePoolEndpoints:
 
 class TestGetPools(TestBasePoolEndpoints):
     def test_response_200(self, session):
-        pool_model = Pool(pool="test_pool_a", slots=3)
+        pool_model = Pool(pool="test_pool_a", slots=3, include_deferred=True)
         session.add(pool_model)
         session.commit()
         result = session.query(Pool).all()
@@ -79,8 +79,10 @@ class TestGetPools(TestBasePoolEndpoints):
                     "running_slots": 0,
                     "queued_slots": 0,
                     "scheduled_slots": 0,
+                    "deferred_slots": 0,
                     "open_slots": 128,
                     "description": "Default pool",
+                    "include_deferred": False,
                 },
                 {
                     "name": "test_pool_a",
@@ -89,15 +91,17 @@ class TestGetPools(TestBasePoolEndpoints):
                     "running_slots": 0,
                     "queued_slots": 0,
                     "scheduled_slots": 0,
+                    "deferred_slots": 0,
                     "open_slots": 3,
                     "description": None,
+                    "include_deferred": True,
                 },
             ],
             "total_entries": 2,
         } == response.json
 
     def test_response_200_with_order_by(self, session):
-        pool_model = Pool(pool="test_pool_a", slots=3)
+        pool_model = Pool(pool="test_pool_a", slots=3, include_deferred=True)
         session.add(pool_model)
         session.commit()
         result = session.query(Pool).all()
@@ -113,8 +117,10 @@ class TestGetPools(TestBasePoolEndpoints):
                     "running_slots": 0,
                     "queued_slots": 0,
                     "scheduled_slots": 0,
+                    "deferred_slots": 0,
                     "open_slots": 3,
                     "description": None,
+                    "include_deferred": True,
                 },
                 {
                     "name": "default_pool",
@@ -123,8 +129,10 @@ class TestGetPools(TestBasePoolEndpoints):
                     "running_slots": 0,
                     "queued_slots": 0,
                     "scheduled_slots": 0,
+                    "deferred_slots": 0,
                     "open_slots": 128,
                     "description": "Default pool",
+                    "include_deferred": False,
                 },
             ],
             "total_entries": 2,
@@ -164,7 +172,7 @@ class TestGetPoolsPagination(TestBasePoolEndpoints):
     )
     @provide_session
     def test_limit_and_offset(self, url, expected_pool_ids, session):
-        pools = [Pool(pool=f"test_pool{i}", slots=1) for i in range(1, 121)]
+        pools = [Pool(pool=f"test_pool{i}", slots=1, include_deferred=False) for i in range(1, 121)]
         session.add_all(pools)
         session.commit()
         result = session.query(Pool).count()
@@ -175,7 +183,7 @@ class TestGetPoolsPagination(TestBasePoolEndpoints):
         assert pool_ids == expected_pool_ids
 
     def test_should_respect_page_size_limit_default(self, session):
-        pools = [Pool(pool=f"test_pool{i}", slots=1) for i in range(1, 121)]
+        pools = [Pool(pool=f"test_pool{i}", slots=1, include_deferred=False) for i in range(1, 121)]
         session.add_all(pools)
         session.commit()
         result = session.query(Pool).count()
@@ -185,7 +193,7 @@ class TestGetPoolsPagination(TestBasePoolEndpoints):
         assert len(response.json["pools"]) == 100
 
     def test_should_raise_400_for_invalid_orderby(self, session):
-        pools = [Pool(pool=f"test_pool{i}", slots=1) for i in range(1, 121)]
+        pools = [Pool(pool=f"test_pool{i}", slots=1, include_deferred=False) for i in range(1, 121)]
         session.add_all(pools)
         session.commit()
         result = session.query(Pool).count()
@@ -199,7 +207,7 @@ class TestGetPoolsPagination(TestBasePoolEndpoints):
 
     @conf_vars({("api", "maximum_page_limit"): "150"})
     def test_should_return_conf_max_if_req_max_above_conf(self, session):
-        pools = [Pool(pool=f"test_pool{i}", slots=1) for i in range(1, 200)]
+        pools = [Pool(pool=f"test_pool{i}", slots=1, include_deferred=False) for i in range(1, 200)]
         session.add_all(pools)
         session.commit()
         result = session.query(Pool).count()
@@ -211,7 +219,7 @@ class TestGetPoolsPagination(TestBasePoolEndpoints):
 
 class TestGetPool(TestBasePoolEndpoints):
     def test_response_200(self, session):
-        pool_model = Pool(pool="test_pool_a", slots=3)
+        pool_model = Pool(pool="test_pool_a", slots=3, include_deferred=True)
         session.add(pool_model)
         session.commit()
         response = self.client.get("/api/v1/pools/test_pool_a", environ_overrides={"REMOTE_USER": "test"})
@@ -223,8 +231,10 @@ class TestGetPool(TestBasePoolEndpoints):
             "running_slots": 0,
             "queued_slots": 0,
             "scheduled_slots": 0,
+            "deferred_slots": 0,
             "open_slots": 3,
             "description": None,
+            "include_deferred": True,
         } == response.json
 
     def test_response_404(self):
@@ -246,7 +256,7 @@ class TestGetPool(TestBasePoolEndpoints):
 class TestDeletePool(TestBasePoolEndpoints):
     def test_response_204(self, session):
         pool_name = "test_pool"
-        pool_instance = Pool(pool=pool_name, slots=3)
+        pool_instance = Pool(pool=pool_name, slots=3, include_deferred=False)
         session.add(pool_instance)
         session.commit()
 
@@ -268,7 +278,7 @@ class TestDeletePool(TestBasePoolEndpoints):
 
     def test_should_raises_401_unauthenticated(self, session):
         pool_name = "test_pool"
-        pool_instance = Pool(pool=pool_name, slots=3)
+        pool_instance = Pool(pool=pool_name, slots=3, include_deferred=False)
         session.add(pool_instance)
         session.commit()
 
@@ -285,7 +295,7 @@ class TestPostPool(TestBasePoolEndpoints):
     def test_response_200(self):
         response = self.client.post(
             "api/v1/pools",
-            json={"name": "test_pool_a", "slots": 3, "description": "test pool"},
+            json={"name": "test_pool_a", "slots": 3, "description": "test pool", "include_deferred": True},
             environ_overrides={"REMOTE_USER": "test"},
         )
         assert response.status_code == 200
@@ -296,18 +306,20 @@ class TestPostPool(TestBasePoolEndpoints):
             "running_slots": 0,
             "queued_slots": 0,
             "scheduled_slots": 0,
+            "deferred_slots": 0,
             "open_slots": 3,
             "description": "test pool",
+            "include_deferred": True,
         } == response.json
 
     def test_response_409(self, session):
         pool_name = "test_pool_a"
-        pool_instance = Pool(pool=pool_name, slots=3)
+        pool_instance = Pool(pool=pool_name, slots=3, include_deferred=False)
         session.add(pool_instance)
         session.commit()
         response = self.client.post(
             "api/v1/pools",
-            json={"name": "test_pool_a", "slots": 3},
+            json={"name": "test_pool_a", "slots": 3, "include_deferred": False},
             environ_overrides={"REMOTE_USER": "test"},
         )
         assert response.status_code == 409
@@ -334,7 +346,7 @@ class TestPostPool(TestBasePoolEndpoints):
             pytest.param(
                 {},
                 "Missing required property(ies): ['name', 'slots']",
-                id="for missing pool name AND slots",
+                id="for missing pool name AND slots AND include_deferred",
             ),
             pytest.param(
                 {"name": "invalid_pool", "slots": 3, "extra_field_1": "extra"},
@@ -363,12 +375,12 @@ class TestPostPool(TestBasePoolEndpoints):
 
 class TestPatchPool(TestBasePoolEndpoints):
     def test_response_200(self, session):
-        pool = Pool(pool="test_pool", slots=2)
+        pool = Pool(pool="test_pool", slots=2, include_deferred=True)
         session.add(pool)
         session.commit()
         response = self.client.patch(
             "api/v1/pools/test_pool",
-            json={"name": "test_pool_a", "slots": 3},
+            json={"name": "test_pool_a", "slots": 3, "include_deferred": False},
             environ_overrides={"REMOTE_USER": "test"},
         )
         assert response.status_code == 200
@@ -379,8 +391,10 @@ class TestPatchPool(TestBasePoolEndpoints):
             "open_slots": 3,
             "running_slots": 0,
             "scheduled_slots": 0,
+            "deferred_slots": 0,
             "slots": 3,
             "description": None,
+            "include_deferred": False,
         } == response.json
 
     @pytest.mark.parametrize(
@@ -393,13 +407,13 @@ class TestPatchPool(TestBasePoolEndpoints):
             # Extra properties
             (
                 "{'extra_field': ['Unknown field.']}",
-                {"name": "test_pool_a", "slots": 3, "extra_field": "extra"},
+                {"name": "test_pool_a", "slots": 3, "include_deferred": True, "extra_field": "extra"},
             ),
         ],
     )
     @provide_session
     def test_response_400(self, error_detail, request_json, session):
-        pool = Pool(pool="test_pool", slots=2)
+        pool = Pool(pool="test_pool", slots=2, include_deferred=False)
         session.add(pool)
         session.commit()
         response = self.client.patch(
@@ -414,7 +428,7 @@ class TestPatchPool(TestBasePoolEndpoints):
         } == response.json
 
     def test_should_raises_401_unauthenticated(self, session):
-        pool = Pool(pool="test_pool", slots=2)
+        pool = Pool(pool="test_pool", slots=2, include_deferred=False)
         session.add(pool)
         session.commit()
 
@@ -443,7 +457,7 @@ class TestModifyDefaultPool(TestBasePoolEndpoints):
             pytest.param(
                 400,
                 "api/v1/pools/default_pool",
-                {"name": "test_pool_a", "slots": 3},
+                {"name": "test_pool_a", "slots": 3, "include_deferred": False},
                 {
                     "detail": "Default Pool's name can't be modified",
                     "status": 400,
@@ -455,7 +469,7 @@ class TestModifyDefaultPool(TestBasePoolEndpoints):
             pytest.param(
                 400,
                 "api/v1/pools/default_pool?update_mask=name, slots",
-                {"name": "test_pool_a", "slots": 3},
+                {"name": "test_pool_a", "slots": 3, "include_deferred": False},
                 {
                     "detail": "Default Pool's name can't be modified",
                     "status": 400,
@@ -475,11 +489,49 @@ class TestModifyDefaultPool(TestBasePoolEndpoints):
                     "open_slots": 3,
                     "running_slots": 0,
                     "scheduled_slots": 0,
+                    "deferred_slots": 0,
                     "slots": 3,
                     "description": "Default pool",
+                    "include_deferred": False,
                 },
                 id="200 Update mask with slots",
             ),
+            pytest.param(
+                200,
+                "api/v1/pools/default_pool?update_mask=include_deferred",
+                {"name": "test_pool_a", "include_deferred": True},
+                {
+                    "occupied_slots": 0,
+                    "queued_slots": 0,
+                    "name": "default_pool",
+                    "open_slots": 128,
+                    "running_slots": 0,
+                    "scheduled_slots": 0,
+                    "deferred_slots": 0,
+                    "slots": 128,
+                    "description": "Default pool",
+                    "include_deferred": True,
+                },
+                id="200 Update mask with include_deferred",
+            ),
+            pytest.param(
+                200,
+                "api/v1/pools/default_pool?update_mask=slots,include_deferred",
+                {"name": "test_pool_a", "slots": 3, "include_deferred": True},
+                {
+                    "occupied_slots": 0,
+                    "queued_slots": 0,
+                    "name": "default_pool",
+                    "open_slots": 3,
+                    "running_slots": 0,
+                    "scheduled_slots": 0,
+                    "deferred_slots": 0,
+                    "slots": 3,
+                    "description": "Default pool",
+                    "include_deferred": True,
+                },
+                id="200 Update mask with slots AND include_deferred",
+            ),
             pytest.param(
                 200,
                 "api/v1/pools/default_pool?update_mask=name,slots",
@@ -491,8 +543,10 @@ class TestModifyDefaultPool(TestBasePoolEndpoints):
                     "open_slots": 3,
                     "running_slots": 0,
                     "scheduled_slots": 0,
+                    "deferred_slots": 0,
                     "slots": 3,
                     "description": "Default pool",
+                    "include_deferred": False,
                 },
                 id="200 Update mask with slots and name",
             ),
@@ -502,6 +556,7 @@ class TestModifyDefaultPool(TestBasePoolEndpoints):
                 {
                     "name": "default_pool",
                     "slots": 3,
+                    "include_deferred": True,
                 },
                 {
                     "occupied_slots": 0,
@@ -510,8 +565,10 @@ class TestModifyDefaultPool(TestBasePoolEndpoints):
                     "open_slots": 3,
                     "running_slots": 0,
                     "scheduled_slots": 0,
+                    "deferred_slots": 0,
                     "slots": 3,
                     "description": "Default pool",
+                    "include_deferred": True,
                 },
                 id="200 no update mask",
             ),
@@ -525,37 +582,50 @@ class TestModifyDefaultPool(TestBasePoolEndpoints):
 
 class TestPatchPoolWithUpdateMask(TestBasePoolEndpoints):
     @pytest.mark.parametrize(
-        "url, patch_json, expected_name, expected_slots",
+        "url, patch_json, expected_name, expected_slots, expected_include_deferred",
         [
             (
                 "api/v1/pools/test_pool?update_mask=name, slots",
                 {"name": "test_pool_a", "slots": 2},
                 "test_pool_a",
                 2,
+                False,
             ),
             (
                 "api/v1/pools/test_pool?update_mask=name",
                 {"name": "test_pool_a", "slots": 2},
                 "test_pool_a",
                 3,
+                False,
             ),
             (
                 "api/v1/pools/test_pool?update_mask=slots",
                 {"name": "test_pool_a", "slots": 2},
                 "test_pool",
                 2,
+                False,
             ),
             (
                 "api/v1/pools/test_pool?update_mask=slots",
                 {"slots": 2},
                 "test_pool",
                 2,
+                False,
+            ),
+            (
+                "api/v1/pools/test_pool?update_mask=include_deferred",
+                {"include_deferred": True},
+                "test_pool",
+                3,
+                True,
             ),
         ],
     )
     @provide_session
-    def test_response_200(self, url, patch_json, expected_name, expected_slots, session):
-        pool = Pool(pool="test_pool", slots=3)
+    def test_response_200(
+        self, url, patch_json, expected_name, expected_slots, expected_include_deferred, session
+    ):
+        pool = Pool(pool="test_pool", slots=3, include_deferred=False)
         session.add(pool)
         session.commit()
         response = self.client.patch(url, json=patch_json, environ_overrides={"REMOTE_USER": "test"})
@@ -567,8 +637,10 @@ class TestPatchPoolWithUpdateMask(TestBasePoolEndpoints):
             "running_slots": 0,
             "queued_slots": 0,
             "scheduled_slots": 0,
+            "deferred_slots": 0,
             "open_slots": expected_slots,
             "description": None,
+            "include_deferred": expected_include_deferred,
         } == response.json
 
     @pytest.mark.parametrize(
@@ -602,7 +674,7 @@ class TestPatchPoolWithUpdateMask(TestBasePoolEndpoints):
     )
     @provide_session
     def test_response_400(self, error_detail, url, patch_json, session):
-        pool = Pool(pool="test_pool", slots=3)
+        pool = Pool(pool="test_pool", slots=3, include_deferred=False)
         session.add(pool)
         session.commit()
         response = self.client.patch(url, json=patch_json, environ_overrides={"REMOTE_USER": "test"})
diff --git a/tests/api_connexion/schemas/test_pool_schemas.py b/tests/api_connexion/schemas/test_pool_schemas.py
index f0eb0c0a49..9c5a78994d 100644
--- a/tests/api_connexion/schemas/test_pool_schemas.py
+++ b/tests/api_connexion/schemas/test_pool_schemas.py
@@ -31,7 +31,7 @@ class TestPoolSchema:
 
     @provide_session
     def test_serialize(self, session):
-        pool_model = Pool(pool="test_pool", slots=2)
+        pool_model = Pool(pool="test_pool", slots=2, include_deferred=False)
         session.add(pool_model)
         session.commit()
         pool_instance = session.query(Pool).filter(Pool.pool == pool_model.pool).first()
@@ -43,13 +43,15 @@ class TestPoolSchema:
             "running_slots": 0,
             "queued_slots": 0,
             "scheduled_slots": 0,
+            "deferred_slots": 0,
             "open_slots": 2,
             "description": None,
+            "include_deferred": False,
         }
 
     @provide_session
     def test_deserialize(self, session):
-        pool_dict = {"name": "test_pool", "slots": 3}
+        pool_dict = {"name": "test_pool", "slots": 3, "include_deferred": True}
         deserialized_pool = pool_schema.load(pool_dict, session=session)
         assert not isinstance(deserialized_pool, Pool)  # Checks if load_instance is set to True
 
@@ -62,8 +64,8 @@ class TestPoolCollectionSchema:
         clear_db_pools()
 
     def test_serialize(self):
-        pool_model_a = Pool(pool="test_pool_a", slots=3)
-        pool_model_b = Pool(pool="test_pool_b", slots=3)
+        pool_model_a = Pool(pool="test_pool_a", slots=3, include_deferred=False)
+        pool_model_b = Pool(pool="test_pool_b", slots=3, include_deferred=True)
         instance = PoolCollection(pools=[pool_model_a, pool_model_b], total_entries=2)
         assert {
             "pools": [
@@ -74,8 +76,10 @@ class TestPoolCollectionSchema:
                     "running_slots": 0,
                     "queued_slots": 0,
                     "scheduled_slots": 0,
+                    "deferred_slots": 0,
                     "open_slots": 3,
                     "description": None,
+                    "include_deferred": False,
                 },
                 {
                     "name": "test_pool_b",
@@ -84,8 +88,10 @@ class TestPoolCollectionSchema:
                     "running_slots": 0,
                     "queued_slots": 0,
                     "scheduled_slots": 0,
+                    "deferred_slots": 0,
                     "open_slots": 3,
                     "description": None,
+                    "include_deferred": True,
                 },
             ],
             "total_entries": 2,
diff --git a/tests/api_connexion/test_auth.py b/tests/api_connexion/test_auth.py
index 35491ad7b2..b4cdc4a2bc 100644
--- a/tests/api_connexion/test_auth.py
+++ b/tests/api_connexion/test_auth.py
@@ -78,8 +78,10 @@ class TestBasicAuth(BaseTestAuth):
                     "running_slots": 0,
                     "queued_slots": 0,
                     "scheduled_slots": 0,
+                    "deferred_slots": 0,
                     "open_slots": 128,
                     "description": "Default pool",
+                    "include_deferred": False,
                 },
             ],
             "total_entries": 1,
@@ -153,8 +155,10 @@ class TestSessionAuth(BaseTestAuth):
                     "running_slots": 0,
                     "queued_slots": 0,
                     "scheduled_slots": 0,
+                    "deferred_slots": 0,
                     "open_slots": 128,
                     "description": "Default pool",
+                    "include_deferred": False,
                 },
             ],
             "total_entries": 1,
diff --git a/tests/api_experimental/client/test_local_client.py b/tests/api_experimental/client/test_local_client.py
index 575400a115..6724610670 100644
--- a/tests/api_experimental/client/test_local_client.py
+++ b/tests/api_experimental/client/test_local_client.py
@@ -181,22 +181,26 @@ class TestLocalClient:
             assert session.query(DagModel).filter(DagModel.dag_id == key).count() == 0
 
     def test_get_pool(self):
-        self.client.create_pool(name="foo", slots=1, description="")
+        self.client.create_pool(name="foo", slots=1, description="", include_deferred=False)
         pool = self.client.get_pool(name="foo")
-        assert pool == ("foo", 1, "")
+        assert pool == ("foo", 1, "", False)
 
     def test_get_pool_non_existing_raises(self):
         with pytest.raises(PoolNotFound):
             self.client.get_pool(name="foo")
 
     def test_get_pools(self):
-        self.client.create_pool(name="foo1", slots=1, description="")
-        self.client.create_pool(name="foo2", slots=2, description="")
+        self.client.create_pool(name="foo1", slots=1, description="", include_deferred=False)
+        self.client.create_pool(name="foo2", slots=2, description="", include_deferred=True)
         pools = sorted(self.client.get_pools(), key=lambda p: p[0])
-        assert pools == [("default_pool", 128, "Default pool"), ("foo1", 1, ""), ("foo2", 2, "")]
+        assert pools == [
+            ("default_pool", 128, "Default pool", False),
+            ("foo1", 1, "", False),
+            ("foo2", 2, "", True),
+        ]
 
     def test_create_pool(self):
-        pool = self.client.create_pool(name="foo", slots=1, description="")
+        pool = self.client.create_pool(name="foo", slots=1, description="", include_deferred=False)
         assert pool == ("foo", 1, "")
         with create_session() as session:
             assert session.query(Pool).count() == 2
@@ -207,6 +211,7 @@ class TestLocalClient:
                 name="foo",
                 slots="foo",
                 description="",
+                include_deferred=True,
             )
 
     def test_create_pool_name_too_long(self):
@@ -219,10 +224,11 @@ class TestLocalClient:
                 name=long_name,
                 slots=5,
                 description="",
+                include_deferred=False,
             )
 
     def test_delete_pool(self):
-        self.client.create_pool(name="foo", slots=1, description="")
+        self.client.create_pool(name="foo", slots=1, description="", include_deferred=False)
         with create_session() as session:
             assert session.query(Pool).count() == 2
         self.client.delete_pool(name="foo")
diff --git a/tests/api_experimental/common/experimental/test_pool.py b/tests/api_experimental/common/experimental/test_pool.py
index 4e6bdda7ac..d00f318ffa 100644
--- a/tests/api_experimental/common/experimental/test_pool.py
+++ b/tests/api_experimental/common/experimental/test_pool.py
@@ -45,6 +45,7 @@ class TestPool:
                 pool=name,
                 slots=i,
                 description=name,
+                include_deferred=True,
             )
             self.pools.append(pool)
         with create_session() as session:
@@ -77,10 +78,12 @@ class TestPool:
             assert session.query(models.Pool).count() == self.TOTAL_POOL_COUNT + 1
 
     def test_create_pool_existing(self):
-        pool = pool_api.create_pool(name=self.pools[0].pool, slots=5, description="")
-        assert pool.pool == self.pools[0].pool
+        pool = pool_api.create_pool(name=self.pools[1].pool, slots=5, description="")
+        assert pool.pool == self.pools[1].pool
         assert pool.slots == 5
         assert pool.description == ""
+        # do not change include_deferred on an existing pool
+        assert pool.include_deferred is True
         with create_session() as session:
             assert session.query(models.Pool).count() == self.TOTAL_POOL_COUNT
 
diff --git a/tests/cli/commands/test_pool_command.py b/tests/cli/commands/test_pool_command.py
index ec175912d8..ca76a96514 100644
--- a/tests/cli/commands/test_pool_command.py
+++ b/tests/cli/commands/test_pool_command.py
@@ -19,7 +19,6 @@ from __future__ import annotations
 
 import io
 import json
-import os
 from contextlib import redirect_stdout
 
 import pytest
@@ -67,6 +66,18 @@ class TestCliPools:
         pool_command.pool_set(self.parser.parse_args(["pools", "set", "foo", "1", "test"]))
         assert self.session.query(Pool).count() == 2
 
+    def test_pool_update_deferred(self):
+        pool_command.pool_set(self.parser.parse_args(["pools", "set", "foo", "1", "test"]))
+        assert self.session.query(Pool).filter(Pool.pool == "foo").first().include_deferred is False
+
+        pool_command.pool_set(
+            self.parser.parse_args(["pools", "set", "foo", "1", "test", "--include-deferred"])
+        )
+        assert self.session.query(Pool).filter(Pool.pool == "foo").first().include_deferred is True
+
+        pool_command.pool_set(self.parser.parse_args(["pools", "set", "foo", "1", "test"]))
+        assert self.session.query(Pool).filter(Pool.pool == "foo").first().include_deferred is False
+
     def test_pool_get(self):
         pool_command.pool_set(self.parser.parse_args(["pools", "set", "foo", "1", "test"]))
         pool_command.pool_get(self.parser.parse_args(["pools", "get", "foo"]))
@@ -80,39 +91,57 @@ class TestCliPools:
         with pytest.raises(SystemExit):
             pool_command.pool_import(self.parser.parse_args(["pools", "import", "nonexistent.json"]))
 
-    def test_pool_import_invalid_json(self):
-        with open("pools_import_invalid.json", mode="w") as file:
+    def test_pool_import_invalid_json(self, tmp_path):
+        invalid_pool_import_file_path = tmp_path / "pools_import_invalid.json"
+        with open(invalid_pool_import_file_path, mode="w") as file:
             file.write("not valid json")
 
         with pytest.raises(SystemExit):
-            pool_command.pool_import(self.parser.parse_args(["pools", "import", "pools_import_invalid.json"]))
-
-    def test_pool_import_invalid_pools(self):
-        pool_config_input = {"foo": {"description": "foo_test"}}
-        with open("pools_import_invalid.json", mode="w") as file:
+            pool_command.pool_import(
+                self.parser.parse_args(["pools", "import", str(invalid_pool_import_file_path)])
+            )
+
+    def test_pool_import_invalid_pools(self, tmp_path):
+        invalid_pool_import_file_path = tmp_path / "pools_import_invalid.json"
+        pool_config_input = {"foo": {"description": "foo_test", "include_deferred": False}}
+        with open(invalid_pool_import_file_path, mode="w") as file:
             json.dump(pool_config_input, file)
 
         with pytest.raises(SystemExit):
-            pool_command.pool_import(self.parser.parse_args(["pools", "import", "pools_import_invalid.json"]))
+            pool_command.pool_import(
+                self.parser.parse_args(["pools", "import", str(invalid_pool_import_file_path)])
+            )
 
-    def test_pool_import_export(self):
-        # Create two pools first
+    def test_pool_import_backwards_compatibility(self, tmp_path):
+        pool_import_file_path = tmp_path / "pools_import.json"
         pool_config_input = {
+            # JSON before version 2.7.0 does not contain `include_deferred`
             "foo": {"description": "foo_test", "slots": 1},
-            "default_pool": {"description": "Default pool", "slots": 128},
-            "baz": {"description": "baz_test", "slots": 2},
         }
-        with open("pools_import.json", mode="w") as file:
+        with open(pool_import_file_path, mode="w") as file:
+            json.dump(pool_config_input, file)
+
+        pool_command.pool_import(self.parser.parse_args(["pools", "import", str(pool_import_file_path)]))
+
+        assert self.session.query(Pool).filter(Pool.pool == "foo").first().include_deferred is False
+
+    def test_pool_import_export(self, tmp_path):
+        pool_import_file_path = tmp_path / "pools_import.json"
+        pool_export_file_path = tmp_path / "pools_export.json"
+        pool_config_input = {
+            "foo": {"description": "foo_test", "slots": 1, "include_deferred": True},
+            "default_pool": {"description": "Default pool", "slots": 128, "include_deferred": False},
+            "baz": {"description": "baz_test", "slots": 2, "include_deferred": False},
+        }
+        with open(pool_import_file_path, mode="w") as file:
             json.dump(pool_config_input, file)
 
         # Import json
-        pool_command.pool_import(self.parser.parse_args(["pools", "import", "pools_import.json"]))
+        pool_command.pool_import(self.parser.parse_args(["pools", "import", str(pool_import_file_path)]))
 
         # Export json
-        pool_command.pool_export(self.parser.parse_args(["pools", "export", "pools_export.json"]))
+        pool_command.pool_export(self.parser.parse_args(["pools", "export", str(pool_export_file_path)]))
 
-        with open("pools_export.json") as file:
+        with open(pool_export_file_path) as file:
             pool_config_output = json.load(file)
             assert pool_config_input == pool_config_output, "Input and output pool files are not same"
-        os.remove("pools_import.json")
-        os.remove("pools_export.json")
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index a96c5cbddf..eac5fe6802 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -813,7 +813,7 @@ class TestLogsfromTaskRunCommand:
 
         clear_db_pools()
         with create_session() as session:
-            pool = Pool(pool=pool_name, slots=1)
+            pool = Pool(pool=pool_name, slots=1, include_deferred=False)
             session.add(pool)
             session.commit()
 
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index a11b8d5688..e4efd8682e 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -598,6 +598,7 @@ class TestBackfillJob:
         pool = Pool(
             pool="pool_with_two_slots",
             slots=slots,
+            include_deferred=False,
         )
         session.add(pool)
         session.commit()
@@ -961,7 +962,7 @@ class TestBackfillJob:
         Test that queued tasks are executed by BackfillJobRunner
         """
         session = settings.Session()
-        pool = Pool(pool="test_backfill_pooled_task_pool", slots=1)
+        pool = Pool(pool="test_backfill_pooled_task_pool", slots=1, include_deferred=False)
         session.add(pool)
         session.commit()
         session.close()
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 3a3d3f55b4..ed09d9cd18 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -605,8 +605,8 @@ class TestSchedulerJob:
         for ti in tis:
             ti.state = State.SCHEDULED
             session.merge(ti)
-        pool = Pool(pool="a", slots=1, description="haha")
-        pool2 = Pool(pool="b", slots=100, description="haha")
+        pool = Pool(pool="a", slots=1, description="haha", include_deferred=False)
+        pool2 = Pool(pool="b", slots=100, description="haha", include_deferred=False)
         session.add(pool)
         session.add(pool2)
         session.flush()
@@ -732,8 +732,8 @@ class TestSchedulerJob:
 
         dag_id = "SchedulerJobTest.test_find_executable_task_instances_order_priority_with_pools"
 
-        session.add(Pool(pool="pool1", slots=32))
-        session.add(Pool(pool="pool2", slots=32))
+        session.add(Pool(pool="pool1", slots=32, include_deferred=False))
+        session.add(Pool(pool="pool2", slots=32, include_deferred=False))
 
         with dag_maker(dag_id=dag_id, max_active_tasks=2):
             op1 = EmptyOperator(task_id="dummy1", priority_weight=1, pool="pool1")
@@ -888,7 +888,12 @@ class TestSchedulerJob:
         ti = dr.task_instances[0]
         ti.state = State.SCHEDULED
         session.merge(ti)
-        infinite_pool = Pool(pool="infinite_pool", slots=-1, description="infinite pool")
+        infinite_pool = Pool(
+            pool="infinite_pool",
+            slots=-1,
+            description="infinite pool",
+            include_deferred=False,
+        )
         session.add(infinite_pool)
         session.commit()
 
@@ -913,7 +918,7 @@ class TestSchedulerJob:
         ti = dr.task_instances[1]
         ti.state = State.SCHEDULED
         session.merge(ti)
-        some_pool = Pool(pool="some_pool", slots=2, description="my pool")
+        some_pool = Pool(pool="some_pool", slots=2, description="my pool", include_deferred=False)
         session.add(some_pool)
         session.commit()
         with caplog.at_level(logging.WARNING):
@@ -1336,8 +1341,8 @@ class TestSchedulerJob:
         self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
         session = settings.Session()
 
-        pool1 = Pool(pool="pool1", slots=1)
-        pool2 = Pool(pool="pool2", slots=1)
+        pool1 = Pool(pool="pool1", slots=1, include_deferred=False)
+        pool2 = Pool(pool="pool2", slots=1, include_deferred=False)
 
         session.add(pool1)
         session.add(pool2)
@@ -2578,7 +2583,7 @@ class TestSchedulerJob:
                 )
 
             session = settings.Session()
-            pool = Pool(pool="test_scheduler_verify_pool_full", slots=1)
+            pool = Pool(pool="test_scheduler_verify_pool_full", slots=1, include_deferred=False)
             session.add(pool)
             session.flush()
 
@@ -2619,7 +2624,7 @@ class TestSchedulerJob:
                 bash_command="echo hi",
             )
 
-        pool = Pool(pool="test_scheduler_verify_pool_full_2_slots_per_task", slots=6)
+        pool = Pool(pool="test_scheduler_verify_pool_full_2_slots_per_task", slots=6, include_deferred=False)
         session.add(pool)
         session.flush()
 
@@ -2673,8 +2678,8 @@ class TestSchedulerJob:
         dag_d2 = dag_maker.dag
 
         session = settings.Session()
-        pool_p1 = Pool(pool="test_scheduler_keeps_scheduling_pool_full_p1", slots=1)
-        pool_p2 = Pool(pool="test_scheduler_keeps_scheduling_pool_full_p2", slots=10)
+        pool_p1 = Pool(pool="test_scheduler_keeps_scheduling_pool_full_p1", slots=1, include_deferred=False)
+        pool_p2 = Pool(pool="test_scheduler_keeps_scheduling_pool_full_p2", slots=10, include_deferred=False)
         session.add(pool_p1)
         session.add(pool_p2)
         session.flush()
@@ -2751,7 +2756,7 @@ class TestSchedulerJob:
             )
 
         session = settings.Session()
-        pool = Pool(pool="test_scheduler_verify_priority_and_slots", slots=2)
+        pool = Pool(pool="test_scheduler_verify_priority_and_slots", slots=2, include_deferred=False)
         session.add(pool)
         session.flush()
 
diff --git a/tests/models/test_pool.py b/tests/models/test_pool.py
index 2e142814c6..32d30b99bb 100644
--- a/tests/models/test_pool.py
+++ b/tests/models/test_pool.py
@@ -55,6 +55,7 @@ class TestPool:
                 pool=name,
                 slots=i,
                 description=name,
+                include_deferred=False,
             )
             self.pools.append(pool)
         with create_session() as session:
@@ -64,23 +65,27 @@ class TestPool:
         self.clean_db()
 
     def test_open_slots(self, dag_maker):
-        pool = Pool(pool="test_pool", slots=5)
+        pool = Pool(pool="test_pool", slots=5, include_deferred=False)
         with dag_maker(
             dag_id="test_open_slots",
             start_date=DEFAULT_DATE,
         ):
             op1 = EmptyOperator(task_id="dummy1", pool="test_pool")
             op2 = EmptyOperator(task_id="dummy2", pool="test_pool")
+            op3 = EmptyOperator(task_id="dummy3", pool="test_pool")
         dag_maker.create_dagrun()
         ti1 = TI(task=op1, execution_date=DEFAULT_DATE)
         ti2 = TI(task=op2, execution_date=DEFAULT_DATE)
+        ti3 = TI(task=op3, execution_date=DEFAULT_DATE)
         ti1.state = State.RUNNING
         ti2.state = State.QUEUED
+        ti3.state = State.DEFERRED
 
         session = settings.Session()
         session.add(pool)
         session.merge(ti1)
         session.merge(ti2)
+        session.merge(ti3)
         session.commit()
         session.close()
 
@@ -88,23 +93,69 @@ class TestPool:
         assert 1 == pool.running_slots()
         assert 1 == pool.queued_slots()
         assert 2 == pool.occupied_slots()
+        assert 1 == pool.deferred_slots()
         assert {
             "default_pool": {
                 "open": 128,
                 "queued": 0,
                 "total": 128,
                 "running": 0,
+                "deferred": 0,
             },
             "test_pool": {
                 "open": 3,
                 "queued": 1,
                 "running": 1,
+                "deferred": 1,
+                "total": 5,
+            },
+        } == pool.slots_stats()
+
+    def test_open_slots_including_deferred(self, dag_maker):
+        pool = Pool(pool="test_pool", slots=5, include_deferred=True)
+        with dag_maker(
+            dag_id="test_open_slots_including_deferred",
+            start_date=DEFAULT_DATE,
+        ):
+            op1 = EmptyOperator(task_id="dummy1", pool="test_pool")
+            op2 = EmptyOperator(task_id="dummy2", pool="test_pool")
+        dag_maker.create_dagrun()
+        ti1 = TI(task=op1, execution_date=DEFAULT_DATE)
+        ti2 = TI(task=op2, execution_date=DEFAULT_DATE)
+        ti1.state = State.RUNNING
+        ti2.state = State.DEFERRED
+
+        session = settings.Session()
+        session.add(pool)
+        session.merge(ti1)
+        session.merge(ti2)
+        session.commit()
+        session.close()
+
+        assert 3 == pool.open_slots()
+        assert 1 == pool.running_slots()
+        assert 0 == pool.queued_slots()
+        assert 1 == pool.deferred_slots()
+        assert 2 == pool.occupied_slots()
+        assert {
+            "default_pool": {
+                "open": 128,
+                "queued": 0,
+                "total": 128,
+                "running": 0,
+                "deferred": 0,
+            },
+            "test_pool": {
+                "open": 3,
+                "queued": 0,
+                "running": 1,
+                "deferred": 1,
                 "total": 5,
             },
         } == pool.slots_stats()
 
     def test_infinite_slots(self, dag_maker):
-        pool = Pool(pool="test_pool", slots=-1)
+        pool = Pool(pool="test_pool", slots=-1, include_deferred=False)
         with dag_maker(
             dag_id="test_infinite_slots",
         ):
@@ -133,12 +184,14 @@ class TestPool:
                 "queued": 0,
                 "total": 128,
                 "running": 0,
+                "deferred": 0,
             },
             "test_pool": {
                 "open": float("inf"),
                 "queued": 1,
                 "running": 1,
                 "total": float("inf"),
+                "deferred": 0,
             },
         } == pool.slots_stats()
 
@@ -170,6 +223,7 @@ class TestPool:
                 "queued": 2,
                 "total": 5,
                 "running": 1,
+                "deferred": 0,
             }
         } == Pool.slots_stats()
 
@@ -194,18 +248,22 @@ class TestPool:
 
     def test_create_pool(self, session):
         self.add_pools()
-        pool = Pool.create_or_update_pool(name="foo", slots=5, description="")
+        pool = Pool.create_or_update_pool(name="foo", slots=5, description="", include_deferred=True)
         assert pool.pool == "foo"
         assert pool.slots == 5
         assert pool.description == ""
+        assert pool.include_deferred is True
         assert session.query(Pool).count() == self.TOTAL_POOL_COUNT + 1
 
     def test_create_pool_existing(self, session):
         self.add_pools()
-        pool = Pool.create_or_update_pool(name=self.pools[0].pool, slots=5, description="")
+        pool = Pool.create_or_update_pool(
+            name=self.pools[0].pool, slots=5, description="", include_deferred=False
+        )
         assert pool.pool == self.pools[0].pool
         assert pool.slots == 5
         assert pool.description == ""
+        assert pool.include_deferred is False
         assert session.query(Pool).count() == self.TOTAL_POOL_COUNT
 
     def test_delete_pool(self, session):
@@ -223,7 +281,9 @@ class TestPool:
             Pool.delete_pool(Pool.DEFAULT_POOL_NAME)
 
     def test_is_default_pool(self):
-        pool = Pool.create_or_update_pool(name="not_default_pool", slots=1, description="test")
+        pool = Pool.create_or_update_pool(
+            name="not_default_pool", slots=1, description="test", include_deferred=False
+        )
         default_pool = Pool.get_default_pool()
         assert not Pool.is_default_pool(id=pool.id)
         assert Pool.is_default_pool(str(default_pool.id))
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 9e7f5eb48a..ac2f7f7174 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -96,7 +96,7 @@ from tests.test_utils.mock_operators import MockOperator
 @pytest.fixture
 def test_pool():
     with create_session() as session:
-        test_pool = Pool(pool="test_pool", slots=1)
+        test_pool = Pool(pool="test_pool", slots=1, include_deferred=False)
         session.add(test_pool)
         session.flush()
         yield test_pool
diff --git a/tests/operators/test_subdag_operator.py b/tests/operators/test_subdag_operator.py
index 8df1ee9fe3..c814e64a7e 100644
--- a/tests/operators/test_subdag_operator.py
+++ b/tests/operators/test_subdag_operator.py
@@ -88,8 +88,8 @@ class TestSubDagOperator:
         subdag = DAG("parent.child", default_args=default_args)
 
         session = airflow.settings.Session()
-        pool_1 = airflow.models.Pool(pool="test_pool_1", slots=1)
-        pool_10 = airflow.models.Pool(pool="test_pool_10", slots=10)
+        pool_1 = airflow.models.Pool(pool="test_pool_1", slots=1, include_deferred=False)
+        pool_10 = airflow.models.Pool(pool="test_pool_10", slots=10, include_deferred=False)
         session.add(pool_1)
         session.add(pool_10)
         session.commit()
@@ -116,8 +116,8 @@ class TestSubDagOperator:
         subdag = DAG("parent.child", default_args=default_args)
 
         session = airflow.settings.Session()
-        pool_1 = airflow.models.Pool(pool="test_pool_1", slots=1)
-        pool_10 = airflow.models.Pool(pool="test_pool_10", slots=10)
+        pool_1 = airflow.models.Pool(pool="test_pool_1", slots=1, include_deferred=False)
+        pool_10 = airflow.models.Pool(pool="test_pool_10", slots=10, include_deferred=False)
         session.add(pool_1)
         session.add(pool_10)
         session.commit()
diff --git a/tests/ti_deps/deps/test_pool_slots_available_dep.py b/tests/ti_deps/deps/test_pool_slots_available_dep.py
index cba53ef65b..08abba29e4 100644
--- a/tests/ti_deps/deps/test_pool_slots_available_dep.py
+++ b/tests/ti_deps/deps/test_pool_slots_available_dep.py
@@ -23,6 +23,7 @@ from airflow.models import Pool
 from airflow.ti_deps.dependencies_states import EXECUTION_STATES
 from airflow.ti_deps.deps.pool_slots_available_dep import PoolSlotsAvailableDep
 from airflow.utils.session import create_session
+from airflow.utils.state import TaskInstanceState
 from tests.test_utils import db
 
 
@@ -30,8 +31,9 @@ class TestPoolSlotsAvailableDep:
     def setup_method(self):
         db.clear_db_pools()
         with create_session() as session:
-            test_pool = Pool(pool="test_pool")
-            session.add(test_pool)
+            test_pool = Pool(pool="test_pool", include_deferred=False)
+            test_includes_deferred_pool = Pool(pool="test_includes_deferred_pool", include_deferred=True)
+            session.add_all([test_pool, test_includes_deferred_pool])
             session.commit()
 
     def teardown_method(self):
@@ -53,6 +55,13 @@ class TestPoolSlotsAvailableDep:
             ti = Mock(pool="test_pool", state=state, pool_slots=1)
             assert PoolSlotsAvailableDep().is_met(ti=ti)
 
+    @patch("airflow.models.Pool.open_slots", return_value=0)
+    def test_deferred_pooled_task_pass(self, mock_open_slots):
+        ti = Mock(pool="test_includes_deferred_pool", state=TaskInstanceState.DEFERRED, pool_slots=1)
+        assert PoolSlotsAvailableDep().is_met(ti=ti)
+        ti_to_fail = Mock(pool="test_pool", state=TaskInstanceState.DEFERRED, pool_slots=1)
+        assert not PoolSlotsAvailableDep().is_met(ti=ti_to_fail)
+
     def test_task_with_nonexistent_pool(self):
         ti = Mock(pool="nonexistent_pool", pool_slots=1)
         assert not PoolSlotsAvailableDep().is_met(ti=ti)
diff --git a/tests/www/api/experimental/test_endpoints.py b/tests/www/api/experimental/test_endpoints.py
index 86a145f5b8..d42cfb2237 100644
--- a/tests/www/api/experimental/test_endpoints.py
+++ b/tests/www/api/experimental/test_endpoints.py
@@ -383,6 +383,7 @@ class TestPoolApiExperimental(TestBase):
                 pool=name,
                 slots=i,
                 description=name,
+                include_deferred=False,
             )
             self.session.add(pool)
             self.pools.append(pool)
@@ -434,6 +435,7 @@ class TestPoolApiExperimental(TestBase):
         assert pool["pool"] == "foo"
         assert pool["slots"] == 1
         assert pool["description"] == ""
+        assert pool["include_deferred"] is False
         assert self._get_pool_count() == self.TOTAL_POOL_COUNT + 1
 
     def test_create_pool_with_bad_name(self):
diff --git a/tests/www/views/test_views_pool.py b/tests/www/views/test_views_pool.py
index aebcd2c844..e79ec46c56 100644
--- a/tests/www/views/test_views_pool.py
+++ b/tests/www/views/test_views_pool.py
@@ -29,6 +29,7 @@ POOL = {
     "pool": "test-pool",
     "slots": 777,
     "description": "test-pool-description",
+    "include_deferred": False,
 }
 
 
@@ -90,9 +91,13 @@ def test_list(app, admin_client, pool_factory):
         url = flask.url_for("TaskInstanceModelView.list", _flt_3_pool="test-pool", _flt_3_state="scheduled")
         scheduled_tag = markupsafe.Markup("<a href='{url}'>{slots}</a>").format(url=url, slots=0)
 
+        url = flask.url_for("TaskInstanceModelView.list", _flt_3_pool="test-pool", _flt_3_state="deferred")
+        deferred_tag = markupsafe.Markup("<a href='{url}'>{slots}</a>").format(url=url, slots=0)
+
     check_content_in_response(used_tag, resp)
     check_content_in_response(queued_tag, resp)
     check_content_in_response(scheduled_tag, resp)
+    check_content_in_response(deferred_tag, resp)
 
 
 def test_pool_muldelete(session, admin_client, pool_factory):