You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/15 09:53:06 UTC

[GitHub] [airflow] ashb commented on a change in pull request #16401: Add 'queued' state to DagRun

ashb commented on a change in pull request #16401:
URL: https://github.com/apache/airflow/pull/16401#discussion_r651561908



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1591,89 +1567,85 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
             # are not updated.
             # We opted to check DagRun existence instead
             # of catching an Integrity error and rolling back the session i.e
-            # we need to run self._update_dag_next_dagruns if the Dag Run already exists or if we
+            # we need to set dag.next_dagrun_info if the Dag Run already exists or if we
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
-            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
-                run = dag.create_dagrun(
+            if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+                dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
-                    start_date=timezone.utcnow(),
-                    state=State.RUNNING,
+                    state=State.QUEUED,
                     external_trigger=False,
                     session=session,
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
-
-                expected_start_date = dag.following_schedule(run.execution_date)
-                if expected_start_date:
-                    schedule_delay = run.start_date - expected_start_date
-                    Stats.timing(
-                        f'dagrun.schedule_delay.{dag.dag_id}',
-                        schedule_delay,
-                    )
-
-        self._update_dag_next_dagruns(dag_models, session)
+            dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
+                dag_model.next_dagrun
+            )
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], session: Session) -> None:
+    def _start_dagrun(
+        self,
+        guard,
+        session: Session,
+    ) -> int:
         """
-        Bulk update the next_dagrun and next_dagrun_create_after for all the dags.
-
-        We batch the select queries to get info about all the dags at once
+        Find DagRuns in queued state and decide moving them to running state
+        :param dag_run: The DagRun to schedule

Review comment:
       ```suggestion
   
           :param dag_run: The DagRun to schedule
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1591,89 +1567,85 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
             # are not updated.
             # We opted to check DagRun existence instead
             # of catching an Integrity error and rolling back the session i.e
-            # we need to run self._update_dag_next_dagruns if the Dag Run already exists or if we
+            # we need to set dag.next_dagrun_info if the Dag Run already exists or if we
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
-            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
-                run = dag.create_dagrun(
+            if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+                dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
-                    start_date=timezone.utcnow(),
-                    state=State.RUNNING,
+                    state=State.QUEUED,
                     external_trigger=False,
                     session=session,
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
-
-                expected_start_date = dag.following_schedule(run.execution_date)
-                if expected_start_date:
-                    schedule_delay = run.start_date - expected_start_date
-                    Stats.timing(
-                        f'dagrun.schedule_delay.{dag.dag_id}',
-                        schedule_delay,
-                    )
-
-        self._update_dag_next_dagruns(dag_models, session)
+            dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
+                dag_model.next_dagrun
+            )
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], session: Session) -> None:
+    def _start_dagrun(

Review comment:
       ```suggestion
       def _start_queued_dagruns(
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1591,89 +1567,85 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
             # are not updated.
             # We opted to check DagRun existence instead
             # of catching an Integrity error and rolling back the session i.e
-            # we need to run self._update_dag_next_dagruns if the Dag Run already exists or if we
+            # we need to set dag.next_dagrun_info if the Dag Run already exists or if we
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
-            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
-                run = dag.create_dagrun(
+            if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+                dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
-                    start_date=timezone.utcnow(),
-                    state=State.RUNNING,
+                    state=State.QUEUED,
                     external_trigger=False,
                     session=session,
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
-
-                expected_start_date = dag.following_schedule(run.execution_date)
-                if expected_start_date:
-                    schedule_delay = run.start_date - expected_start_date
-                    Stats.timing(
-                        f'dagrun.schedule_delay.{dag.dag_id}',
-                        schedule_delay,
-                    )
-
-        self._update_dag_next_dagruns(dag_models, session)
+            dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
+                dag_model.next_dagrun
+            )
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], session: Session) -> None:
+    def _start_dagrun(
+        self,
+        guard,
+        session: Session,
+    ) -> int:
         """
-        Bulk update the next_dagrun and next_dagrun_create_after for all the dags.
-
-        We batch the select queries to get info about all the dags at once
+        Find DagRuns in queued state and decide moving them to running state
+        :param dag_run: The DagRun to schedule
         """
-        # Check max_active_runs, to see if we are _now_ at the limit for any of
-        # these dag? (we've just created a DagRun for them after all)
-        active_runs_of_dags = dict(
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(
-                DagRun.dag_id.in_([o.dag_id for o in dag_models]),
-                DagRun.state == State.RUNNING,  # pylint: disable=comparison-with-callable
-                DagRun.external_trigger == expression.false(),
-            )
-            .group_by(DagRun.dag_id)
-            .all()
-        )
+        dag_runs = self._get_next_queued_dagruns_to_examine(session)
+
+        def _update_state(dag_run):
+            dag_run.state = State.RUNNING
+            dag_run.start_date = timezone.utcnow()
+            expected_start_date = dag.following_schedule(dag_run.execution_date)
+            if expected_start_date:
+                schedule_delay = dag_run.start_date - expected_start_date
+                Stats.timing(
+                    f'dagrun.schedule_delay.{dag.dag_id}',
+                    schedule_delay,
+                )
+            # commit the session - Release the write lock on DagModel table.
+            # Commit one dag_run at a time
 
-        for dag_model in dag_models:
-            # Get the DAG in a try_except to not stop the Scheduler when a Serialized DAG is not found
-            # This takes care of Dynamic DAGs especially
+            guard.commit()
+            # END: create dagruns

Review comment:
       Additionally: let's not commit inside this function, but only outside -- the general rule is that if a function accepts a session object it is the caller's responsibility to commit.
   
   This means we also don't have to pass the `guard` in here.

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1454,39 +1453,11 @@ def _do_scheduling(self, session) -> int:
             if settings.USE_JOB_SCHEDULE:
                 self._create_dagruns_for_dags(guard, session)
 
-            dag_runs = self._get_next_dagruns_to_examine(session)
+            self._start_dagrun(guard, session)

Review comment:
       ```suggestion
               self._start_queued_dagruns(session)
               guard.commit()
   ```
   
   (as per my other comments)

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1591,89 +1567,85 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
             # are not updated.
             # We opted to check DagRun existence instead
             # of catching an Integrity error and rolling back the session i.e
-            # we need to run self._update_dag_next_dagruns if the Dag Run already exists or if we
+            # we need to set dag.next_dagrun_info if the Dag Run already exists or if we
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
-            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
-                run = dag.create_dagrun(
+            if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+                dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
-                    start_date=timezone.utcnow(),
-                    state=State.RUNNING,
+                    state=State.QUEUED,
                     external_trigger=False,
                     session=session,
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
-
-                expected_start_date = dag.following_schedule(run.execution_date)
-                if expected_start_date:
-                    schedule_delay = run.start_date - expected_start_date
-                    Stats.timing(
-                        f'dagrun.schedule_delay.{dag.dag_id}',
-                        schedule_delay,
-                    )
-
-        self._update_dag_next_dagruns(dag_models, session)
+            dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
+                dag_model.next_dagrun
+            )
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], session: Session) -> None:
+    def _start_dagrun(
+        self,
+        guard,
+        session: Session,
+    ) -> int:
         """
-        Bulk update the next_dagrun and next_dagrun_create_after for all the dags.
-
-        We batch the select queries to get info about all the dags at once
+        Find DagRuns in queued state and decide moving them to running state
+        :param dag_run: The DagRun to schedule
         """
-        # Check max_active_runs, to see if we are _now_ at the limit for any of
-        # these dag? (we've just created a DagRun for them after all)
-        active_runs_of_dags = dict(
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(
-                DagRun.dag_id.in_([o.dag_id for o in dag_models]),
-                DagRun.state == State.RUNNING,  # pylint: disable=comparison-with-callable
-                DagRun.external_trigger == expression.false(),
-            )
-            .group_by(DagRun.dag_id)
-            .all()
-        )
+        dag_runs = self._get_next_queued_dagruns_to_examine(session)
+
+        def _update_state(dag_run):
+            dag_run.state = State.RUNNING
+            dag_run.start_date = timezone.utcnow()
+            expected_start_date = dag.following_schedule(dag_run.execution_date)
+            if expected_start_date:
+                schedule_delay = dag_run.start_date - expected_start_date
+                Stats.timing(
+                    f'dagrun.schedule_delay.{dag.dag_id}',
+                    schedule_delay,
+                )
+            # commit the session - Release the write lock on DagModel table.
+            # Commit one dag_run at a time
 
-        for dag_model in dag_models:
-            # Get the DAG in a try_except to not stop the Scheduler when a Serialized DAG is not found
-            # This takes care of Dynamic DAGs especially
+            guard.commit()
+            # END: create dagruns

Review comment:
       This is too early -- it releases the lock after the first time it's called, but we want to keep the lock for the _entire_ `for dag_run in dag_runs` loop.

##########
File path: airflow/models/dagrun.py
##########
@@ -228,6 +228,51 @@ def next_dagruns_to_examine(
             query.limit(max_number), of=cls, session=session, **skip_locked(session=session)
         )
 
+    @classmethod
+    def next_queued_dagruns_to_examine(

Review comment:
       This is almost identical to the previous function -- rather than a new function let's keep it called `next_dagruns_to_examine` but add a new required `state` parameter.

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1591,89 +1567,85 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
             # are not updated.
             # We opted to check DagRun existence instead
             # of catching an Integrity error and rolling back the session i.e
-            # we need to run self._update_dag_next_dagruns if the Dag Run already exists or if we
+            # we need to set dag.next_dagrun_info if the Dag Run already exists or if we
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
-            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
-                run = dag.create_dagrun(
+            if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+                dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
-                    start_date=timezone.utcnow(),
-                    state=State.RUNNING,
+                    state=State.QUEUED,
                     external_trigger=False,
                     session=session,
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
-
-                expected_start_date = dag.following_schedule(run.execution_date)
-                if expected_start_date:
-                    schedule_delay = run.start_date - expected_start_date
-                    Stats.timing(
-                        f'dagrun.schedule_delay.{dag.dag_id}',
-                        schedule_delay,
-                    )
-
-        self._update_dag_next_dagruns(dag_models, session)
+            dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
+                dag_model.next_dagrun
+            )
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], session: Session) -> None:
+    def _start_dagrun(
+        self,
+        guard,
+        session: Session,
+    ) -> int:
         """
-        Bulk update the next_dagrun and next_dagrun_create_after for all the dags.
-
-        We batch the select queries to get info about all the dags at once
+        Find DagRuns in queued state and decide moving them to running state
+        :param dag_run: The DagRun to schedule
         """
-        # Check max_active_runs, to see if we are _now_ at the limit for any of
-        # these dag? (we've just created a DagRun for them after all)
-        active_runs_of_dags = dict(
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(
-                DagRun.dag_id.in_([o.dag_id for o in dag_models]),
-                DagRun.state == State.RUNNING,  # pylint: disable=comparison-with-callable
-                DagRun.external_trigger == expression.false(),
-            )
-            .group_by(DagRun.dag_id)
-            .all()
-        )
+        dag_runs = self._get_next_queued_dagruns_to_examine(session)
+
+        def _update_state(dag_run):
+            dag_run.state = State.RUNNING
+            dag_run.start_date = timezone.utcnow()
+            expected_start_date = dag.following_schedule(dag_run.execution_date)
+            if expected_start_date:
+                schedule_delay = dag_run.start_date - expected_start_date
+                Stats.timing(
+                    f'dagrun.schedule_delay.{dag.dag_id}',
+                    schedule_delay,
+                )
+            # commit the session - Release the write lock on DagModel table.
+            # Commit one dag_run at a time
 
-        for dag_model in dag_models:
-            # Get the DAG in a try_except to not stop the Scheduler when a Serialized DAG is not found
-            # This takes care of Dynamic DAGs especially
+            guard.commit()
+            # END: create dagruns
+
+        for dag_run in dag_runs:
             try:
-                dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
+                dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
             except SerializedDagNotFound:
-                self.log.exception("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
+                self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
                 continue
-            active_runs_of_dag = active_runs_of_dags.get(dag.dag_id, 0)
-            if dag.max_active_runs and active_runs_of_dag >= dag.max_active_runs:
-                self.log.info(
-                    "DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
+            # pylint: disable=comparison-with-callable
+            total_active_dagruns = (
+                session.query(func.count(DR.dag_id))
+                .filter(DR.state == State.RUNNING, DR.dag_id == dag.dag_id)
+                .scalar()
+            )

Review comment:
       This does one query per iteration of the dag run loop -- which is "expensive". Instead we should have the same query we had before (`active_runs_of_dags`) and then add 1 to the value in the dict when we start a new dag run.

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -532,7 +535,7 @@ class TestGetDagRunsEndDateFilters(TestDagRunEndpoint):
             (
                 f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_lte="
                 f"{(timezone.utcnow() + timedelta(days=1)).isoformat()}",
-                ["TEST_DAG_RUN_ID_1"],
+                ["TEST_DAG_RUN_ID_1", "TEST_DAG_RUN_ID_2"],

Review comment:
       Hmmm, why did these tests need changing? I would have expected them to pass as they were.

##########
File path: airflow/www/views.py
##########
@@ -3889,7 +3889,7 @@ def duration_f(self):
         lazy_gettext('Clear'),
         lazy_gettext(
             'Are you sure you want to clear the state of the selected task'
-            ' instance(s) and set their dagruns to the running state?'
+            ' instance(s) and set their dagruns to the NONE state?'

Review comment:
       We should set it to the queued state, no?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org