Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/02/26 10:19:07 UTC

[GitHub] [airflow] bperson opened a new pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

bperson opened a new pull request #14476:
URL: https://github.com/apache/airflow/pull/14476


   
   Currently the scheduler can get stuck in a loop if the query tweaked in this PR keeps returning a set of task instances that can't be scheduled because all their respective pools are full. This is even more apparent when `max_tis` gets low because of the various `min`s in the scheduler main loop code path (either because your overall open pool slots are low, your parallelism is low, or you've set `max_tis_per_query` too low).
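A simplified model of those limits may help. The sketch below assumes the per-loop cap is the minimum of `max_tis_per_query`, the executor's free slots (`parallelism` minus running TIs) and the total open slots across all pools. `effective_max_tis` and `window_can_be_all_starved` are hypothetical helpers, not Airflow functions, and the numbers are made up.

```python
from typing import Mapping


def effective_max_tis(max_tis_per_query: int, parallelism: int, running_tis: int,
                      pool_open_slots: Mapping[str, int]) -> int:
    """Hypothetical model of how many scheduled TIs one scheduler loop may fetch.

    Assumes the cap is the minimum of the per-query limit, the executor's free
    slots (parallelism minus running TIs) and the total open slots across pools.
    """
    executor_free = max(0, parallelism - running_tis)
    pools_free = max(0, sum(pool_open_slots.values()))
    return min(max_tis_per_query, executor_free, pools_free)


def window_can_be_all_starved(max_tis: int, tis_in_starved_pools: int) -> bool:
    """True if every row the (unordered) query returns could come from a starved pool."""
    return tis_in_starved_pools >= max_tis


# Illustrative, made-up numbers: one full pool holding 2 scheduled TIs.
pools = {"default_pool": 128, "starving_pool": 0}
limit = effective_max_tis(max_tis_per_query=2, parallelism=32, running_tis=4, pool_open_slots=pools)
print(limit)  # 2
print(window_can_be_all_starved(limit, tis_in_starved_pools=2))  # True
```

When the second helper returns `True`, the query may hand the scheduler nothing but TIs it will later discard, which is the loop described above.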
   
   The unit test reproduces the issue. I'm still not convinced it's the best way to show it (I've also included a "setup" further down; perhaps this should be an integration test instead?).
   
   The idea behind the test setup is to fill the set of schedulable TIs returned by that query with TIs that exhaust the open slots of a first pool, and then add another set of schedulable TIs that should be easy to schedule and execute because their pool is much bigger. Without filtering out the TIs from the first pool (which are discarded later in that code path anyway), the scheduler stays stuck until the first pool works through its backlog. This starves the second pool for no reason other than the scheduler never getting to its TIs.
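The mechanism can be reproduced with a toy, pure-Python simulation (not Airflow code; every name here is hypothetical). It mirrors the unit test setup: pool `p1` with 1 slot, pool `p2` with 10 slots, 5 scheduled TIs in each and `max_tis=2`. It also assumes the database returns the `p1` TIs first. The fix is modelled as dropping TIs from pools with no open slots before the `LIMIT` is applied.

```python
from typing import Dict, List, Tuple

TI = Tuple[str, str]  # (task instance id, pool name)


def scheduler_loop(scheduled: List[TI], pool_open: Dict[str, int], max_tis: int,
                   skip_starved_pools: bool) -> List[TI]:
    """Toy model of one pass: fetch up to max_tis TIs, queue those whose pool has room."""
    starved = {pool for pool, open_slots in pool_open.items() if open_slots <= 0}
    candidates = [ti for ti in scheduled if not (skip_starved_pools and ti[1] in starved)]
    fetched = candidates[:max_tis]  # stands in for the query's LIMIT
    queued = []
    for ti_id, pool in fetched:
        if pool_open[pool] > 0:  # TIs from full pools are discarded here
            pool_open[pool] -= 1
            queued.append((ti_id, pool))
    return queued


def simulate(skip_starved_pools: bool, loops: int = 3) -> List[TI]:
    """Mirror the unit test: p1 has 1 slot, p2 has 10, 5 TIs each, max_tis=2."""
    # Assumes the database happens to return all p1 TIs before the p2 ones.
    scheduled = [(f"d1_{i}", "p1") for i in range(5)] + [(f"d2_{i}", "p2") for i in range(5)]
    pool_open = {"p1": 1, "p2": 10}
    all_queued: List[TI] = []
    for _ in range(loops):
        queued = scheduler_loop(scheduled, pool_open, max_tis=2,
                                skip_starved_pools=skip_starved_pools)
        all_queued.extend(queued)
        scheduled = [ti for ti in scheduled if ti not in queued]
    return all_queued


print(simulate(skip_starved_pools=False))  # [('d1_0', 'p1')]
print([ti for ti, _ in simulate(skip_starved_pools=True)])
# ['d1_0', 'd2_0', 'd2_1', 'd2_2', 'd2_3']
```

Without the filter, every pass after the first fetches two `p1` TIs and queues nothing; with it, the `p2` TIs flow through.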
   
   Using a minimal "real-life" example (you need to create a `starving_pool` pool with only a couple of slots), the scheduler keeps picking up the same two `starving_pool` TIs and never queues anything:
   ```
   [2021-02-26 08:56:43,604] {scheduler_job.py:950} INFO - 2 tasks up for execution:
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:20:00+00:00 [scheduled]>
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:10:00+00:00 [scheduled]>
   [2021-02-26 08:56:43,606] {scheduler_job.py:979} INFO - Figuring out tasks to run in Pool(name=starving_pool) with 0 open slots and 2 task instances ready to be queued
   [2021-02-26 08:56:43,606] {scheduler_job.py:994} INFO - Not scheduling since there are 0 open slots in pool starving_pool
   [2021-02-26 08:56:43,606] {scheduler_job.py:1072} INFO - Setting the following tasks to queued state:
   
   [2021-02-26 08:56:44,757] {scheduler_job.py:950} INFO - 2 tasks up for execution:
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:20:00+00:00 [scheduled]>
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:10:00+00:00 [scheduled]>
   [2021-02-26 08:56:44,759] {scheduler_job.py:979} INFO - Figuring out tasks to run in Pool(name=starving_pool) with 0 open slots and 2 task instances ready to be queued
   [2021-02-26 08:56:44,759] {scheduler_job.py:994} INFO - Not scheduling since there are 0 open slots in pool starving_pool
   [2021-02-26 08:56:44,759] {scheduler_job.py:1072} INFO - Setting the following tasks to queued state:
   
   [2021-02-26 08:56:45,068] {scheduler_job.py:950} INFO - 2 tasks up for execution:
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:20:00+00:00 [scheduled]>
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:10:00+00:00 [scheduled]>
   [2021-02-26 08:56:45,070] {scheduler_job.py:979} INFO - Figuring out tasks to run in Pool(name=starving_pool) with 0 open slots and 2 task instances ready to be queued
   [2021-02-26 08:56:45,070] {scheduler_job.py:994} INFO - Not scheduling since there are 0 open slots in pool starving_pool
   [2021-02-26 08:56:45,070] {scheduler_job.py:1072} INFO - Setting the following tasks to queued state:
   
   [2021-02-26 08:56:45,309] {scheduler_job.py:950} INFO - 2 tasks up for execution:
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:20:00+00:00 [scheduled]>
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:10:00+00:00 [scheduled]>
   [2021-02-26 08:56:45,311] {scheduler_job.py:979} INFO - Figuring out tasks to run in Pool(name=starving_pool) with 0 open slots and 2 task instances ready to be queued
   [2021-02-26 08:56:45,311] {scheduler_job.py:994} INFO - Not scheduling since there are 0 open slots in pool starving_pool
   ```
   
   `starved_dag.py`
   ```python
   """DAG starved even though its TIs are executable in a non-starved pool (default pool)"""
   from datetime import timedelta
   
   from airflow.models.dag import DAG
   from airflow.operators.bash import BashOperator
   from airflow.utils.dates import days_ago
   
   DAG_NAME = 'starved_dag'
   
   # Note: dagrun_timeout is a DAG argument; set via default_args it is not applied to the DAG.
   default_args = {'owner': 'airflow', 'start_date': days_ago(0), 'dagrun_timeout': timedelta(minutes=6)}
   dag = DAG(DAG_NAME, schedule_interval='*/10 * * * *', default_args=default_args)
   
   echoer = BashOperator(
       task_id='echoer',
       bash_command='echo "bonjour"',
       dag=dag,
   )
   
   if __name__ == "__main__":
       dag.cli()
   ```
   
   `starving_pool.py`
   ```python
   """DAG that floods the schedulable-TIs query with TIs from a starved pool"""
   from datetime import timedelta
   
   from airflow.models.dag import DAG
   from airflow.operators.bash import BashOperator
   from airflow.utils.dates import days_ago
   
   DAG_NAME = 'starving_pool'
   
   # Note: dagrun_timeout is a DAG argument; set via default_args it is not applied to the DAG.
   default_args = {'owner': 'airflow', 'start_date': days_ago(0), 'dagrun_timeout': timedelta(minutes=6)}
   dag = DAG(DAG_NAME, schedule_interval='*/10 * * * *', default_args=default_args)
   
   sleeper = BashOperator(
       task_id='sleeper',
       bash_command='sleep 300',
       dag=dag,
       pool='starving_pool'
   )
   
   if __name__ == "__main__":
       dag.cli()
   ```
   
   We hit this issue in our production stack with real-life DAGs. To work around it, we're currently running with an insanely big unused pool (`999999` slots :D), an insanely big `max_tis_per_query`, and an insanely big `parallelism` (even though we don't have the actual workers behind it), so that the number of TIs returned by the tweaked query is always bigger than the maximum number of TIs stuck in starved pools at any point in time.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#issuecomment-809231755


   Looks good. I pushed one slight change to not include the filter condition when there are no starved pools (SQLA handles this and emits `WHERE 1 = 1` in the SQL, but this feels better).
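Here is a minimal raw-SQL sketch of the same idea, using the standard-library `sqlite3` module. The `task_instance` table and the `build_schedulable_query` helper are hypothetical simplifications, not Airflow's schema or its SQLAlchemy query. The `NOT IN` clause is only appended when the list of starved pools is non-empty.

```python
import sqlite3
from typing import List, Sequence, Tuple


def build_schedulable_query(starved_pools: Sequence[str], max_tis: int) -> Tuple[str, List]:
    """Build SQL for scheduled TIs, adding NOT IN only when some pool is starved."""
    sql = "SELECT ti_id, pool FROM task_instance WHERE state = 'scheduled'"
    params: List = []
    if starved_pools:
        placeholders = ", ".join("?" for _ in starved_pools)
        sql += f" AND pool NOT IN ({placeholders})"
        params.extend(starved_pools)
    sql += " LIMIT ?"
    params.append(max_tis)
    return sql, params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (ti_id TEXT, pool TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, 'scheduled')",
    [("t1", "p1"), ("t2", "p1"), ("t3", "p2")],
)

print(build_schedulable_query([], max_tis=10)[0])
# SELECT ti_id, pool FROM task_instance WHERE state = 'scheduled' LIMIT ?
sql, params = build_schedulable_query(["p1"], max_tis=10)
print(conn.execute(sql, params).fetchall())  # [('t3', 'p2')]
```

Skipping the clause when there is nothing to exclude keeps the generated SQL free of an always-true condition, rather than relying on the query builder to render one.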





[GitHub] [airflow] github-actions[bot] commented on pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#issuecomment-786717857


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ashb commented on pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#issuecomment-808380956


   @bperson Oh sorry, forgot about this. Could you rebase onto the latest master, and then ping me once the tests have re-run (pass or fail)?





[GitHub] [airflow] bperson commented on pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
bperson commented on pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#issuecomment-788917182


   @ashb @kaxil any idea about those failing CI runs?





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#issuecomment-809295443


   Awesome work, congrats on your first merged pull request!
   





[GitHub] [airflow] bperson commented on pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
bperson commented on pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#issuecomment-787121306


   Well, no idea why CI failed this time; it looks like something SIGKILL-ed the containers, but why?





[GitHub] [airflow] bperson commented on pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
bperson commented on pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#issuecomment-806469321


   @ashb Anything I can do here? Running those tests locally through `breeze`, with (probably) beefier Docker memory settings than CI, seems to work fine.





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#issuecomment-786554069


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/master/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/master/BREEZE.rst) for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[GitHub] [airflow] bperson commented on a change in pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
bperson commented on a change in pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#discussion_r583635139



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2645,6 +2645,79 @@ def test_scheduler_verify_pool_full_2_slots_per_task(self):
         # As tasks require 2 slots, only 3 can fit into 6 available
         assert len(task_instances_list) == 3
 
+    def test_scheduler_keeps_scheduling_when_a_pool_is_full(self):
+        """
+        Test task instances in a pool that isn't full keep getting scheduled even when a pool is full.
+        """
+        dag_d1 = DAG(dag_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_d1', start_date=DEFAULT_DATE)
+        BashOperator(
+            task_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_t1',
+            dag=dag_d1,
+            owner='airflow',
+            pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p1',
+            bash_command='echo hi',
+        )
+
+        dag_d2 = DAG(dag_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_d2', start_date=DEFAULT_DATE)
+        BashOperator(
+            task_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_t2',
+            dag=dag_d2,
+            owner='airflow',
+            pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p2',
+            bash_command='echo hi',
+        )
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+        dagbag.bag_dag(dag=dag_d1, root_dag=dag_d1)
+        dagbag.bag_dag(dag=dag_d2, root_dag=dag_d2)
+        dagbag.sync_to_db()
+
+        session = settings.Session()
+        pool_p1 = Pool(pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p1', slots=1)
+        pool_p2 = Pool(pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p2', slots=10)
+        session.add(pool_p1)
+        session.add(pool_p2)
+        session.commit()
+
+        dag_d1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_d1))
+
+        scheduler = SchedulerJob(executor=self.null_exec)
+        scheduler.processor_agent = mock.MagicMock()
+
+        # Create 5 dagruns for each DAG.
+        # To increase the chances the TIs from the "full" pool will get retrieved first, we schedule all
+        # TIs from the first dag first.
+        date = DEFAULT_DATE
+        for _ in range(5):
+            dr = dag_d1.create_dagrun(
+                run_type=DagRunType.SCHEDULED,
+                execution_date=date,
+                state=State.RUNNING,
+            )
+            scheduler._schedule_dag_run(dr, {}, session)
+            date = dag_d1.following_schedule(date)
+
+        date = DEFAULT_DATE
+        for _ in range(5):
+            dr = dag_d2.create_dagrun(
+                run_type=DagRunType.SCHEDULED,
+                execution_date=date,
+                state=State.RUNNING,
+            )
+            scheduler._schedule_dag_run(dr, {}, session)
+            date = dag_d2.following_schedule(date)
+
+        scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
+        task_instances_list2 = scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
+
+        # Make sure we get TIs from a non-full pool in the 2nd list
+        assert len(task_instances_list2) > 0
+        assert all(task_instance.pool != 'test_scheduler_keeps_scheduling_when_a_pool_is_full_p1'

Review comment:
       yeah I guess I will have to go through the `pre-commit` setup to make sure it's clean ;) 







[GitHub] [airflow] ashb commented on a change in pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#discussion_r583632881



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2645,6 +2645,79 @@ def test_scheduler_verify_pool_full_2_slots_per_task(self):
+        # To increase the chances the TIs from the "full" pool will get retrieved first, we schedule all TIs from the

Review comment:
   We test with SQLite, PostgreSQL and MySQL.







[GitHub] [airflow] ashb commented on a change in pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#discussion_r583614358



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2645,6 +2645,79 @@ def test_scheduler_verify_pool_full_2_slots_per_task(self):
+        # To increase the chances the TIs from the "full" pool will get retrieved first, we schedule all TIs from the

Review comment:
       Chances? Is this test going to fail some of the time?







[GitHub] [airflow] bperson commented on a change in pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
bperson commented on a change in pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#discussion_r583625358



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2645,6 +2645,79 @@ def test_scheduler_verify_pool_full_2_slots_per_task(self):
+        # To increase the chances the TIs from the "full" pool will get retrieved first, we schedule all TIs from the

Review comment:
   I'd assume the order of TIs returned by the query is deterministic, but since we're not `order`-ing the query results by anything, we're indeed relying on the underlying RDBMS's row ordering, and I don't know enough about Airflow's unit test setup (SQLite?) to know for sure that it's deterministic.
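One way to take the engine's row order out of the picture is an explicit total `ORDER BY` before the `LIMIT`. The sketch below uses `sqlite3` with a hypothetical, simplified table. The sort keys (priority weight, then execution date, then DAG id as a tie-breaker) are an illustrative assumption, not what Airflow's query used at the time.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "dag_id TEXT, execution_date TEXT, pool TEXT, priority_weight INTEGER)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("d2", "2021-01-01T00:00", "p2", 1),
        ("d1", "2021-01-01T00:00", "p1", 1),
        ("d1", "2021-01-02T00:00", "p1", 1),
        ("d2", "2021-01-02T00:00", "p2", 1),
    ],
)

# Without ORDER BY, which rows survive LIMIT is up to the database engine.
# A total order (ending in columns that are unique together) makes it reproducible.
ORDERED_QUERY = (
    "SELECT dag_id, execution_date FROM task_instance "
    "ORDER BY priority_weight DESC, execution_date, dag_id "
    "LIMIT ?"
)


def first_n(n: int):
    """Return the first n rows under the explicit ordering."""
    return conn.execute(ORDERED_QUERY, (n,)).fetchall()


print(first_n(2))  # [('d1', '2021-01-01T00:00'), ('d2', '2021-01-01T00:00')]
```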







[GitHub] [airflow] ashb commented on pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#issuecomment-790682935


   @bperson Hmmm, not right now/anymore, as the logs aren't available and GitHub is having problems, so we can't retest.
   
   It _may_ just be a transient failure, but given that both MySQL 8 jobs failed, we should re-run to make sure that it isn't a problem there. (It's unlikely, just playing safe.)





[GitHub] [airflow] ashb merged pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #14476:
URL: https://github.com/apache/airflow/pull/14476


   





[GitHub] [airflow] bperson commented on pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
bperson commented on pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#issuecomment-787102163


   A bit of renaming / shortening of the test names to fix the MySQL / PostgreSQL error from CI, because `pool` is still limited to `50` characters in [the TI model](https://github.com/apache/airflow/blob/master/airflow/models/taskinstance.py#L275):
   ```
   sqlalchemy.exc.DataError: (_mysql_exceptions.DataError) (1406, "Data too long for column 'pool' at row 1")
   ```
   
   I guess there should be a follow-up to #7658 to increase it on the TI model as well?
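A quick length check is consistent with that error: each pool name in the original test is 54 characters long, 4 more than the 50-character `pool` column mentioned above. SQLite does not enforce `VARCHAR` lengths, whereas MySQL (in strict mode) and PostgreSQL reject over-long values, which fits the failure showing up on those backends. The helper below is a hypothetical sketch, not part of Airflow or its test suite.

```python
from typing import Iterable, List, Tuple

# Column length quoted in the comment above for the TI model's `pool` column.
TI_POOL_COLUMN_LENGTH = 50


def oversized_pool_names(names: Iterable[str],
                         limit: int = TI_POOL_COLUMN_LENGTH) -> List[Tuple[str, int]]:
    """Return (name, length) for every pool name that would not fit in the column."""
    return [(name, len(name)) for name in names if len(name) > limit]


test_pools = [
    "test_scheduler_keeps_scheduling_when_a_pool_is_full_p1",
    "test_scheduler_keeps_scheduling_when_a_pool_is_full_p2",
]
for name, length in oversized_pool_names(test_pools):
    print(f"{name}: {length} > {TI_POOL_COLUMN_LENGTH}")
```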





[GitHub] [airflow] ashb commented on a change in pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#discussion_r583633158



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2645,6 +2645,79 @@ def test_scheduler_verify_pool_full_2_slots_per_task(self):
+        assert all(task_instance.pool != 'test_scheduler_keeps_scheduling_when_a_pool_is_full_p1'

Review comment:
   Black is going to want to reformat this, I think.







[GitHub] [airflow] github-actions[bot] commented on pull request #14476: Scheduler: Remove TIs from starved pools from the critical path.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#issuecomment-786578437


   [The Workflow run](https://github.com/apache/airflow/actions/runs/602521116) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

