Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/04 14:43:28 UTC

[GitHub] [airflow] ashb commented on a change in pull request #20391: Fix task instance concurrency limit in a DAG affecting other DAGs

ashb commented on a change in pull request #20391:
URL: https://github.com/apache/airflow/pull/20391#discussion_r778131474



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -289,7 +290,12 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
             .filter(not_(DM.is_paused))
             .filter(TI.state == TaskInstanceState.SCHEDULED)
             .options(selectinload('dag_model'))
-            .order_by(-TI.priority_weight, DR.execution_date)
+            .order_by(
+                TI.priority_weight.desc(),
+                desc(case([(TI.last_scheduling_decision.is_(None), 1)], else_=0)),
+                TI.last_scheduling_decision.desc(),

Review comment:
       Please use `airflow.utils.sqlalchemy.nulls_first` here if we can.
   
   It might not work out of the box for a `DESC`-sorted column, though.
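The ordering question can be made concrete with a minimal sketch. It uses Python's built-in `sqlite3` module instead of SQLAlchemy and compares a plain `DESC` sort on a nullable column with the portable `CASE` expression from the diff above. The `ti` table and its rows are illustrative assumptions, not Airflow's schema. SQLite treats NULL as smaller than any other value, so a plain `DESC` sort puts NULLs last, while the `CASE` key moves them to the front.

```python
import sqlite3
from typing import List

# Toy stand-in for the task_instance table (assumed columns, not Airflow's schema).
ROWS = [
    ("a", 1, "2022-01-04T10:00:00"),
    ("b", 1, None),  # never had a scheduling decision
    ("c", 1, "2022-01-04T12:00:00"),
    ("d", 2, "2022-01-04T09:00:00"),
]

# Plain DESC: SQLite sorts NULL below every value, so NULLs land last.
PLAIN_DESC = """
    SELECT task_id FROM ti
    ORDER BY priority_weight DESC, last_scheduling_decision DESC
"""

# Same shape as the diff: a CASE key sorted DESC forces NULLs ahead of non-NULLs.
NULLS_FIRST_DESC = """
    SELECT task_id FROM ti
    ORDER BY priority_weight DESC,
             CASE WHEN last_scheduling_decision IS NULL THEN 1 ELSE 0 END DESC,
             last_scheduling_decision DESC
"""


def fetch_order(query: str) -> List[str]:
    """Load ROWS into a fresh in-memory table and return task_ids in query order."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE ti (task_id TEXT, priority_weight INTEGER, "
            "last_scheduling_decision TEXT)"
        )
        conn.executemany("INSERT INTO ti VALUES (?, ?, ?)", ROWS)
        return [row[0] for row in conn.execute(query)]
    finally:
        conn.close()


print(fetch_order(PLAIN_DESC))        # ['d', 'c', 'a', 'b']
print(fetch_order(NULLS_FIRST_DESC))  # ['d', 'b', 'c', 'a']
```

On PostgreSQL the same effect is available natively as `NULLS FIRST` (SQLAlchemy's `nullsfirst()`), which is presumably what a dialect-aware helper would emit. The `CASE` form is less readable but behaves the same on every backend.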

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -395,6 +401,15 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
                         dag_id,
                         max_active_tasks_per_dag_limit,
                     )
+                    # update the last_scheduling_decision for all task instances in this dag
+                    session.query(TI).filter(
+                        TI.dag_id == task_instance.dag_id, TI.state == State.SCHEDULED
+                    ).update(
+                        {
+                            TI.last_scheduling_decision: timezone.utcnow(),
+                        },
+                        synchronize_session=False,

Review comment:
       This update is overly broad -- it is going to update _every single task for this DAG_, even completed ones.
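A small experiment can check the scope of such an update. This sketch again uses `sqlite3` in place of the SQLAlchemy session. The `task_instance` columns and lowercase state strings are simplified assumptions. It counts how many rows are touched by an update filtered only on `dag_id`, and compares that with an update also restricted to scheduled task instances.

```python
import sqlite3
from datetime import datetime, timezone

# (dag_id, task_id, state): a simplified, assumed subset of task_instance columns.
ROWS = [
    ("dag_a", "t1", "scheduled"),
    ("dag_a", "t2", "success"),
    ("dag_a", "t3", "failed"),
    ("dag_a", "t4", "scheduled"),
    ("dag_b", "t1", "scheduled"),
]


def make_db() -> sqlite3.Connection:
    """Create an in-memory task_instance table populated with ROWS."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, state TEXT, "
        "last_scheduling_decision TEXT)"
    )
    conn.executemany(
        "INSERT INTO task_instance (dag_id, task_id, state) VALUES (?, ?, ?)", ROWS
    )
    return conn


def touch_last_scheduling_decision(
    conn: sqlite3.Connection, dag_id: str, only_scheduled: bool
) -> int:
    """Stamp last_scheduling_decision for one DAG; return the number of rows changed."""
    now = datetime.now(timezone.utc).isoformat()
    sql = "UPDATE task_instance SET last_scheduling_decision = ? WHERE dag_id = ?"
    params = [now, dag_id]
    if only_scheduled:
        # Narrow the update to task instances still waiting to be queued.
        sql += " AND state = ?"
        params.append("scheduled")
    return conn.execute(sql, params).rowcount


def count_touched(only_scheduled: bool) -> int:
    """Run the update for dag_a against a fresh table and report the row count."""
    conn = make_db()
    try:
        return touch_last_scheduling_decision(conn, "dag_a", only_scheduled)
    finally:
        conn.close()


print(count_touched(only_scheduled=False), count_touched(only_scheduled=True))  # 4 2
```

Each extra predicate shrinks the set of rows written on every scheduler loop. For a DAG with a long run history, the gap between the two counts grows with every completed run.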




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.