Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/22 15:24:13 UTC

[GitHub] [airflow] tanelk opened a new pull request #19747: Fix Tasks get stuck in scheduled state

tanelk opened a new pull request #19747:
URL: https://github.com/apache/airflow/pull/19747


   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   closes #19622
   
   The `scheduler_job` can get stuck in a state where it is not able to queue new tasks. It will get out of this state on its own, but the time taken depends on the runtime of the currently running tasks - this could be several hours or even days.
   
   If the scheduler can't queue any of the selected tasks because of the different concurrency limits (per pool, DAG or task), then on the next iterations of the scheduler loop it will try to queue the same tasks again. Meanwhile there could be scheduled tasks with a lower `priority_weight` that could be queued, but they will keep waiting. For example, if the highest-priority scheduled tasks all belong to a DAG that has already reached its `max_active_tasks` limit, they fill the limited query result on every loop and lower-priority tasks from other DAGs are never even examined.
   
   The proposed solution is to increase the offset of the query if we were unable to queue any tasks, so that the next iteration looks at a different slice of the scheduled tasks.
   This is not an ideal solution and I'm open to suggestions.
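   
   A minimal, self-contained sketch of that offset idea (plain Python standing in for the SQLAlchemy query; `can_queue` is a hypothetical stand-in for the pool/DAG/task concurrency checks, not the actual Airflow API):
   
   ```python
   from typing import Callable, List, Tuple
   
   # (priority_weight, task_id) - a simplified stand-in for a scheduled task instance
   TaskInstance = Tuple[int, str]
   
   def find_queueable(
       scheduled: List[TaskInstance],
       can_queue: Callable[[TaskInstance], bool],
       max_tis: int,
   ) -> List[TaskInstance]:
       """Slide the window (offset) forward until at least one task can be queued."""
       ordered = sorted(scheduled, key=lambda ti: -ti[0])  # mimics ORDER BY -priority_weight
       offset = 0
       while offset < len(ordered):
           window = ordered[offset:offset + max_tis]  # mimics OFFSET ... LIMIT max_tis
           queueable = [ti for ti in window if can_queue(ti)]
           if queueable:
               return queueable
           offset += max_tis  # nothing in this window could be queued - look further down
       return []
   ```
   
   (In the proposal the offset would be advanced between scheduler loop iterations rather than inside one call; the function above just collapses the idea for illustration.)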
   
   I did consider some other solutions, but they felt even worse:
   1. remove the limit from the query - the query result size would have no upper bound and could cause problems on large instances with many DAGs and tasks
   2. do the concurrency limit filtering in the query - this would require window functions, which might not be supported by all database backends, and the `task_concurrency_limit` cannot be checked in this way (it requires accessing the DAG's serialized JSON blob)
   3. introduce some randomness to the query ordering - this does not seem to have any advantage over the proposed solution.
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] tanelk commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
tanelk commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-997346716


   > Tests are failing, can you take a look plz @tanelk
   
   I will be afk for about a week. Will take a look after that.





[GitHub] [airflow] ashb commented on a change in pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#discussion_r806800672



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -266,192 +266,254 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
 
         if pool_slots_free == 0:
             self.log.debug("All pools are full!")
-            return executable_tis
+            return []
 
         max_tis = min(max_tis, pool_slots_free)
 
-        # Get all task instances associated with scheduled
-        # DagRuns which are not backfilled, in the given states,
-        # and the dag is not paused
-        query = (
-            session.query(TI)
-            .join(TI.dag_run)
-            .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
-            .join(TI.dag_model)
-            .filter(not_(DM.is_paused))
-            .filter(TI.state == TaskInstanceState.SCHEDULED)
-            .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
-        )
-        starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0]
-        if starved_pools:
-            query = query.filter(not_(TI.pool.in_(starved_pools)))
-
-        query = query.limit(max_tis)
-
-        task_instances_to_examine: List[TI] = with_row_locks(
-            query,
-            of=TI,
-            session=session,
-            **skip_locked(session=session),
-        ).all()
-        # TODO[HA]: This was wrong before anyway, as it only looked at a sub-set of dags, not everything.
-        # Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
-
-        if len(task_instances_to_examine) == 0:
-            self.log.debug("No tasks to consider for execution.")
-            return executable_tis
-
-        # Put one task instance on each line
-        task_instance_str = "\n\t".join(repr(x) for x in task_instances_to_examine)
-        self.log.info("%s tasks up for execution:\n\t%s", len(task_instances_to_examine), task_instance_str)
-
-        pool_to_task_instances: DefaultDict[str, List[models.Pool]] = defaultdict(list)
-        for task_instance in task_instances_to_examine:
-            pool_to_task_instances[task_instance.pool].append(task_instance)
+        starved_pools = {pool_name for pool_name, stats in pools.items() if stats['open'] <= 0}
 
         # dag_id to # of running tasks and (dag_id, task_id) to # of running tasks.
-        dag_max_active_tasks_map: DefaultDict[str, int]
+        dag_active_tasks_map: DefaultDict[str, int]
         task_concurrency_map: DefaultDict[Tuple[str, str], int]

Review comment:
       Hmmm, I wonder if this needs to include map_index now so,
   
   ```suggestion
           task_concurrency_map: DefaultDict[Tuple[str, str, int], int]
   ```
   
   Making the tuple (dag_id, task_id, map_index).
   
   See https://github.com/apache/airflow/pull/21210 for a bit more info.







[GitHub] [airflow] boring-cyborg[bot] commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-1075435499


   Awesome work, congrats on your first merged pull request!
   





[GitHub] [airflow] ashb commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-1014354372


   I think we need to benchmark this approach and #20391 and see which one performs better under load





[GitHub] [airflow] tanelk commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
tanelk commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-976302121


   > Apologies if this comment won't help the discussion, but in my opinion the better approach might be something done around the "limit" clause - ` query = query.limit(max_tis)` ([origin](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/jobs/scheduler_job.py#L928)) and ` max_tis = min(self.max_tis_per_query, self.executor.slots_available)` ([origin](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/jobs/scheduler_job.py#L1141))
   > 
   > Because it seems that the assumption that it is better to limit the query of tasks waiting to be queued by the potentially available slots might not be the best practice in some cases.
   
   Yes, reducing the value of `max_tis` will make this situation worse, but removing the `max_tis = min(...)` does not guarantee that the issue will be solved.
   
   As long as we are using any limit value, there is a chance that none of the selected task instances can be queued while there are task instances beyond the limit that could have been queued.
   
   I can think of 3 possible solutions that will never cause the scheduler to get stuck:
   1. Remove the limit - this could risk memory issues on a large Airflow installation (many DAGs and tasks)
   2. Build all the concurrency filters into the SQL query - the per-task concurrency limit seems impossible or very difficult to express in the current data model.
   3. Some sort of iterative approach that looks at task instances further down the list - this is what this PR now proposes (see the sketch after this list).
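   
   A minimal, self-contained model of that iterative approach (plain Python standing in for the SQL query; `fetch_scheduled`, `can_queue` and `note_starved` are illustrative names, not the actual Airflow API):
   
   ```python
   from typing import Callable, List, Set, Tuple
   
   # (dag_id, task_id, pool) - a simplified stand-in for a scheduled task instance
   TI = Tuple[str, str, str]
   
   def find_queueable_iterative(
       fetch_scheduled: Callable[[Set[str], Set[str], Set[Tuple[str, str]], int], List[TI]],
       can_queue: Callable[[TI], bool],
       note_starved: Callable[[TI, Set[str], Set[str], Set[Tuple[str, str]]], None],
       max_tis: int,
   ) -> List[TI]:
       """Query in a loop, excluding pools/DAGs/tasks already known to be at their limit."""
       starved_pools: Set[str] = set()
       starved_dags: Set[str] = set()
       starved_tasks: Set[Tuple[str, str]] = set()
       queued: List[TI] = []
   
       while len(queued) < max_tis:
           starved_before = (len(starved_pools), len(starved_dags), len(starved_tasks))
           batch = fetch_scheduled(starved_pools, starved_dags, starved_tasks, max_tis - len(queued))
           if not batch:
               break  # nothing scheduled outside the already-excluded pools/DAGs/tasks
           newly_queued = 0
           for ti in batch:
               if can_queue(ti):
                   queued.append(ti)
                   newly_queued += 1
               else:
                   # record which pool/DAG/task limit was hit so the next query skips it
                   note_starved(ti, starved_pools, starved_dags, starved_tasks)
           starved_after = (len(starved_pools), len(starved_dags), len(starved_tasks))
           if newly_queued == 0 and starved_after == starved_before:
               break  # no progress and nothing new to exclude - stop instead of spinning
       return queued
   ```
   
   In the PR itself this exclusion is pushed into the SQL query, e.g. `query.filter(not_(TI.pool.in_(starved_pools)))` and equivalent filters for starved DAGs and tasks, as can be seen in the review diffs below.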





[GitHub] [airflow] ashb commented on a change in pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#discussion_r757474674



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -451,7 +451,8 @@ def test_find_executable_task_instances_pool(self, dag_maker):
         dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
         dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED)
 
-        tis = dr1.task_instances + dr2.task_instances
+        # Sort to avoid flaky tests
+        tis = sorted(dr1.task_instances + dr2.task_instances, key=lambda ti: (ti.run_id, ti.task_id))

Review comment:
       ```suggestion
           tis = sorted(dr1.task_instances + dr2.task_instances, key=lambda ti: ti.key)
   ```
   
   No need for the comment really







[GitHub] [airflow] ashb commented on a change in pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#discussion_r806840464



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -266,192 +266,254 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
 
         if pool_slots_free == 0:
             self.log.debug("All pools are full!")
-            return executable_tis
+            return []
 
         max_tis = min(max_tis, pool_slots_free)
 
-        # Get all task instances associated with scheduled
-        # DagRuns which are not backfilled, in the given states,
-        # and the dag is not paused
-        query = (
-            session.query(TI)
-            .join(TI.dag_run)
-            .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
-            .join(TI.dag_model)
-            .filter(not_(DM.is_paused))
-            .filter(TI.state == TaskInstanceState.SCHEDULED)
-            .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
-        )
-        starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0]
-        if starved_pools:
-            query = query.filter(not_(TI.pool.in_(starved_pools)))
-
-        query = query.limit(max_tis)
-
-        task_instances_to_examine: List[TI] = with_row_locks(
-            query,
-            of=TI,
-            session=session,
-            **skip_locked(session=session),
-        ).all()
-        # TODO[HA]: This was wrong before anyway, as it only looked at a sub-set of dags, not everything.
-        # Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
-
-        if len(task_instances_to_examine) == 0:
-            self.log.debug("No tasks to consider for execution.")
-            return executable_tis
-
-        # Put one task instance on each line
-        task_instance_str = "\n\t".join(repr(x) for x in task_instances_to_examine)
-        self.log.info("%s tasks up for execution:\n\t%s", len(task_instances_to_examine), task_instance_str)
-
-        pool_to_task_instances: DefaultDict[str, List[models.Pool]] = defaultdict(list)
-        for task_instance in task_instances_to_examine:
-            pool_to_task_instances[task_instance.pool].append(task_instance)
+        starved_pools = {pool_name for pool_name, stats in pools.items() if stats['open'] <= 0}
 
         # dag_id to # of running tasks and (dag_id, task_id) to # of running tasks.
-        dag_max_active_tasks_map: DefaultDict[str, int]
+        dag_active_tasks_map: DefaultDict[str, int]
         task_concurrency_map: DefaultDict[Tuple[str, str], int]

Review comment:
       >If you map a task over 10 values, and that task has concurrency limit of 5, then am I right, that we should run max 5 instances at a time. 
   
   Yes, that is correct. Each mapped task instance is a separate TI.
   
   > Based on that we should count different map_index values just as another task instances - as it is currently.
   
   Cool!
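   
   A tiny, self-contained illustration of that conclusion (hypothetical data, not the actual Airflow code): different map_index values are simply counted as additional task instances under the same `(dag_id, task_id)` key.
   
   ```python
   from collections import defaultdict
   from typing import DefaultDict, Tuple
   
   # Hypothetical running task instances as (dag_id, task_id, map_index) - illustrative data only
   running = [("etl", "load", 0), ("etl", "load", 1), ("etl", "load", 2)]
   
   # Mapped task instances are counted under the same (dag_id, task_id) key, so a
   # per-task concurrency limit applies across all map_index values of that task.
   task_concurrency_map: DefaultDict[Tuple[str, str], int] = defaultdict(int)
   for dag_id, task_id, _map_index in running:
       task_concurrency_map[(dag_id, task_id)] += 1
   
   assert task_concurrency_map[("etl", "load")] == 3
   ```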







[GitHub] [airflow] github-actions[bot] commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-1074562235


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] tanelk commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
tanelk commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-1039889302


   @kaxil , @ashb , @ephraimbuddy - any chance to get another review for this?





[GitHub] [airflow] tanelk commented on a change in pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
tanelk commented on a change in pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#discussion_r806838914



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -266,192 +266,254 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
 
         if pool_slots_free == 0:
             self.log.debug("All pools are full!")
-            return executable_tis
+            return []
 
         max_tis = min(max_tis, pool_slots_free)
 
-        # Get all task instances associated with scheduled
-        # DagRuns which are not backfilled, in the given states,
-        # and the dag is not paused
-        query = (
-            session.query(TI)
-            .join(TI.dag_run)
-            .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
-            .join(TI.dag_model)
-            .filter(not_(DM.is_paused))
-            .filter(TI.state == TaskInstanceState.SCHEDULED)
-            .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
-        )
-        starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0]
-        if starved_pools:
-            query = query.filter(not_(TI.pool.in_(starved_pools)))
-
-        query = query.limit(max_tis)
-
-        task_instances_to_examine: List[TI] = with_row_locks(
-            query,
-            of=TI,
-            session=session,
-            **skip_locked(session=session),
-        ).all()
-        # TODO[HA]: This was wrong before anyway, as it only looked at a sub-set of dags, not everything.
-        # Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
-
-        if len(task_instances_to_examine) == 0:
-            self.log.debug("No tasks to consider for execution.")
-            return executable_tis
-
-        # Put one task instance on each line
-        task_instance_str = "\n\t".join(repr(x) for x in task_instances_to_examine)
-        self.log.info("%s tasks up for execution:\n\t%s", len(task_instances_to_examine), task_instance_str)
-
-        pool_to_task_instances: DefaultDict[str, List[models.Pool]] = defaultdict(list)
-        for task_instance in task_instances_to_examine:
-            pool_to_task_instances[task_instance.pool].append(task_instance)
+        starved_pools = {pool_name for pool_name, stats in pools.items() if stats['open'] <= 0}
 
         # dag_id to # of running tasks and (dag_id, task_id) to # of running tasks.
-        dag_max_active_tasks_map: DefaultDict[str, int]
+        dag_active_tasks_map: DefaultDict[str, int]
         task_concurrency_map: DefaultDict[Tuple[str, str], int]

Review comment:
       I'm not very familiar with the new map API, so I might be off.
   
   If you map a task over 10 values, and that task has a concurrency limit of 5, am I right that we should run at most 5 instances at a time? Based on that, we should count different map_index values just as separate task instances - as it is currently.







[GitHub] [airflow] ashb commented on a change in pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#discussion_r806802351



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -266,192 +266,254 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
 
         if pool_slots_free == 0:
             self.log.debug("All pools are full!")
-            return executable_tis
+            return []
 
         max_tis = min(max_tis, pool_slots_free)
 
-        # Get all task instances associated with scheduled
-        # DagRuns which are not backfilled, in the given states,
-        # and the dag is not paused
-        query = (
-            session.query(TI)
-            .join(TI.dag_run)
-            .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
-            .join(TI.dag_model)
-            .filter(not_(DM.is_paused))
-            .filter(TI.state == TaskInstanceState.SCHEDULED)
-            .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
-        )
-        starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0]
-        if starved_pools:
-            query = query.filter(not_(TI.pool.in_(starved_pools)))
-
-        query = query.limit(max_tis)
-
-        task_instances_to_examine: List[TI] = with_row_locks(
-            query,
-            of=TI,
-            session=session,
-            **skip_locked(session=session),
-        ).all()
-        # TODO[HA]: This was wrong before anyway, as it only looked at a sub-set of dags, not everything.
-        # Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
-
-        if len(task_instances_to_examine) == 0:
-            self.log.debug("No tasks to consider for execution.")
-            return executable_tis
-
-        # Put one task instance on each line
-        task_instance_str = "\n\t".join(repr(x) for x in task_instances_to_examine)
-        self.log.info("%s tasks up for execution:\n\t%s", len(task_instances_to_examine), task_instance_str)
-
-        pool_to_task_instances: DefaultDict[str, List[models.Pool]] = defaultdict(list)
-        for task_instance in task_instances_to_examine:
-            pool_to_task_instances[task_instance.pool].append(task_instance)
+        starved_pools = {pool_name for pool_name, stats in pools.items() if stats['open'] <= 0}
 
         # dag_id to # of running tasks and (dag_id, task_id) to # of running tasks.
-        dag_max_active_tasks_map: DefaultDict[str, int]
+        dag_active_tasks_map: DefaultDict[str, int]
         task_concurrency_map: DefaultDict[Tuple[str, str], int]

Review comment:
       (I don't know if it does -- just needs thinking about)





[GitHub] [airflow] kaxil commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-997018489


   Just rebased on latest main branch to fix merge conflicts





[GitHub] [airflow] jedcunningham commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-1076477408


   Thanks @tanelk! Congrats on your first commit 🎉








[GitHub] [airflow] ashb commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-980034274


   A lot more work this way, but I think this looks good. I'll need to take a look at this again with fresher eyes next week.
   
   How much/where have you tested this change?





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #19747: Fix Tasks get stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-975638341


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using the [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[GitHub] [airflow] tanelk commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
tanelk commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-976704453


   > This offset approach won't work well with multiple schedulers running
   
   @ashb I now reworked the method





[GitHub] [airflow] tanelk commented on a change in pull request #19747: Fix Tasks get stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
tanelk commented on a change in pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#discussion_r754384388



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -339,15 +342,15 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
                 # reached.
                 dag_id = task_instance.dag_id
 
-                current_max_active_tasks_per_dag = dag_max_active_tasks_map[dag_id]
+                current_active_tasks_per_dag = dag_active_tasks_map[dag_id]
                 max_active_tasks_per_dag_limit = task_instance.dag_model.max_active_tasks
                 self.log.info(
                     "DAG %s has %s/%s running and queued tasks",
                     dag_id,
-                    current_max_active_tasks_per_dag,
+                    current_active_tasks_per_dag,
                     max_active_tasks_per_dag_limit,
                 )
-                if current_max_active_tasks_per_dag >= max_active_tasks_per_dag_limit:
+                if current_active_tasks_per_dag >= max_active_tasks_per_dag_limit:

Review comment:
       Unrelated, but corrected a variable name that caused some confusion.







[GitHub] [airflow] ephraimbuddy commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-1074433640


   @tanelk please rebase. I guess the failing tests have been resolved in main





[GitHub] [airflow] ephraimbuddy closed pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #19747:
URL: https://github.com/apache/airflow/pull/19747


   





[GitHub] [airflow] jedcunningham commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-1028455240


   Okay, I benchmarked 2.2.3, #19747 on latest main, and #20391 on latest main with:
   
   - 100 DAGs, 10 runs each, 10 tasks per DAG for 10k total TIs
   - 2 workers (1 CPU core each)
   - 1 scheduler (1 CPU core)
   
   It took each this long to get through them all:
   
   * 2.2.3: 51 minutes
   * 19747: 38 minutes
   * 20391: 38 minutes
   
   #19747 however does spew this into the scheduler logs ~16x per second (quite chatty!):
   
   ```
   [2022-02-02 23:21:32,947] {scheduler_job.py:522} INFO - Setting the following tasks to queued state:
   ```
   





[GitHub] [airflow] tanelk commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
tanelk commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-1003897449


   > Tests are failing, can you take a look plz @tanelk
   
   I fixed the merge issue. The `test_scheduler_job` suite passes now.





[GitHub] [airflow] vapiravfif commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
vapiravfif commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-976268420


   Apologies if this comment won't help the discussion, but in my opinion the better approach might be something done around the "limit" clause - 
   `        query = query.limit(max_tis)` ([origin](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/jobs/scheduler_job.py#L928))
   and 
   `            max_tis = min(self.max_tis_per_query, self.executor.slots_available)` ([origin](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/jobs/scheduler_job.py#L1141))
   
   Because it seems that the assumption that it is better to limit the query of tasks waiting to be queued by the potentially available slots might not be the best practice in some cases.
   





[GitHub] [airflow] ashb commented on a change in pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#discussion_r757474205



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -246,159 +246,218 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
 
         if pool_slots_free == 0:
             self.log.debug("All pools are full!")
-            return executable_tis
+            return []
 
         max_tis = min(max_tis, pool_slots_free)
 
-        # Get all task instances associated with scheduled
-        # DagRuns which are not backfilled, in the given states,
-        # and the dag is not paused
-        query = (
-            session.query(TI)
-            .join(TI.dag_run)
-            .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state != DagRunState.QUEUED)
-            .join(TI.dag_model)
-            .filter(not_(DM.is_paused))
-            .filter(TI.state == State.SCHEDULED)
-            .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
-        )
-        starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0]
-        if starved_pools:
-            query = query.filter(not_(TI.pool.in_(starved_pools)))
-
-        query = query.limit(max_tis)
-
-        task_instances_to_examine: List[TI] = with_row_locks(
-            query,
-            of=TI,
-            session=session,
-            **skip_locked(session=session),
-        ).all()
-        # TODO[HA]: This was wrong before anyway, as it only looked at a sub-set of dags, not everything.
-        # Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
-
-        if len(task_instances_to_examine) == 0:
-            self.log.debug("No tasks to consider for execution.")
-            return executable_tis
-
-        # Put one task instance on each line
-        task_instance_str = "\n\t".join(repr(x) for x in task_instances_to_examine)
-        self.log.info("%s tasks up for execution:\n\t%s", len(task_instances_to_examine), task_instance_str)
-
-        pool_to_task_instances: DefaultDict[str, List[models.Pool]] = defaultdict(list)
-        for task_instance in task_instances_to_examine:
-            pool_to_task_instances[task_instance.pool].append(task_instance)
+        starved_pools = {pool_name for pool_name, stats in pools.items() if stats['open'] <= 0}
 
         # dag_id to # of running tasks and (dag_id, task_id) to # of running tasks.
-        dag_max_active_tasks_map: DefaultDict[str, int]
+        dag_active_tasks_map: DefaultDict[str, int]
         task_concurrency_map: DefaultDict[Tuple[str, str], int]
-        dag_max_active_tasks_map, task_concurrency_map = self.__get_concurrency_maps(
+        dag_active_tasks_map, task_concurrency_map = self.__get_concurrency_maps(
             states=list(EXECUTION_STATES), session=session
         )
 
         num_tasks_in_executor = 0
         # Number of tasks that cannot be scheduled because of no open slot in pool
         num_starving_tasks_total = 0
 
-        # Go through each pool, and queue up a task for execution if there are
-        # any open slots in the pool.
+        # dag and task ids that can't be queued because of concurrency limits
+        starved_dags: Set[str] = set()
+        starved_tasks: Set[Tuple[str, str]] = set()
 
-        for pool, task_instances in pool_to_task_instances.items():
-            pool_name = pool
-            if pool not in pools:
-                self.log.warning("Tasks using non-existent pool '%s' will not be scheduled", pool)
-                continue
+        pool_num_starving_tasks = defaultdict(int)
 
-            open_slots = pools[pool]["open"]
+        for loop_count in itertools.count(start=1):
 
-            num_ready = len(task_instances)
-            self.log.info(
-                "Figuring out tasks to run in Pool(name=%s) with %s open slots "
-                "and %s task instances ready to be queued",
-                pool,
-                open_slots,
-                num_ready,
+            num_starved_pools = len(starved_pools)
+            num_starved_dags = len(starved_dags)
+            num_starved_tasks = len(starved_tasks)
+
+            # Get task instances associated with scheduled
+            # DagRuns which are not backfilled, in the given states,
+            # and the dag is not paused
+            query = (
+                session.query(TI)
+                .join(TI.dag_run)
+                .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state != DagRunState.QUEUED)
+                .join(TI.dag_model)
+                .filter(not_(DM.is_paused))
+                .filter(TI.state == State.SCHEDULED)
+                .options(selectinload('dag_model'))
+                .order_by(-TI.priority_weight, DR.execution_date)
             )
 
-            priority_sorted_task_instances = sorted(
-                task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date)
+            if starved_pools:
+                query = query.filter(not_(TI.pool.in_(starved_pools)))
+
+            if starved_dags:
+                query = query.filter(not_(TI.dag_id.in_(starved_dags)))
+
+            if starved_tasks:
+                if settings.Session.bind.dialect.name == 'mssql':
+                    task_filter = or_(
+                        and_(
+                            TaskInstance.dag_id == dag_id,
+                            TaskInstance.task_id == task_id,
+                        )
+                        for (dag_id, task_id) in starved_tasks
+                    )
+                else:
+                    task_filter = tuple_(TaskInstance.dag_id, TaskInstance.task_id).in_(starved_tasks)
+
+                query = query.filter(not_(task_filter))
+
+            query = query.limit(max_tis)
+
+            task_instances_to_examine: List[TI] = with_row_locks(
+                query,
+                of=TI,
+                session=session,
+                **skip_locked(session=session),
+            ).all()
+            # TODO[HA]: This was wrong before anyway, as it only looked at a sub-set of dags, not everything.
+            # Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
+
+            if len(task_instances_to_examine) == 0:
+                self.log.debug("No tasks to consider for execution.")
+                break
+
+            # Put one task instance on each line
+            task_instance_str = "\n\t".join(repr(x) for x in task_instances_to_examine)
+            self.log.info(
+                "%s tasks up for execution:\n\t%s", len(task_instances_to_examine), task_instance_str
             )
 
-            num_starving_tasks = 0
-            for current_index, task_instance in enumerate(priority_sorted_task_instances):
-                if open_slots <= 0:
-                    self.log.info("Not scheduling since there are %s open slots in pool %s", open_slots, pool)
-                    # Can't schedule any more since there are no more open slots.
-                    num_unhandled = len(priority_sorted_task_instances) - current_index
-                    num_starving_tasks += num_unhandled
-                    num_starving_tasks_total += num_unhandled
-                    break
-
-                # Check to make sure that the task max_active_tasks of the DAG hasn't been
-                # reached.
-                dag_id = task_instance.dag_id
-
-                current_max_active_tasks_per_dag = dag_max_active_tasks_map[dag_id]
-                max_active_tasks_per_dag_limit = task_instance.dag_model.max_active_tasks
+            pool_to_task_instances: DefaultDict[str, List[TI]] = defaultdict(list)
+            for task_instance in task_instances_to_examine:
+                pool_to_task_instances[task_instance.pool].append(task_instance)
+
+            # Go through each pool, and queue up a task for execution if there are
+            # any open slots in the pool.
+
+            for pool, task_instances in pool_to_task_instances.items():
+                pool_name = pool
+                if pool not in pools:
+                    self.log.warning("Tasks using non-existent pool '%s' will not be scheduled", pool)
+                    starved_pools.add(pool_name)
+                    continue
+
+                open_slots = pools[pool]["open"]
+
+                num_ready = len(task_instances)
                 self.log.info(
-                    "DAG %s has %s/%s running and queued tasks",
-                    dag_id,
-                    current_max_active_tasks_per_dag,
-                    max_active_tasks_per_dag_limit,
+                    "Figuring out tasks to run in Pool(name=%s) with %s open slots "
+                    "and %s task instances ready to be queued",
+                    pool,
+                    open_slots,
+                    num_ready,
+                )
+
+                priority_sorted_task_instances = sorted(
+                    task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date)
                 )
-                if current_max_active_tasks_per_dag >= max_active_tasks_per_dag_limit:
+
+                for current_index, task_instance in enumerate(priority_sorted_task_instances):
+                    if open_slots <= 0:
+                        self.log.info(
+                            "Not scheduling since there are %s open slots in pool %s", open_slots, pool
+                        )
+                        # Can't schedule any more since there are no more open slots.
+                        num_unhandled = len(priority_sorted_task_instances) - current_index
+                        pool_num_starving_tasks[pool_name] += num_unhandled
+                        num_starving_tasks_total += num_unhandled
+                        starved_pools.add(pool_name)
+                        break
+
+                    # Check to make sure that the task max_active_tasks of the DAG hasn't been
+                    # reached.
+                    dag_id = task_instance.dag_id
+
+                    current_active_tasks_per_dag = dag_active_tasks_map[dag_id]
+                    max_active_tasks_per_dag_limit = task_instance.dag_model.max_active_tasks
                     self.log.info(
-                        "Not executing %s since the number of tasks running or queued "
-                        "from DAG %s is >= to the DAG's max_active_tasks limit of %s",
-                        task_instance,
+                        "DAG %s has %s/%s running and queued tasks",
                         dag_id,
+                        current_active_tasks_per_dag,
                         max_active_tasks_per_dag_limit,
                     )
-                    continue
+                    if current_active_tasks_per_dag >= max_active_tasks_per_dag_limit:
+                        self.log.info(
+                            "Not executing %s since the number of tasks running or queued "
+                            "from DAG %s is >= to the DAG's max_active_tasks limit of %s",
+                            task_instance,
+                            dag_id,
+                            max_active_tasks_per_dag_limit,
+                        )
+                        starved_dags.add(dag_id)
+                        continue
+
+                    task_concurrency_limit: Optional[int] = None
+                    if task_instance.dag_model.has_task_concurrency_limits:
+                        # Many dags don't have a task_concurrency, so where we can avoid loading the full
+                        # serialized DAG the better.
+                        serialized_dag = self.dagbag.get_dag(dag_id, session=session)
+                        if serialized_dag.has_task(task_instance.task_id):
+                            task_concurrency_limit = serialized_dag.get_task(
+                                task_instance.task_id
+                            ).max_active_tis_per_dag
+
+                        if task_concurrency_limit is not None:
+                            current_task_concurrency = task_concurrency_map[
+                                (task_instance.dag_id, task_instance.task_id)
+                            ]
+
+                            if current_task_concurrency >= task_concurrency_limit:
+                                self.log.info(
+                                    "Not executing %s since the task concurrency for"
+                                    " this task has been reached.",
+                                    task_instance,
+                                )
+                                starved_tasks.add((task_instance.dag_id, task_instance.task_id))
+                                continue
+
+                    if task_instance.pool_slots > open_slots:
+                        self.log.info(
+                            "Not executing %s since it requires %s slots "
+                            "but there are %s open slots in the pool %s.",
+                            task_instance,
+                            task_instance.pool_slots,
+                            open_slots,
+                            pool,
+                        )
+                        pool_num_starving_tasks[pool_name] += 1
+                        num_starving_tasks_total += 1
+                        starved_tasks.add((task_instance.dag_id, task_instance.task_id))
+                        # Though we can execute tasks with lower priority if there's enough room
+                        continue
+
+                    executable_tis.append(task_instance)
+                    open_slots -= task_instance.pool_slots
+                    dag_active_tasks_map[dag_id] += 1
+                    task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1
+
+                pools[pool]["open"] = open_slots
+
+            is_done = executable_tis or len(task_instances_to_examine) < max_tis
+            # Check this to avoid accidental infinite loops
+            found_new_filters = (
+                len(starved_pools) > num_starved_pools
+                or len(starved_dags) > num_starved_dags
+                or len(starved_tasks) > num_starved_tasks
+            )
 
-                task_concurrency_limit: Optional[int] = None
-                if task_instance.dag_model.has_task_concurrency_limits:
-                    # Many dags don't have a task_concurrency, so where we can avoid loading the full
-                    # serialized DAG the better.
-                    serialized_dag = self.dagbag.get_dag(dag_id, session=session)
-                    if serialized_dag.has_task(task_instance.task_id):
-                        task_concurrency_limit = serialized_dag.get_task(
-                            task_instance.task_id
-                        ).max_active_tis_per_dag
-
-                    if task_concurrency_limit is not None:
-                        current_task_concurrency = task_concurrency_map[
-                            (task_instance.dag_id, task_instance.task_id)
-                        ]
-
-                        if current_task_concurrency >= task_concurrency_limit:
-                            self.log.info(
-                                "Not executing %s since the task concurrency for"
-                                " this task has been reached.",
-                                task_instance,
-                            )
-                            continue
-
-                if task_instance.pool_slots > open_slots:
-                    self.log.info(
-                        "Not executing %s since it requires %s slots "
-                        "but there are %s open slots in the pool %s.",
-                        task_instance,
-                        task_instance.pool_slots,
-                        open_slots,
-                        pool,
-                    )
-                    num_starving_tasks += 1
-                    num_starving_tasks_total += 1
-                    # Though we can execute tasks with lower priority if there's enough room
-                    continue
+            if is_done or not found_new_filters:
+                break
 
-                executable_tis.append(task_instance)
-                open_slots -= task_instance.pool_slots
-                dag_max_active_tasks_map[dag_id] += 1
-                task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1
+            self.log.info(
+                "Found no task instances to queue on the %s. iteration "
+                "but there could be more candidate task instances to check.",
+                loop_count,
+            )

Review comment:
       This one probably feels more appropriate as a debug log
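
       For illustration, a minimal, self-contained sketch of the suggested change (the logger name and the `loop_count` value are stand-ins for the scheduler's own `self.log` and loop counter):

       ```python
       import logging

       log = logging.getLogger("airflow.jobs.scheduler_job")  # stand-in for self.log in SchedulerJob
       loop_count = 2  # example value

       # As written in the diff above, the message is emitted at INFO on every extra iteration:
       log.info(
           "Found no task instances to queue on iteration %s "
           "but there could be more candidate task instances to check.",
           loop_count,
       )

       # The suggestion is to demote it to DEBUG so routine iterations stay quiet by default:
       log.debug(
           "Found no task instances to queue on iteration %s "
           "but there could be more candidate task instances to check.",
           loop_count,
       )
       ```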







[GitHub] [airflow] kaxil commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-997050389


   Tests are failing, can you take a look plz @tanelk 





[GitHub] [airflow] ephraimbuddy commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-1004286263


   I think I prefer this to #20391, which didn't deal with pools





[GitHub] [airflow] jedcunningham edited a comment on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
jedcunningham edited a comment on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-1028455240


   Okay, I benchmarked 2.2.3, #19747 on latest main, and #20391 on latest main with:
   
   - 100 DAGs, 10 runs each, 10 tasks per DAG, for 10k total TIs
   - 2 workers (1 CPU core each)
   - 1 scheduler (1 CPU core)
   
   Each took this long to get through them all:
   
   * 2.2.3: 51 minutes
   * 19747: 38 minutes
   * 20391: 38 minutes
   
   (To be clear: yes, the two PRs really did take the same amount of time; I double-checked my data.)
   
   #19747, however, does spew this into the scheduler logs ~16x per second (quite chatty!):
   
   ```
   [2022-02-02 23:21:32,947] {scheduler_job.py:522} INFO - Setting the following tasks to queued state:
   ```
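
   For context, the benchmark workload above could be generated with DAG files along these lines (a hypothetical sketch; the actual benchmark DAGs are not shown in this thread):

   ```python
   # One of 100 similar DAG files; 100 DAGs x 10 runs x 10 tasks gives the ~10k TIs above.
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.bash import BashOperator

   with DAG(
       dag_id="benchmark_dag_001",       # assumed naming, one such file per DAG
       start_date=datetime(2022, 1, 1),
       schedule_interval="@daily",       # with catchup enabled, a backlog of runs accumulates
       catchup=True,
   ) as dag:
       for i in range(10):               # 10 trivial tasks per DAG
           BashOperator(task_id=f"task_{i}", bash_command="true")
   ```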
   





[GitHub] [airflow] tanelk commented on a change in pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
tanelk commented on a change in pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#discussion_r799234159



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -266,192 +266,262 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
 
         if pool_slots_free == 0:
             self.log.debug("All pools are full!")
-            return executable_tis
+            return []
 
         max_tis = min(max_tis, pool_slots_free)
 
-        # Get all task instances associated with scheduled
-        # DagRuns which are not backfilled, in the given states,
-        # and the dag is not paused
-        query = (
-            session.query(TI)
-            .join(TI.dag_run)
-            .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
-            .join(TI.dag_model)
-            .filter(not_(DM.is_paused))
-            .filter(TI.state == TaskInstanceState.SCHEDULED)
-            .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
-        )
-        starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0]
-        if starved_pools:
-            query = query.filter(not_(TI.pool.in_(starved_pools)))
-
-        query = query.limit(max_tis)
-
-        task_instances_to_examine: List[TI] = with_row_locks(
-            query,
-            of=TI,
-            session=session,
-            **skip_locked(session=session),
-        ).all()
-        # TODO[HA]: This was wrong before anyway, as it only looked at a sub-set of dags, not everything.
-        # Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
-
-        if len(task_instances_to_examine) == 0:
-            self.log.debug("No tasks to consider for execution.")
-            return executable_tis
-
-        # Put one task instance on each line
-        task_instance_str = "\n\t".join(repr(x) for x in task_instances_to_examine)
-        self.log.info("%s tasks up for execution:\n\t%s", len(task_instances_to_examine), task_instance_str)
-
-        pool_to_task_instances: DefaultDict[str, List[models.Pool]] = defaultdict(list)
-        for task_instance in task_instances_to_examine:
-            pool_to_task_instances[task_instance.pool].append(task_instance)
+        starved_pools = {pool_name for pool_name, stats in pools.items() if stats['open'] <= 0}
 
         # dag_id to # of running tasks and (dag_id, task_id) to # of running tasks.
-        dag_max_active_tasks_map: DefaultDict[str, int]
+        dag_active_tasks_map: DefaultDict[str, int]
         task_concurrency_map: DefaultDict[Tuple[str, str], int]
-        dag_max_active_tasks_map, task_concurrency_map = self.__get_concurrency_maps(
+        dag_active_tasks_map, task_concurrency_map = self.__get_concurrency_maps(
             states=list(EXECUTION_STATES), session=session
         )
 
         num_tasks_in_executor = 0
         # Number of tasks that cannot be scheduled because of no open slot in pool
         num_starving_tasks_total = 0
 
-        # Go through each pool, and queue up a task for execution if there are
-        # any open slots in the pool.
+        # dag and task ids that can't be queued because of concurrency limits
+        starved_dags: Set[str] = set()
+        starved_tasks: Set[Tuple[str, str]] = set()
 
-        for pool, task_instances in pool_to_task_instances.items():
-            pool_name = pool
-            if pool not in pools:
-                self.log.warning("Tasks using non-existent pool '%s' will not be scheduled", pool)
-                continue
+        pool_num_starving_tasks = defaultdict(int)
+
+        for loop_count in itertools.count(start=1):
+
+            num_starved_pools = len(starved_pools)
+            num_starved_dags = len(starved_dags)
+            num_starved_tasks = len(starved_tasks)
+
+            # Get task instances associated with scheduled
+            # DagRuns which are not backfilled, in the given states,
+            # and the dag is not paused
+            query = (
+                session.query(TI)
+                .join(TI.dag_run)
+                .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
+                .join(TI.dag_model)
+                .filter(not_(DM.is_paused))
+                .filter(TI.state == TaskInstanceState.SCHEDULED)
+                .options(selectinload('dag_model'))
+                .order_by(-TI.priority_weight, DR.execution_date)
+            )
+
+            if starved_pools:
+                query = query.filter(not_(TI.pool.in_(starved_pools)))
+
+            if starved_dags:
+                query = query.filter(not_(TI.dag_id.in_(starved_dags)))
 
-            pool_total = pools[pool]["total"]
-            for task_instance in task_instances:
-                if task_instance.pool_slots > pool_total:
-                    self.log.warning(
-                        "Not executing %s. Requested pool slots (%s) are greater than "
-                        "total pool slots: '%s' for pool: %s.",
-                        task_instance,
-                        task_instance.pool_slots,
-                        pool_total,
-                        pool,
+            if starved_tasks:
+                if settings.Session.bind.dialect.name == 'mssql':
+                    task_filter = or_(
+                        and_(
+                            TaskInstance.dag_id == dag_id,
+                            TaskInstance.task_id == task_id,
+                        )
+                        for (dag_id, task_id) in starved_tasks
                     )
-                    task_instances.remove(task_instance)
+                else:
+                    task_filter = tuple_(TaskInstance.dag_id, TaskInstance.task_id).in_(starved_tasks)
+
+                query = query.filter(not_(task_filter))
+
+            query = query.limit(max_tis)
 
-            open_slots = pools[pool]["open"]
+            task_instances_to_examine: List[TI] = with_row_locks(
+                query,
+                of=TI,
+                session=session,
+                **skip_locked(session=session),
+            ).all()
+            # TODO[HA]: This was wrong before anyway, as it only looked at a sub-set of dags, not everything.
+            # Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
 
-            num_ready = len(task_instances)
+            if len(task_instances_to_examine) == 0:
+                self.log.debug("No tasks to consider for execution.")
+                break
+
+            # Put one task instance on each line
+            task_instance_str = "\n\t".join(repr(x) for x in task_instances_to_examine)
             self.log.info(
-                "Figuring out tasks to run in Pool(name=%s) with %s open slots "
-                "and %s task instances ready to be queued",
-                pool,
-                open_slots,
-                num_ready,
+                "%s tasks up for execution:\n\t%s", len(task_instances_to_examine), task_instance_str
             )
 
-            priority_sorted_task_instances = sorted(
-                task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date)
-            )
+            pool_to_task_instances: DefaultDict[str, List[TI]] = defaultdict(list)
+            for task_instance in task_instances_to_examine:
+                pool_to_task_instances[task_instance.pool].append(task_instance)
+
+            # Go through each pool, and queue up a task for execution if there are
+            # any open slots in the pool.
 
-            num_starving_tasks = 0
-            for current_index, task_instance in enumerate(priority_sorted_task_instances):
-                if open_slots <= 0:
-                    self.log.info("Not scheduling since there are %s open slots in pool %s", open_slots, pool)
-                    # Can't schedule any more since there are no more open slots.
-                    num_unhandled = len(priority_sorted_task_instances) - current_index
-                    num_starving_tasks += num_unhandled
-                    num_starving_tasks_total += num_unhandled
-                    break
-
-                # Check to make sure that the task max_active_tasks of the DAG hasn't been
-                # reached.
-                dag_id = task_instance.dag_id
-
-                current_max_active_tasks_per_dag = dag_max_active_tasks_map[dag_id]
-                max_active_tasks_per_dag_limit = task_instance.dag_model.max_active_tasks
+            for pool, task_instances in pool_to_task_instances.items():
+                pool_name = pool
+                if pool not in pools:
+                    self.log.warning("Tasks using non-existent pool '%s' will not be scheduled", pool)
+                    starved_pools.add(pool_name)
+                    continue
+
+                pool_total = pools[pool]["total"]
+                open_slots = pools[pool]["open"]
+
+                num_ready = len(task_instances)
                 self.log.info(
-                    "DAG %s has %s/%s running and queued tasks",
-                    dag_id,
-                    current_max_active_tasks_per_dag,
-                    max_active_tasks_per_dag_limit,
+                    "Figuring out tasks to run in Pool(name=%s) with %s open slots "
+                    "and %s task instances ready to be queued",
+                    pool,
+                    open_slots,
+                    num_ready,
+                )
+
+                priority_sorted_task_instances = sorted(
+                    task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date)
                 )
-                if current_max_active_tasks_per_dag >= max_active_tasks_per_dag_limit:
+
+                for current_index, task_instance in enumerate(priority_sorted_task_instances):
+                    if open_slots <= 0:
+                        self.log.info(
+                            "Not scheduling since there are %s open slots in pool %s", open_slots, pool
+                        )
+                        # Can't schedule any more since there are no more open slots.
+                        num_unhandled = len(priority_sorted_task_instances) - current_index
+                        pool_num_starving_tasks[pool_name] += num_unhandled
+                        num_starving_tasks_total += num_unhandled
+                        starved_pools.add(pool_name)
+                        break
+
+                    if task_instance.pool_slots > pool_total:
+                        self.log.warning(
+                            "Not executing %s. Requested pool slots (%s) are greater than "
+                            "total pool slots: '%s' for pool: %s.",
+                            task_instance,
+                            task_instance.pool_slots,
+                            pool_total,
+                            pool,
+                        )
+
+                        starved_tasks.add((task_instance.dag_id, task_instance.task_id))
+                        continue
+
+                    # Check to make sure that the task max_active_tasks of the DAG hasn't been
+                    # reached.
+                    dag_id = task_instance.dag_id
+
+                    current_active_tasks_per_dag = dag_active_tasks_map[dag_id]
+                    max_active_tasks_per_dag_limit = task_instance.dag_model.max_active_tasks
                     self.log.info(
-                        "Not executing %s since the number of tasks running or queued "
-                        "from DAG %s is >= to the DAG's max_active_tasks limit of %s",
-                        task_instance,
+                        "DAG %s has %s/%s running and queued tasks",
                         dag_id,
+                        current_active_tasks_per_dag,
                         max_active_tasks_per_dag_limit,
                     )
-                    continue
-
-                task_concurrency_limit: Optional[int] = None
-                if task_instance.dag_model.has_task_concurrency_limits:
-                    # Many dags don't have a task_concurrency, so where we can avoid loading the full
-                    # serialized DAG the better.
-                    serialized_dag = self.dagbag.get_dag(dag_id, session=session)
-                    # If the dag is missing, fail the task and continue to the next task.
-                    if not serialized_dag:
-                        self.log.error(
-                            "DAG '%s' for task instance %s not found in serialized_dag table",
-                            dag_id,
+                    if current_active_tasks_per_dag >= max_active_tasks_per_dag_limit:
+                        self.log.info(
+                            "Not executing %s since the number of tasks running or queued "
+                            "from DAG %s is >= to the DAG's max_active_tasks limit of %s",
                             task_instance,
+                            dag_id,
+                            max_active_tasks_per_dag_limit,
                         )
-                        session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update(
-                            {TI.state: State.FAILED}, synchronize_session='fetch'
-                        )
+                        starved_dags.add(dag_id)
                         continue
-                    if serialized_dag.has_task(task_instance.task_id):
-                        task_concurrency_limit = serialized_dag.get_task(
-                            task_instance.task_id
-                        ).max_active_tis_per_dag
-
-                    if task_concurrency_limit is not None:
-                        current_task_concurrency = task_concurrency_map[
-                            (task_instance.dag_id, task_instance.task_id)
-                        ]
-
-                        if current_task_concurrency >= task_concurrency_limit:
-                            self.log.info(
-                                "Not executing %s since the task concurrency for"
-                                " this task has been reached.",
+
+                    task_concurrency_limit: Optional[int] = None
+                    if task_instance.dag_model.has_task_concurrency_limits:
+                        # Many dags don't have a task_concurrency, so where we can avoid loading the full
+                        # serialized DAG the better.
+                        serialized_dag = self.dagbag.get_dag(dag_id, session=session)
+                        # If the dag is missing, fail the task and continue to the next task.
+                        if not serialized_dag:
+                            self.log.error(
+                                "DAG '%s' for task instance %s not found in serialized_dag table",
+                                dag_id,
                                 task_instance,
                             )
+                            session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update(
+                                {TI.state: State.FAILED}, synchronize_session='fetch'
+                            )
                             continue
+                        if serialized_dag.has_task(task_instance.task_id):
+                            task_concurrency_limit = serialized_dag.get_task(
+                                task_instance.task_id
+                            ).max_active_tis_per_dag
+
+                        if task_concurrency_limit is not None:
+                            current_task_concurrency = task_concurrency_map[
+                                (task_instance.dag_id, task_instance.task_id)
+                            ]
+
+                            if current_task_concurrency >= task_concurrency_limit:
+                                self.log.info(
+                                    "Not executing %s since the task concurrency for"
+                                    " this task has been reached.",
+                                    task_instance,
+                                )
+                                starved_tasks.add((task_instance.dag_id, task_instance.task_id))
+                                continue
+
+                    if task_instance.pool_slots > open_slots:
+                        self.log.info(
+                            "Not executing %s since it requires %s slots "
+                            "but there are %s open slots in the pool %s.",
+                            task_instance,
+                            task_instance.pool_slots,
+                            open_slots,
+                            pool,
+                        )
+                        pool_num_starving_tasks[pool_name] += 1
+                        num_starving_tasks_total += 1
+                        starved_tasks.add((task_instance.dag_id, task_instance.task_id))
+                        # Though we can execute tasks with lower priority if there's enough room
+                        continue
 
-                if task_instance.pool_slots > open_slots:
-                    self.log.info(
-                        "Not executing %s since it requires %s slots "
-                        "but there are %s open slots in the pool %s.",
-                        task_instance,
-                        task_instance.pool_slots,
-                        open_slots,
-                        pool,
-                    )
-                    num_starving_tasks += 1
-                    num_starving_tasks_total += 1
-                    # Though we can execute tasks with lower priority if there's enough room
-                    continue
+                    executable_tis.append(task_instance)
+                    open_slots -= task_instance.pool_slots
+                    dag_active_tasks_map[dag_id] += 1
+                    task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1
+
+                pools[pool]["open"] = open_slots
+
+            is_done = executable_tis or len(task_instances_to_examine) < max_tis
+            # Check this to avoid accidental infinite loops
+            found_new_filters = (
+                len(starved_pools) > num_starved_pools
+                or len(starved_dags) > num_starved_dags
+                or len(starved_tasks) > num_starved_tasks
+            )
 
-                executable_tis.append(task_instance)
-                open_slots -= task_instance.pool_slots
-                dag_max_active_tasks_map[dag_id] += 1
-                task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1
+            if is_done or not found_new_filters:
+                break
+
+            self.log.debug(
+                "Found no task instances to queue on the %s. iteration "
+                "but there could be more candidate task instances to check.",
+                loop_count,
+            )
 
+        for pool_name, num_starving_tasks in pool_num_starving_tasks.items():
             Stats.gauge(f'pool.starving_tasks.{pool_name}', num_starving_tasks)
 
         Stats.gauge('scheduler.tasks.starving', num_starving_tasks_total)
         Stats.gauge('scheduler.tasks.running', num_tasks_in_executor)
         Stats.gauge('scheduler.tasks.executable', len(executable_tis))
 
-        task_instance_str = "\n\t".join(repr(x) for x in executable_tis)
-        self.log.info("Setting the following tasks to queued state:\n\t%s", task_instance_str)
         if len(executable_tis) > 0:
+            task_instance_str = "\n\t".join(repr(x) for x in executable_tis)
+            self.log.info("Setting the following tasks to queued state:\n\t%s", task_instance_str)
+

Review comment:
       Moved the noisy logging into the if block
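
       A standalone sketch of the same guard (the `log_queued` helper below is hypothetical, purely to show the shape of the change):

       ```python
       import logging
       from typing import List

       log = logging.getLogger("airflow.jobs.scheduler_job")  # stand-in for self.log

       def log_queued(executable_tis: List[object]) -> None:
           # Build and emit the potentially long message only when there is something to queue,
           # so an idle scheduler iteration no longer logs a bare "Setting the following tasks..." line.
           if len(executable_tis) > 0:
               task_instance_str = "\n\t".join(repr(x) for x in executable_tis)
               log.info("Setting the following tasks to queued state:\n\t%s", task_instance_str)
       ```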







[GitHub] [airflow] tanelk commented on pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
tanelk commented on pull request #19747:
URL: https://github.com/apache/airflow/pull/19747#issuecomment-1039889302


   @kaxil, @ashb, @ephraimbuddy - any chance of getting another review on this?





[GitHub] [airflow] ephraimbuddy merged pull request #19747: Fix Tasks getting stuck in scheduled state

Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged pull request #19747:
URL: https://github.com/apache/airflow/pull/19747


   

