Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/10 20:03:58 UTC

[GitHub] [airflow] jedcunningham commented on a change in pull request #19528: Fix missing dagruns when catchup=True

jedcunningham commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r746944369



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2874,6 +2875,66 @@ def test_more_runs_are_not_created_when_max_active_runs_is_reached(self, dag_mak
             == 0
         )
 
+    def test_max_active_runs_creation_phasing(self, dag_maker, session):
+        """
+        Test that when creating runs once max_active_runs is reached that the dags come in the right order

Review comment:
       ```suggestion
           Test that when creating runs once max_active_runs is reached that the runs come in the right order
       ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -913,7 +913,8 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
                     creating_job_id=self.id,
                 )
                 active_runs_of_dags[dag.dag_id] += 1
-            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
+            if self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):
+                dag_model.calculate_dagrun_date_fields(dag, data_interval)

Review comment:
       With this change, I think the name `_update_dag_next_dagruns` is misleading: it now returns a flag and the caller does the date calculation. Either rename it to `_should_update_dag_next_dagruns`, or keep calling `calculate_dagrun_date_fields` from within it and get the interval from `dag.get_run_data_interval(dag_run)`?
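
A rough sketch of the two options, to make the naming point concrete. This is not the Airflow code: `ToyDagModel`, the integer "dates", and both free functions are hypothetical stand-ins, and the update rule is invented for illustration. In the real scheduler the interval would come from `dag.get_run_data_interval(dag_run)` as suggested above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyDagModel:
    """Hypothetical stand-in for DagModel; integers play the role of dates."""

    max_active_runs: int
    next_dagrun: Optional[int] = None
    next_dagrun_create_after: Optional[int] = None

    def calculate_dagrun_date_fields(self, last_interval_end: int) -> None:
        # Toy rule (an assumption): the next run starts where the last one ended.
        self.next_dagrun = last_interval_end
        self.next_dagrun_create_after = last_interval_end + 1


# Option 1: a predicate-style name; the caller performs the update itself.
def should_update_dag_next_dagruns(model: ToyDagModel, active_runs: int) -> bool:
    return active_runs < model.max_active_runs


# Option 2: keep the "update" name and do the update inside the function.
def update_dag_next_dagruns(
    model: ToyDagModel, active_runs: int, last_interval_end: int
) -> None:
    if active_runs >= model.max_active_runs:
        # At the limit: stop scheduling new runs until one finishes.
        model.next_dagrun_create_after = None
        return
    model.calculate_dagrun_date_fields(last_interval_end)


if __name__ == "__main__":
    model = ToyDagModel(max_active_runs=3)

    # Option 1 call site reads as a question followed by an action.
    if should_update_dag_next_dagruns(model, active_runs=2):
        model.calculate_dagrun_date_fields(last_interval_end=10)

    # Option 2 call site is a single command.
    update_dag_next_dagruns(model, active_runs=3, last_interval_end=20)
    print(model)
```

Either shape works; the point is only that a name starting with `_update_` suggests the second shape, while the diff as written has the first.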

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2874,6 +2875,66 @@ def test_more_runs_are_not_created_when_max_active_runs_is_reached(self, dag_mak
             == 0
         )
 
+    def test_max_active_runs_creation_phasing(self, dag_maker, session):
+        """
+        Test that when creating runs once max_active_runs is reached that the dags come in the right order
+        without gaps
+        """
+
+        def complete_one_dagrun():
+            ti = (
+                session.query(TaskInstance)
+                .join(TaskInstance.dag_run)
+                .filter(TaskInstance.state != State.SUCCESS)
+                .order_by(DagRun.execution_date)
+                .first()
+            )
+            if ti:
+                ti.state = State.SUCCESS
+                session.flush()
+
+        with dag_maker(max_active_runs=3, session=session) as dag:
+            # Need to use something that doesn't immediately get marked as success by the scheduler
+            BashOperator(task_id='task', bash_command='true')
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=True)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        DagModel.dags_needing_dagruns(session).all()
+        for _ in range(3):
+            self.scheduler_job._do_scheduling(session)
+
+        model: DagModel = session.query(DagModel).get(dag.dag_id)
+
+        # Pre-condition
+        assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}
+
+        assert model.next_dagrun == timezone.convert_to_utc(
+            timezone.DateTime(
+                2016,
+                1,
+                3,
+            )
+        )
+        assert model.next_dagrun_create_after is None
+
+        complete_one_dagrun()
+
+        assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}, "Test only. XXX Remove me"

Review comment:
       ```suggestion
           assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}
       ```



