Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/02 12:57:06 UTC

[GitHub] [airflow] soltanianalytics opened a new pull request #13433: Schedule tasks of cleared dags

soltanianalytics opened a new pull request #13433:
URL: https://github.com/apache/airflow/pull/13433


   See https://github.com/apache/airflow/issues/13407
   
   This is alternative 2: keep the `max_active_runs` restriction, but run `DagRun`s in order of `execution_date` instead of using `TI` to infer which `running` `DagRun`s are really _active_ and which are merely _running_.
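
To make the proposed ordering concrete, here is a minimal plain-Python sketch of the idea. It is not the code in this PR: `RunningRun`, `runs_allowed_to_schedule`, and the per-DAG limit mapping are hypothetical names used only for illustration, and the real scheduler works on `DagRun` rows in the metadata database.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, NamedTuple


class RunningRun(NamedTuple):
    """Hypothetical stand-in for a DagRun in the running state."""

    dag_id: str
    execution_date: datetime


def runs_allowed_to_schedule(
    runs: List[RunningRun], max_active_runs: Dict[str, int]
) -> List[RunningRun]:
    """Per DAG, keep only the earliest max_active_runs runs by execution_date."""
    by_dag: Dict[str, List[RunningRun]] = defaultdict(list)
    for run in runs:
        by_dag[run.dag_id].append(run)

    allowed: List[RunningRun] = []
    for dag_id in sorted(by_dag):
        ordered = sorted(by_dag[dag_id], key=lambda r: r.execution_date)
        # A DAG without an entry in the mapping is treated as unlimited here.
        limit = max_active_runs.get(dag_id, len(ordered))
        allowed.extend(ordered[:limit])
    return allowed


runs = [
    RunningRun("a", datetime(2021, 1, 3)),
    RunningRun("a", datetime(2021, 1, 1)),
    RunningRun("a", datetime(2021, 1, 2)),
    RunningRun("b", datetime(2021, 1, 2)),
    RunningRun("b", datetime(2021, 1, 1)),
]
allowed = runs_allowed_to_schedule(runs, {"a": 1, "b": 2})
```

With these inputs, only the earliest run of DAG `a` is selected, while both runs of DAG `b` fit within its limit.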


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] soltanianalytics commented on pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
soltanianalytics commented on pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#issuecomment-757374736


   > This needs unit tests adding to prevent future regressions
   
   I tried writing a test for this but didn't get too far. Could you give me a hint on how to do this and what to test?
   
   I thought about something along these lines, but I'm pretty sure I'm missing some core concepts and, in any case, it's not finished:
   
   ```Python
       def test_do_schedule_cleared_tasks_with_max_active_runs(self):
           """
           Test that the scheduler schedules tasks in DagRun in the order of execution
           date if tasks in more than max_active_runs DagRuns were cleared.
           """
           schedule_interval = timedelta(hours=1)
           task_id = "dummy1"
   
           job = SchedulerJob(subdir=os.devnull)
           job.executor = MockExecutor(do_update=False)
           job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
   
           with DAG(
               dag_id='test_schedule_cleared_tasks_with_max_active_runs',
               start_date=DEFAULT_DATE,
               end_date=DEFAULT_DATE + 3 * schedule_interval,
               schedule_interval=schedule_interval,
               max_active_runs=1,
           ) as dag:
               # Can't use DummyOperator as that goes straight to success
               task1 = BashOperator(task_id=task_id, bash_command='true')
   
           session = settings.Session()
           dagbag = DagBag(
               dag_folder=os.devnull,
               include_examples=False,
               read_dags_from_db=True,
           )
           dagbag.bag_dag(dag=dag, root_dag=dag)
           dagbag.sync_to_db(session=session)
   
           # Create three DagRuns with different execution_dates
           dag_runs = [
               dag.create_dagrun(
                   run_type=DagRunType.SCHEDULED,
                   execution_date=DEFAULT_DATE + i * schedule_interval,
                   state=State.RUNNING,
                   session=session,
               ) for i in range(3)
           ]
           dagbag.sync_to_db(session=session)
   
           # queue the task for the first DagRun, and no other
           assert job._do_scheduling(session) == 1
           assert dag_runs[0].get_task_instance(task_id).state == State.QUEUED
           assert dag_runs[1].get_task_instance(task_id).state == State.NONE
           assert dag_runs[2].get_task_instance(task_id).state == State.NONE
   
           # Do something so that the first two DagRuns succeed and the third is running
           # ???
   
   
           # Clear the first two DagRuns & their tasks somehow
           # ???
   
           # Test that job._do_scheduling(session) only queues the first TI
           assert job._do_scheduling(session) == 1
           assert dag_runs[0].get_task_instance(task_id).state == State.QUEUED
           assert dag_runs[1].get_task_instance(task_id).state == State.NONE
           assert dag_runs[2].get_task_instance(task_id).state == State.RUNNING
   ```
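
As a way to reason about what the two `???` steps need to achieve, the scenario can be walked through with a toy model in plain Python. This is only a sketch under stated assumptions: `ToyRun`, `toy_do_scheduling`, and `clear` are hypothetical stand-ins, each run has a single task, and none of this uses or mirrors Airflow's actual API.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class ToyRun:
    """Hypothetical stand-in for a DagRun with a single task instance."""

    execution_date: datetime
    ti_state: Optional[str] = None  # None, "queued", "running" or "success"

    @property
    def is_running(self) -> bool:
        # The run counts as running until its only task has succeeded.
        return self.ti_state != "success"


def toy_do_scheduling(runs: List[ToyRun], max_active_runs: int) -> int:
    """Queue unscheduled tasks of the earliest running runs; return the count."""
    candidates = sorted(
        (r for r in runs if r.is_running), key=lambda r: r.execution_date
    )
    queued = 0
    for run in candidates[:max_active_runs]:
        if run.ti_state is None:
            run.ti_state = "queued"
            queued += 1
    return queued


def clear(run: ToyRun) -> None:
    """Reset the task so that the run is considered running again."""
    run.ti_state = None


start = datetime(2016, 1, 1)
runs = [ToyRun(start + timedelta(hours=i)) for i in range(3)]

# First pass: only the earliest run gets its task queued.
first_pass = toy_do_scheduling(runs, max_active_runs=1)

# Stand-in for the first "???": two runs finish, the third is mid-flight.
runs[0].ti_state = "success"
runs[1].ti_state = "success"
runs[2].ti_state = "running"

# Stand-in for the second "???": clear the two finished runs.
clear(runs[0])
clear(runs[1])

second_pass = toy_do_scheduling(runs, max_active_runs=1)
states = [r.ti_state for r in runs]
```

In this model the second pass queues only the earliest cleared run, while the third run keeps running, which is the end state the final assertions in the draft test describe.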





[GitHub] [airflow] soltanianalytics closed pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
soltanianalytics closed pull request #13433:
URL: https://github.com/apache/airflow/pull/13433


   





[GitHub] [airflow] kaxil commented on pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#issuecomment-764200533


   The following tests are failing:
   
   ```
   FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_0
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_1_up_for_retry
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_2_up_for_reschedule
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_with_task_concurrency_0
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_with_task_concurrency_1_up_for_retry
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_with_task_concurrency_2_up_for_reschedule
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_runs_respected_after_clear
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_scheduler_job_add_new_task
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_should_mark_dummy_task_as_success
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_callbacks_are_called_0_success
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_callbacks_are_called_1_failed
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_callbacks_are_not_added_when_callbacks_are_not_defined_0_success
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_callbacks_are_not_added_when_callbacks_are_not_defined_1_failed
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_do_schedule_max_active_runs_upstream_failed
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_scheduler_verify_pool_full
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_scheduler_verify_pool_full_2_slots_per_task
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_scheduler_verify_priority_and_slots
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_verify_integrity_if_dag_changed
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_verify_integrity_if_dag_not_changed
     = 22 failed, 2065 passed, 97 skipped, 1 xfailed, 1 warning in 280.68s (0:04:40) =
   ```
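
To re-run only the failures locally, the `FAILED` lines can be turned back into pytest node IDs. The helper below is a small sketch, not part of the Airflow tooling; `failed_node_ids` and `group_by_class` are hypothetical names, and the sample input is a shortened copy of the summary above.

```python
from collections import defaultdict
from typing import Dict, List


def failed_node_ids(summary: str) -> List[str]:
    """Return the pytest node IDs from lines that start with FAILED."""
    node_ids = []
    for line in summary.splitlines():
        parts = line.split()
        if len(parts) >= 2 and parts[0] == "FAILED":
            node_ids.append(parts[1])
    return node_ids


def group_by_class(node_ids: List[str]) -> Dict[str, List[str]]:
    """Group test names by test class to see which areas are affected."""
    groups = defaultdict(list)
    for node_id in node_ids:
        pieces = node_id.split("::")
        # file::Class::test has three pieces; anything else is grouped under "".
        cls = pieces[1] if len(pieces) == 3 else ""
        groups[cls].append(pieces[-1])
    return dict(groups)


summary = """
FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_runs_respected_after_clear
  FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_scheduler_verify_pool_full
  FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_verify_integrity_if_dag_changed
  = 22 failed, 2065 passed, 97 skipped, 1 xfailed, 1 warning in 280.68s (0:04:40) =
"""

node_ids = failed_node_ids(summary)
groups = group_by_class(node_ids)
# Node IDs can be passed to pytest as positional arguments.
command = ["pytest", *node_ids]
```

The resulting `command` list could be handed to `subprocess.run` inside a working Airflow test environment.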





[GitHub] [airflow] soltanianalytics commented on pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
soltanianalytics commented on pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#issuecomment-859591992


   Frankly, I realized that I understood the airflow code base much less than I thought and I didn't continue with this. I won't be able to contribute in the near future so I'll close it for now. Thanks for reaching out!





[GitHub] [airflow] github-actions[bot] commented on pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#issuecomment-753492993


   [The Workflow run](https://github.com/apache/airflow/actions/runs/457889862) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] kaxil edited a comment on pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#issuecomment-764200533


   The following tests are failing:
   
   ```
   FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_0
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_1_up_for_retry
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_2_up_for_reschedule
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_with_task_concurrency_0
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_with_task_concurrency_1_up_for_retry
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_with_task_concurrency_2_up_for_reschedule
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_runs_respected_after_clear
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_scheduler_job_add_new_task
     FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_should_mark_dummy_task_as_success
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_callbacks_are_called_0_success
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_callbacks_are_called_1_failed
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_callbacks_are_not_added_when_callbacks_are_not_defined_0_success
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_callbacks_are_not_added_when_callbacks_are_not_defined_1_failed
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_do_schedule_max_active_runs_upstream_failed
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_scheduler_verify_pool_full
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_scheduler_verify_pool_full_2_slots_per_task
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_scheduler_verify_priority_and_slots
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_verify_integrity_if_dag_changed
     FAILED tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_verify_integrity_if_dag_not_changed
     = 22 failed, 2065 passed, 97 skipped, 1 xfailed, 1 warning in 280.68s (0:04:40) =
   ```
   
   Can you take a look please @soltanianalytics 





[GitHub] [airflow] turbaszek commented on a change in pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#discussion_r550889733



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1478,42 +1478,49 @@ def _do_scheduling(self, session) -> int:
                 guard.commit()
                 # END: create dagruns
 
-            dag_runs = DagRun.next_dagruns_to_examine(session)
+            dag_runs = list(DagRun.next_dagruns_to_examine(session))

Review comment:
       Is this necessary? 
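
Whether a `list(...)` wrapper matters generally depends on how the result is consumed afterwards. As a general Python illustration (it makes no claim about what `next_dagruns_to_examine` actually returns), a one-shot iterator can only be walked once, whereas a list can be walked repeatedly. `fetch_rows` is a hypothetical stand-in.

```python
from typing import Iterator


def fetch_rows() -> Iterator[int]:
    """Hypothetical one-shot source standing in for a lazily evaluated result."""
    yield from (1, 2, 3)


# Without list(): the generator is exhausted after the first loop.
lazy = fetch_rows()
lazy_first = [row for row in lazy]
lazy_second = [row for row in lazy]

# With list(): the rows are materialized once and can be reused.
rows = list(fetch_rows())
rows_first = [row for row in rows]
rows_second = [row for row in rows]
```

If the result is only iterated once, the wrapper adds nothing beyond an extra copy.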







[GitHub] [airflow] github-actions[bot] commented on pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#issuecomment-753473743


   [The Workflow run](https://github.com/apache/airflow/actions/runs/457694858) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#issuecomment-753473718


   [The Workflow run](https://github.com/apache/airflow/actions/runs/457667111) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] soltanianalytics commented on a change in pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
soltanianalytics commented on a change in pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#discussion_r554445287



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1483,37 +1483,46 @@ def _do_scheduling(self, session) -> int:
             # Bulk fetch the currently active dag runs for the dags we are
             # examining, rather than making one query per DagRun
 
-            # TODO: This query is probably horribly inefficient (though there is an
-            # index on (dag_id,state)). It is to deal with the case when a user
+            # This query will violate max_active_runs by exactly one if tasks in
+            # max_active_runs or more DAGs are cleared while another DAG is
+            # running. It is to deal with the case when a user
             # clears more than max_active_runs older tasks -- we don't want the
             # scheduler to suddenly go and start running tasks from all of the
             # runs. (AIRFLOW-137/GH #1442)
-            #
-            # The longer term fix would be to have `clear` do this, and put DagRuns
-            # in to the queued state, then take DRs out of queued before creating
-            # any new ones

Review comment:
       done







[GitHub] [airflow] ashb commented on a change in pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#discussion_r551856920



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1483,37 +1483,46 @@ def _do_scheduling(self, session) -> int:
             # Bulk fetch the currently active dag runs for the dags we are
             # examining, rather than making one query per DagRun
 
-            # TODO: This query is probably horribly inefficient (though there is an
-            # index on (dag_id,state)). It is to deal with the case when a user
+            # This query will violate max_active_runs by exactly one if tasks in
+            # max_active_runs or more DAGs are cleared while another DAG is
+            # running. It is to deal with the case when a user
             # clears more than max_active_runs older tasks -- we don't want the
             # scheduler to suddenly go and start running tasks from all of the
             # runs. (AIRFLOW-137/GH #1442)
-            #
-            # The longer term fix would be to have `clear` do this, and put DagRuns
-            # in to the queued state, then take DRs out of queued before creating
-            # any new ones

Review comment:
       Please keep this comment
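
For context, the longer-term fix described in the removed comment (have `clear` put DagRuns into a queued state, then take them out of queued before creating new ones) could look roughly like the toy model below. This is a sketch under stated assumptions, not Airflow code: `ToyDagRun`, `toy_clear`, `promote_queued`, and `may_create_new_run` are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List


@dataclass
class ToyDagRun:
    """Hypothetical stand-in for a DagRun row."""

    execution_date: datetime
    state: str  # "queued", "running" or "success"


def toy_clear(run: ToyDagRun) -> None:
    """Clearing parks the run as queued instead of marking it running."""
    run.state = "queued"


def promote_queued(runs: List[ToyDagRun], max_active_runs: int) -> List[ToyDagRun]:
    """Move the earliest queued runs to running while slots are free."""
    running = sum(1 for r in runs if r.state == "running")
    free = max(max_active_runs - running, 0)
    queued = sorted(
        (r for r in runs if r.state == "queued"), key=lambda r: r.execution_date
    )
    promoted = queued[:free]
    for run in promoted:
        run.state = "running"
    return promoted


def may_create_new_run(runs: List[ToyDagRun], max_active_runs: int) -> bool:
    """New runs are only created once queued and running runs leave a free slot."""
    active = sum(1 for r in runs if r.state in ("queued", "running"))
    return active < max_active_runs


start = datetime(2021, 1, 1)
runs = [
    ToyDagRun(start, "success"),
    ToyDagRun(start + timedelta(hours=1), "success"),
    ToyDagRun(start + timedelta(hours=2), "running"),
]

# The user clears the two older runs while the third is still running.
toy_clear(runs[0])
toy_clear(runs[1])
promoted_while_busy = promote_queued(runs, max_active_runs=1)

# Once the running run finishes, the earliest queued run takes the slot.
runs[2].state = "success"
promoted_after_finish = promote_queued(runs, max_active_runs=1)
can_create = may_create_new_run(runs, max_active_runs=1)
```

In this model the limit is never exceeded, because cleared runs wait in the queued state until a slot is free.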







[GitHub] [airflow] soltanianalytics commented on pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
soltanianalytics commented on pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#issuecomment-753471587


   This and https://github.com/apache/airflow/pull/13408 are mutually exclusive





[GitHub] [airflow] github-actions[bot] commented on pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#issuecomment-757337060


   [The Workflow run](https://github.com/apache/airflow/actions/runs/474339957) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] jhtimmins commented on pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
jhtimmins commented on pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#issuecomment-859676566


   Thanks for the update @soltanianalytics! If you want to take another crack at contributing in the future, I'm happy to help you find issues/bugs that are more approachable.





[GitHub] [airflow] soltanianalytics commented on a change in pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
soltanianalytics commented on a change in pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#discussion_r550893810



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1478,42 +1478,49 @@ def _do_scheduling(self, session) -> int:
                 guard.commit()
                 # END: create dagruns
 
-            dag_runs = DagRun.next_dagruns_to_examine(session)
+            dag_runs = list(DagRun.next_dagruns_to_examine(session))

Review comment:
       Indeed, it is not. Removing it







[GitHub] [airflow] jhtimmins commented on pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
jhtimmins commented on pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#issuecomment-858770123


   @soltanianalytics do you need any help moving this along or have questions about what needs to get done?





[GitHub] [airflow] github-actions[bot] commented on pull request #13433: Schedule tasks of cleared dags

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#issuecomment-753476558


   [The Workflow run](https://github.com/apache/airflow/actions/runs/457729102) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.




