Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/09/01 09:16:25 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #17945: Fix max_active_runs properly

ephraimbuddy opened a new pull request #17945:
URL: https://github.com/apache/airflow/pull/17945


   Details later
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dhuang commented on pull request #17945: Fix max_active_runs not allowing moving of queued dagruns to running

Posted by GitBox <gi...@apache.org>.
dhuang commented on pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#issuecomment-925321911


   FWIW can confirm this fixed my issue in https://github.com/apache/airflow/issues/17638, so big thanks! Note I did have an issue after upgrading where the `max_active_runs` column for some DAGs remained `NULL` even after the scheduler parsed them. This seemed to prevent their tasks from getting scheduled. I eventually set it manually in the database and that fixed everything, but I'm not sure if I did something wrong to get into that state in the first place 🤷 
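   For illustration only, a one-off repair of that kind could be done through the Airflow ORM rather than raw SQL; the dag_id and the value 16 below are placeholders, not taken from the report:
   
   ```
   # Hypothetical one-off fix for a DagModel row whose max_active_runs stayed NULL.
   from airflow.models import DagModel
   from airflow.utils.session import create_session
   
   with create_session() as session:
       dag_model = session.query(DagModel).filter(DagModel.dag_id == "my_dag_id").one_or_none()  # placeholder dag_id
       if dag_model is not None and dag_model.max_active_runs is None:
           dag_model.max_active_runs = 16  # pick the value your DAG expects
           session.merge(dag_model)
   ```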





[GitHub] [airflow] kaxil commented on a change in pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700920851



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -970,48 +971,61 @@ def _start_queued_dagruns(
         session: Session,
     ) -> int:
         """Find DagRuns in queued state and decide moving them to running state"""
-        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
-
-        active_runs_of_dags = defaultdict(
-            lambda: 0,
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(  # We use `list` here because SQLA doesn't accept a set
-                # We use set to avoid duplicate dag_ids
-                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
-                DagRun.state == State.RUNNING,
-            )
-            .group_by(DagRun.dag_id)
-            .all(),
+        active_dags = (
+            session.query(DM)
+            .filter(DM.is_active == expression.true(), DM.is_paused == expression.false())
+            .all()
         )
+        max_number = conf.getint('scheduler', 'max_dagruns_per_loop_to_schedule', fallback=20)
+        # There's a possibility of exceeding this max_number or going below it as a result of the logic below
+        if len(active_dags) > max_number:
+            max_number = 1
+        elif len(active_dags) > 0:

Review comment:
       Why 1? I don't get this logic







[GitHub] [airflow] uranusjr commented on a change in pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700921878



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -970,48 +971,61 @@ def _start_queued_dagruns(
         session: Session,
     ) -> int:
         """Find DagRuns in queued state and decide moving them to running state"""
-        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
-
-        active_runs_of_dags = defaultdict(
-            lambda: 0,
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(  # We use `list` here because SQLA doesn't accept a set
-                # We use set to avoid duplicate dag_ids
-                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
-                DagRun.state == State.RUNNING,
-            )
-            .group_by(DagRun.dag_id)
-            .all(),
+        active_dags = (
+            session.query(DM)
+            .filter(DM.is_active == expression.true(), DM.is_paused == expression.false())

Review comment:
       There’s a loop below that iterates through `active_dags`. (I was thinking the same until I noticed the loop.)







[GitHub] [airflow] uranusjr commented on a change in pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700869586



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -970,48 +971,61 @@ def _start_queued_dagruns(
         session: Session,
     ) -> int:
         """Find DagRuns in queued state and decide moving them to running state"""
-        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
-
-        active_runs_of_dags = defaultdict(
-            lambda: 0,
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(  # We use `list` here because SQLA doesn't accept a set
-                # We use set to avoid duplicate dag_ids
-                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
-                DagRun.state == State.RUNNING,
-            )
-            .group_by(DagRun.dag_id)
-            .all(),
+        active_dags = (
+            session.query(DM)
+            .filter(DM.is_active == expression.true(), DM.is_paused == expression.false())
+            .all()
         )
+        max_number = conf.getint('scheduler', 'max_dagruns_per_loop_to_schedule', fallback=20)
+        # There's a possibility of exceeding this max_number or going below it as a result of the logic below
+        if len(active_dags) > max_number:
+            max_number = 1
+        elif len(active_dags) > 0:
+            max_number = round(max_number / len(active_dags))
 
-        def _update_state(dag_run):
-            dag_run.state = State.RUNNING
-            dag_run.start_date = timezone.utcnow()
-            expected_start_date = dag.following_schedule(dag_run.execution_date)
-            if expected_start_date:
-                schedule_delay = dag_run.start_date - expected_start_date
-                Stats.timing(
-                    f'dagrun.schedule_delay.{dag.dag_id}',
-                    schedule_delay,
-                )
+        for dag_model in active_dags:
 
-        for dag_run in dag_runs:
-            try:
-                dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
-            except SerializedDagNotFound:
-                self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
-                continue
-            active_runs = active_runs_of_dags[dag_run.dag_id]
-            if dag.max_active_runs and active_runs >= dag.max_active_runs:
-                self.log.debug(
-                    "DAG %s already has %d active runs, not moving any more runs to RUNNING state %s",
-                    dag.dag_id,
-                    active_runs,
-                    dag_run.execution_date,
+            queued_dagruns = DR.get_next_queued_dagruns_for_dag(dag_model.dag_id, session, max_number)
+
+            active_runs_of_dags = defaultdict(
+                lambda: 0,
+                session.query(DagRun.dag_id, func.count('*'))
+                .filter(
+                    DagRun.dag_id == dag_model.dag_id,
+                    DagRun.state == State.RUNNING,
                 )

Review comment:
       ```suggestion
               active_runs_of_dags = defaultdict(
                   int,
                   session.query(DagRun.dag_id, func.count('*'))
                   .filter(
                       DagRun.dag_id == dag_model.dag_id,
                       DagRun.state == State.RUNNING,
                   )
   ```
   
    (Not particularly firm on this, just personal preference.)







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700923927



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -970,48 +971,61 @@ def _start_queued_dagruns(
         session: Session,
     ) -> int:
         """Find DagRuns in queued state and decide moving them to running state"""
-        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
-
-        active_runs_of_dags = defaultdict(
-            lambda: 0,
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(  # We use `list` here because SQLA doesn't accept a set
-                # We use set to avoid duplicate dag_ids
-                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
-                DagRun.state == State.RUNNING,
-            )
-            .group_by(DagRun.dag_id)
-            .all(),
+        active_dags = (
+            session.query(DM)
+            .filter(DM.is_active == expression.true(), DM.is_paused == expression.false())
+            .all()
         )
+        max_number = conf.getint('scheduler', 'max_dagruns_per_loop_to_schedule', fallback=20)
+        # There's a possibility of exceeding this max_number or going below it as a result of the logic below
+        if len(active_dags) > max_number:
+            max_number = 1
+        elif len(active_dags) > 0:

Review comment:
       For example, if there are 30 active dags in total, which is greater than `max_dagruns_per_loop_to_schedule` (20), we examine 1 dagrun from each dag, which will end up exceeding `max_dagruns_per_loop_to_schedule` by the end of the loop
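   A minimal sketch of that cap arithmetic, using the numbers from the example above (illustrative only, not the merged code):
   
   ```
   # Reproduces the branch under review: 30 active dags vs a per-loop cap of 20.
   max_dagruns_per_loop_to_schedule = 20
   active_dag_count = 30
   
   if active_dag_count > max_dagruns_per_loop_to_schedule:
       per_dag_limit = 1  # one queued run per dag -> up to 30 runs examined
   elif active_dag_count > 0:
       per_dag_limit = round(max_dagruns_per_loop_to_schedule / active_dag_count)
   else:
       per_dag_limit = max_dagruns_per_loop_to_schedule
   
   print(per_dag_limit)  # 1 in this example, so the loop can exceed the configured 20 overall
   ```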







[GitHub] [airflow] ephraimbuddy commented on pull request #17945: Fix max_active_runs not allowing moving of queued dagruns to running

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#issuecomment-925327425


   > FWIW can confirm this fixed my issue in #17638, so big thanks! Note I did have an issue after upgrading where the `max_active_runs` columns for some DAGs remained `NULL` even after scheduler parsed them. This seemed to prevent their tasks from getting scheduled. I eventually set it manually in the database and that fixed everything, but not sure if I did something wrong to get in that state in the first place 🤷
   
   Interesting 
   cc: @kaxil 





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700921368



##########
File path: airflow/models/dagrun.py
##########
@@ -224,6 +224,26 @@ def next_dagruns_to_examine(
             query.limit(max_number), of=cls, session=session, **skip_locked(session=session)
         )
 
+    @classmethod
+    def get_next_queued_dagruns_for_dag(cls, dag_id: str, session: Session, max_number: Optional[int] = None):
+        """Get queued dagruns for a particular dag to examine"""
+        if max_number is None:
+            max_number = cls.DEFAULT_DAGRUNS_TO_EXAMINE
+        query = (
+            session.query(cls)
+            .filter(cls.dag_id == dag_id, cls.state == State.QUEUED)
+            .order_by(
+                nulls_first(cls.last_scheduling_decision, session=session),
+                cls.execution_date,
+            )
+        )
+        if not settings.ALLOW_FUTURE_EXEC_DATES:
+            query = query.filter(DagRun.execution_date <= func.now())
+
+        return with_row_locks(
+            query.limit(max_number), of=cls, session=session, **skip_locked(session=session)
+        )

Review comment:
       I feel it should be here so we can test it separately. The method calling it from the scheduler is already big. I'm fine with moving it to where it's used, but would like to hear what you think.







[GitHub] [airflow] uranusjr commented on a change in pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700866674



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -970,48 +971,61 @@ def _start_queued_dagruns(
         session: Session,
     ) -> int:
         """Find DagRuns in queued state and decide moving them to running state"""
-        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
-
-        active_runs_of_dags = defaultdict(
-            lambda: 0,
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(  # We use `list` here because SQLA doesn't accept a set
-                # We use set to avoid duplicate dag_ids
-                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
-                DagRun.state == State.RUNNING,
-            )
-            .group_by(DagRun.dag_id)
-            .all(),
+        active_dags = (
+            session.query(DM)
+            .filter(DM.is_active == expression.true(), DM.is_paused == expression.false())

Review comment:
       I think this can just be `DM.is_active == True, DM.is_paused == False`, but no hard preferences on this (the `==` boolean call is very ugly anyway).
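   A small sketch of the two styles being compared, assuming `DM` is `DagModel` and `session` is an open SQLAlchemy session (not part of the PR):
   
   ```
   from sqlalchemy.sql import expression
   
   def active_unpaused_dags(session, DM):
       # Style used in the diff: explicit SQL boolean literals.
       explicit = session.query(DM).filter(
           DM.is_active == expression.true(),
           DM.is_paused == expression.false(),
       )
       # Style suggested here: plain Python booleans, which SQLAlchemy also accepts.
       plain = session.query(DM).filter(DM.is_active == True, DM.is_paused == False)  # noqa: E712
       return explicit, plain
   ```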







[GitHub] [airflow] github-actions[bot] commented on pull request #17945: Fix max_active_runs not allowing moving of queued dagruns to running

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#issuecomment-911552412


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] uranusjr commented on a change in pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700868168



##########
File path: airflow/models/dagrun.py
##########
@@ -224,6 +224,26 @@ def next_dagruns_to_examine(
             query.limit(max_number), of=cls, session=session, **skip_locked(session=session)
         )
 
+    @classmethod
+    def get_next_queued_dagruns_for_dag(cls, dag_id: str, session: Session, max_number: Optional[int] = None):
+        """Get queued dagruns for a particular dag to examine"""
+        if max_number is None:
+            max_number = cls.DEFAULT_DAGRUNS_TO_EXAMINE
+        query = (
+            session.query(cls)
+            .filter(cls.dag_id == dag_id, cls.state == State.QUEUED)
+            .order_by(
+                nulls_first(cls.last_scheduling_decision, session=session),
+                cls.execution_date,
+            )
+        )
+        if not settings.ALLOW_FUTURE_EXEC_DATES:
+            query = query.filter(DagRun.execution_date <= func.now())
+
+        return with_row_locks(
+            query.limit(max_number), of=cls, session=session, **skip_locked(session=session)
+        )

Review comment:
       This whole function, especially the `with_row_locks` part, feels much too specific, and should be directly inlined where it’s used instead of being a function on DagRun.







[GitHub] [airflow] ephraimbuddy commented on pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#issuecomment-911495466


   > > will not schedule the newer dagruns with effect that only one dagruns would be in running at any time
   > 
   > When you say wont schedule -- do you mean that the dagruns will take a long time to get out of queued state, but once a dag run is in running state tasks should be scheduled fine -- cos the code you have changed only affects DagRuns going from queued -> running state -- nothing there will affect task scheduling.
   
   You are right. It's not about task scheduling but about moving dagruns from queued to running; I will update the wording accordingly now





[GitHub] [airflow] ephraimbuddy commented on pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#issuecomment-911357411


   What happens is that the method `DagRun.next_dagruns_to_examine` gets the earliest dagruns without considering which dag each dagrun belongs to.
   For example:
   Suppose you have a dag with execution_date 2020-01-01 and catchup=True, max_active_runs=1, schedule_interval='@daily', and another dag with execution_date 2021-01-01 that also has catchup=True, schedule_interval='@daily'.
   When you unpause the two dags (the one with max_active_runs first), the dagruns would be created, but only one dagrun would be active because of how DagRun.next_dagruns_to_examine works.
   (I'm worried about performance with this PR. Please advise.)
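   For illustration only, the two dags in that scenario might look roughly like this (ids, dates, and the operator are placeholders):
   
   ```
   from datetime import datetime
   
   from airflow import DAG
   from airflow.operators.dummy import DummyOperator
   
   with DAG(
       dag_id="dag_with_cap",  # unpaused first, catches up from 2020
       start_date=datetime(2020, 1, 1),
       schedule_interval="@daily",
       catchup=True,
       max_active_runs=1,
   ) as dag_one:
       DummyOperator(task_id="mytask")
   
   with DAG(
       dag_id="dag_without_cap",  # catches up from 2021, no max_active_runs cap
       start_date=datetime(2021, 1, 1),
       schedule_interval="@daily",
       catchup=True,
   ) as dag_two:
       DummyOperator(task_id="mytask")
   ```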





[GitHub] [airflow] ashb commented on pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#issuecomment-911493695


   > will not schedule the newer dagruns with effect that only one dagruns would be in running at any time
   
   When you say won't schedule -- do you mean that the dagruns will take a long time to get out of queued state, but once a dag run is in running state tasks should be scheduled fine -- cos the code you have changed only affects DagRuns going from queued -> running state -- nothing there will affect task scheduling.





[GitHub] [airflow] kaxil commented on a change in pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700918376



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -970,48 +971,61 @@ def _start_queued_dagruns(
         session: Session,
     ) -> int:
         """Find DagRuns in queued state and decide moving them to running state"""
-        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
-
-        active_runs_of_dags = defaultdict(
-            lambda: 0,
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(  # We use `list` here because SQLA doesn't accept a set
-                # We use set to avoid duplicate dag_ids
-                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
-                DagRun.state == State.RUNNING,
-            )
-            .group_by(DagRun.dag_id)
-            .all(),
+        active_dags = (
+            session.query(DM)
+            .filter(DM.is_active == expression.true(), DM.is_paused == expression.false())

Review comment:
       Yeah that is mainly for MSSQL







[GitHub] [airflow] ashb commented on a change in pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700941549



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -970,48 +971,61 @@ def _start_queued_dagruns(
         session: Session,
     ) -> int:
         """Find DagRuns in queued state and decide moving them to running state"""
-        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
-
-        active_runs_of_dags = defaultdict(
-            lambda: 0,
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(  # We use `list` here because SQLA doesn't accept a set
-                # We use set to avoid duplicate dag_ids
-                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
-                DagRun.state == State.RUNNING,
-            )
-            .group_by(DagRun.dag_id)
-            .all(),
+        active_dags = (
+            session.query(DM)
+            .filter(DM.is_active == expression.true(), DM.is_paused == expression.false())
+            .all()
         )
+        max_number = conf.getint('scheduler', 'max_dagruns_per_loop_to_schedule', fallback=20)
+        # There's a possibility of exceeding this max_number or going below it as a result of the logic below
+        if len(active_dags) > max_number:
+            max_number = 1

Review comment:
       This isn't the right behaviour -- there could be 1000 "active" dags, but if they don't have any active dag runs then they shouldn't affect scheduling.
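   One way to honour that point, sketched as a helper that scopes the loop to dags that actually have queued runs (an assumption about a possible approach, not the merged code):
   
   ```
   from airflow.models.dagrun import DagRun
   from airflow.utils.state import State
   
   def dag_ids_with_queued_runs(session):
       """Return the dag_ids that currently have at least one queued dagrun."""
       rows = (
           session.query(DagRun.dag_id)
           .filter(DagRun.state == State.QUEUED)
           .distinct()
           .all()
       )
       return [dag_id for (dag_id,) in rows]
   ```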
   
   







[GitHub] [airflow] kaxil commented on a change in pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700919999



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -970,48 +971,61 @@ def _start_queued_dagruns(
         session: Session,
     ) -> int:
         """Find DagRuns in queued state and decide moving them to running state"""
-        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
-
-        active_runs_of_dags = defaultdict(
-            lambda: 0,
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(  # We use `list` here because SQLA doesn't accept a set
-                # We use set to avoid duplicate dag_ids
-                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
-                DagRun.state == State.RUNNING,
-            )
-            .group_by(DagRun.dag_id)
-            .all(),
+        active_dags = (
+            session.query(DM)
+            .filter(DM.is_active == expression.true(), DM.is_paused == expression.false())

Review comment:
       Do you just need the count of those DAGs, @ephraimbuddy, not the actual DagModels? If so, use `func.count`:
   
   ```
   session.query(func.count(DM.dag_id)).filter(DM.is_active == expression.true(), DM.is_paused == expression.false()).scalar()
   ```







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700880381



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -970,48 +971,61 @@ def _start_queued_dagruns(
         session: Session,
     ) -> int:
         """Find DagRuns in queued state and decide moving them to running state"""
-        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
-
-        active_runs_of_dags = defaultdict(
-            lambda: 0,
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(  # We use `list` here because SQLA doesn't accept a set
-                # We use set to avoid duplicate dag_ids
-                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
-                DagRun.state == State.RUNNING,
-            )
-            .group_by(DagRun.dag_id)
-            .all(),
+        active_dags = (
+            session.query(DM)
+            .filter(DM.is_active == expression.true(), DM.is_paused == expression.false())

Review comment:
       Yeah, I did that but unfortunately, it's not passing the CI







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700938336



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -970,48 +971,61 @@ def _start_queued_dagruns(
         session: Session,
     ) -> int:
         """Find DagRuns in queued state and decide moving them to running state"""
-        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
-
-        active_runs_of_dags = defaultdict(
-            lambda: 0,
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(  # We use `list` here because SQLA doesn't accept a set
-                # We use set to avoid duplicate dag_ids
-                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
-                DagRun.state == State.RUNNING,
-            )
-            .group_by(DagRun.dag_id)
-            .all(),
+        active_dags = (
+            session.query(DM)
+            .filter(DM.is_active == expression.true(), DM.is_paused == expression.false())
+            .all()
         )
+        max_number = conf.getint('scheduler', 'max_dagruns_per_loop_to_schedule', fallback=20)
+        # There's a possibility of exceeding this max_number or going below it as a result of the logic below
+        if len(active_dags) > max_number:
+            max_number = 1
+        elif len(active_dags) > 0:

Review comment:
       Another thing I have thought about is letting it use `max_dagruns_per_loop_to_schedule` for each dag, since it's only used to update the dagrun state on each loop and nothing more.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17945: Fix max_active_runs not allowing moving of queued dagruns to running

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700980444



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2881,6 +2881,44 @@ def test_do_schedule_max_active_runs_and_manual_trigger(self, dag_maker):
         # Assert that the other one is queued
         assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 1
 
+    def test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags(self, dag_maker):
+        session = settings.Session()
+        with dag_maker('test_dag1', max_active_runs=1) as dag:
+            DummyOperator(task_id='mytask')
+        # dag_maker.create_dagrun()

Review comment:
       ```suggestion
   ```







[GitHub] [airflow] uranusjr commented on a change in pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700869979



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -970,48 +971,61 @@ def _start_queued_dagruns(
         session: Session,
     ) -> int:
         """Find DagRuns in queued state and decide moving them to running state"""
-        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
-
-        active_runs_of_dags = defaultdict(
-            lambda: 0,
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(  # We use `list` here because SQLA doesn't accept a set
-                # We use set to avoid duplicate dag_ids
-                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
-                DagRun.state == State.RUNNING,
-            )
-            .group_by(DagRun.dag_id)
-            .all(),
+        active_dags = (
+            session.query(DM)
+            .filter(DM.is_active == expression.true(), DM.is_paused == expression.false())
+            .all()
         )
+        max_number = conf.getint('scheduler', 'max_dagruns_per_loop_to_schedule', fallback=20)
+        # There's a possibility of exceeding this max_number or going below it as a result of the logic below
+        if len(active_dags) > max_number:
+            max_number = 1
+        elif len(active_dags) > 0:
+            max_number = round(max_number / len(active_dags))
 
-        def _update_state(dag_run):
-            dag_run.state = State.RUNNING
-            dag_run.start_date = timezone.utcnow()
-            expected_start_date = dag.following_schedule(dag_run.execution_date)
-            if expected_start_date:
-                schedule_delay = dag_run.start_date - expected_start_date
-                Stats.timing(
-                    f'dagrun.schedule_delay.{dag.dag_id}',
-                    schedule_delay,
-                )
+        for dag_model in active_dags:
 
-        for dag_run in dag_runs:
-            try:
-                dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
-            except SerializedDagNotFound:
-                self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
-                continue
-            active_runs = active_runs_of_dags[dag_run.dag_id]
-            if dag.max_active_runs and active_runs >= dag.max_active_runs:
-                self.log.debug(
-                    "DAG %s already has %d active runs, not moving any more runs to RUNNING state %s",
-                    dag.dag_id,
-                    active_runs,
-                    dag_run.execution_date,
+            queued_dagruns = DR.get_next_queued_dagruns_for_dag(dag_model.dag_id, session, max_number)
+
+            active_runs_of_dags = defaultdict(
+                lambda: 0,
+                session.query(DagRun.dag_id, func.count('*'))
+                .filter(
+                    DagRun.dag_id == dag_model.dag_id,
+                    DagRun.state == State.RUNNING,
                 )
-            else:
-                active_runs_of_dags[dag_run.dag_id] += 1
-                _update_state(dag_run)
+                .group_by(DagRun.dag_id)
+                .all(),
+            )
+
+            def _update_state(dag_run):
+                dag_run.state = State.RUNNING
+                dag_run.start_date = timezone.utcnow()
+                expected_start_date = dag.following_schedule(dag_run.execution_date)

Review comment:
       `following_schedule` is deprecated so we should avoid using it.







[GitHub] [airflow] ephraimbuddy edited a comment on pull request #17945: Fix max_active_runs not allowing moving of queued dagruns to running

Posted by GitBox <gi...@apache.org>.
ephraimbuddy edited a comment on pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#issuecomment-925327425


   > FWIW can confirm this fixed my issue in #17638, so big thanks! Note I did have an issue after upgrading where the `max_active_runs` columns for some DAGs remained `NULL` even after scheduler parsed them. This seemed to prevent their tasks from getting scheduled. I eventually set it manually in the database and that fixed everything, but not sure if I did something wrong to get in that state in the first place 🤷
   
   Interesting.
   Did you run `airflow db upgrade` after upgrading?
   cc: @kaxil 





[GitHub] [airflow] ephraimbuddy edited a comment on pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
ephraimbuddy edited a comment on pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#issuecomment-911357411


   What happens is that the method `DagRun.next_dagruns_to_examine` gets the earliest dagruns without considering which dag each dagrun belongs to.
   For example:
   Suppose you have a dag with execution_date 2020-01-01 and `catchup=True, max_active_runs=1, schedule_interval='@daily'`, and another dag with execution_date 2021-01-01 that also has `catchup=True, schedule_interval='@daily'`.
   When you unpause the two dags (the one with max_active_runs first), the dagruns would be created, but only one dagrun would be active because of how DagRun.next_dagruns_to_examine works.
   (I'm worried about performance with this PR. Please advise.)





[GitHub] [airflow] uranusjr commented on a change in pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700930108



##########
File path: airflow/models/dagrun.py
##########
@@ -224,6 +224,26 @@ def next_dagruns_to_examine(
             query.limit(max_number), of=cls, session=session, **skip_locked(session=session)
         )
 
+    @classmethod
+    def get_next_queued_dagruns_for_dag(cls, dag_id: str, session: Session, max_number: Optional[int] = None):
+        """Get queued dagruns for a particular dag to examine"""
+        if max_number is None:
+            max_number = cls.DEFAULT_DAGRUNS_TO_EXAMINE
+        query = (
+            session.query(cls)
+            .filter(cls.dag_id == dag_id, cls.state == State.QUEUED)
+            .order_by(
+                nulls_first(cls.last_scheduling_decision, session=session),
+                cls.execution_date,
+            )
+        )
+        if not settings.ALLOW_FUTURE_EXEC_DATES:
+            query = query.filter(DagRun.execution_date <= func.now())
+
+        return with_row_locks(
+            query.limit(max_number), of=cls, session=session, **skip_locked(session=session)
+        )

Review comment:
       How about making this a separate function in `scheduler_job.py`? That would make it testable; my main concern is only about putting this on `DagRun`.
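   If it were moved, the standalone helper might look roughly like this, reusing the query from the diff above (name and placement are assumptions, not the merged code):
   
   ```
   from sqlalchemy import func
   from sqlalchemy.orm import Session
   
   from airflow import settings
   from airflow.models.dagrun import DagRun
   from airflow.utils.sqlalchemy import nulls_first, skip_locked, with_row_locks
   from airflow.utils.state import State
   
   def _queued_dagruns_for_dag(dag_id: str, session: Session, max_number: int):
       """Queued runs of one dag, oldest scheduling decision first, row-locked."""
       query = (
           session.query(DagRun)
           .filter(DagRun.dag_id == dag_id, DagRun.state == State.QUEUED)
           .order_by(
               nulls_first(DagRun.last_scheduling_decision, session=session),
               DagRun.execution_date,
           )
       )
       if not settings.ALLOW_FUTURE_EXEC_DATES:
           query = query.filter(DagRun.execution_date <= func.now())
       return with_row_locks(
           query.limit(max_number), of=DagRun, session=session, **skip_locked(session=session)
       )
   ```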







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17945: Fix max_active_runs not allowing scheduling of other dagruns

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17945:
URL: https://github.com/apache/airflow/pull/17945#discussion_r700978441



##########
File path: airflow/models/dagrun.py
##########
@@ -224,6 +224,26 @@ def next_dagruns_to_examine(
             query.limit(max_number), of=cls, session=session, **skip_locked(session=session)
         )
 
+    @classmethod
+    def get_next_queued_dagruns_for_dag(cls, dag_id: str, session: Session, max_number: Optional[int] = None):
+        """Get queued dagruns for a particular dag to examine"""
+        if max_number is None:
+            max_number = cls.DEFAULT_DAGRUNS_TO_EXAMINE
+        query = (
+            session.query(cls)
+            .filter(cls.dag_id == dag_id, cls.state == State.QUEUED)
+            .order_by(
+                nulls_first(cls.last_scheduling_decision, session=session),
+                cls.execution_date,
+            )
+        )
+        if not settings.ALLOW_FUTURE_EXEC_DATES:
+            query = query.filter(DagRun.execution_date <= func.now())
+
+        return with_row_locks(
+            query.limit(max_number), of=cls, session=session, **skip_locked(session=session)
+        )

Review comment:
       Thanks to @ash and @kaxil, it's a simpler solution now :)







[GitHub] [airflow] kaxil merged pull request #17945: Fix max_active_runs not allowing moving of queued dagruns to running

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #17945:
URL: https://github.com/apache/airflow/pull/17945


   




