Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/10 19:57:07 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #19528: Fix missing dagruns when catchup=True

ephraimbuddy opened a new pull request #19528:
URL: https://github.com/apache/airflow/pull/19528


   Closes: https://github.com/apache/airflow/issues/19461
   There's a bug where run dates can be skipped once max_active_runs is reached.
   This PR fixes it.
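To illustrate what "run dates could skip" means in practice, here is a small standalone sketch (not Airflow code) that reports which logical dates are missing from a list of runs. The helper name `find_missing_run_dates` and the sample dates are made up for illustration, and the sketch assumes a fixed-interval schedule.

```python
from datetime import datetime, timedelta
from typing import List


def find_missing_run_dates(run_dates: List[datetime], interval: timedelta) -> List[datetime]:
    """Return the dates a gap-free fixed-interval schedule would contain but run_dates lacks."""
    ordered = sorted(run_dates)
    missing = []
    for previous, current in zip(ordered, ordered[1:]):
        expected = previous + interval
        # Walk forward one interval at a time until we reach the next existing run.
        while expected < current:
            missing.append(expected)
            expected += interval
    return missing


# Hypothetical daily runs in which 2016-01-04 was skipped.
runs = [datetime(2016, 1, day) for day in (1, 2, 3, 5, 6)]
skipped = find_missing_run_dates(runs, timedelta(days=1))
```

With a correct scheduler the returned list should be empty, whatever the value of max_active_runs.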
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] massamany commented on pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
massamany commented on pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#issuecomment-977753237


   Hi,
   
   I'm not 100% sure, but I think that I'm experiencing a bug related to this issue.
   
   I currently use Airflow 2.2.0 to schedule several DAGs, one of which is scheduled to run every 2 minutes.
   In this DAG, I'm using the latest_only operator with catchup enabled.
   The start date of the DAG is computed before launching Airflow and is set to the start of the previous scheduling period.
   
   Description of my reproduction case :
   - At startup, the database is empty.
   - The DAG is paused at first launch by default.
   - After Airflow is up, I wait several minutes and then activate the DAG.
   
   
   In 2.2.0:
   - Airflow is up at 11:11.
   - The start date is computed to be 11:08, as expected.
   - When the DAG is activated at 11:18, the missed executions are launched but skipped thanks to latest_only.
   - The last execution runs OK.
   - The DAG is then correctly scheduled every 2 minutes.
   - Conclusion: everything is OK.
   
   See screenshots of the DAG details:
   ![image](https://user-images.githubusercontent.com/28742025/143223637-6f6ef17d-0000-4130-8b22-e53d766e731f.png)
   
   See screenshots of the Tree view after several minutes (last execution at 11:22 for the scheduling period [11:20-11:22]) :
   ![image](https://user-images.githubusercontent.com/28742025/143223726-94bfd3e1-1c69-4e7e-aa48-8c25b6204791.png)
   
   
   In 2.2.1 and 2.2.2:
   - Airflow is up at 11:24.
   - The start date is computed to be 11:22, as expected.
   - When the DAG is activated at 11:32, the first missed execution (for 11:22) is launched but skipped thanks to latest_only.
   - Two minutes later (at 11:34), the second execution (for 11:24) is launched but skipped thanks to latest_only.
   - The DAG is then stuck in this state: the "next run" field still indicates 11:24 and no further execution is ever scheduled.
   - Conclusion: maybe I'm doing something wrong, but it seems to me that there might be a bug here related to this issue.
   
   See screenshots of the DAG details:
   ![image](https://user-images.githubusercontent.com/28742025/143223773-eb6db141-c15b-4938-9215-dd67d71a28ab.png)
   
   See screenshots of the full Tree view screen after several minutes (taken at 11:41) :
   ![image](https://user-images.githubusercontent.com/28742025/143223802-b181e212-f231-45f5-9e61-760aeaa2c482.png)
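The timings in the two scenarios above can be checked with a short standalone sketch. It is not Airflow code: the function names, the calendar date, and the assumption that periods are aligned to midnight are all illustrative. It computes the start of the previous scheduling period and lists the completed intervals a catchup would be expected to cover by the time the DAG is activated.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def previous_period_start(now: datetime, period: timedelta) -> datetime:
    """Start of the scheduling period immediately before the one containing `now`."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    # Floor `now` to a period boundary (assumes periods are aligned to midnight).
    current_start = midnight + ((now - midnight) // period) * period
    return current_start - period


def completed_intervals(
    start: datetime, activated_at: datetime, period: timedelta
) -> List[Tuple[datetime, datetime]]:
    """Intervals that have fully elapsed between `start` and `activated_at`."""
    intervals = []
    begin = start
    while begin + period <= activated_at:
        intervals.append((begin, begin + period))
        begin += period
    return intervals


period = timedelta(minutes=2)
day = datetime(2021, 11, 24)  # arbitrary date, chosen only for the example

start_220 = previous_period_start(day.replace(hour=11, minute=11), period)
start_221 = previous_period_start(day.replace(hour=11, minute=24), period)
expected_221 = completed_intervals(start_221, day.replace(hour=11, minute=32), period)
```

Under these assumptions the second scenario has five completed intervals at activation time (11:22 through 11:30), whereas only the runs for 11:22 and 11:24 were observed.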
   





[GitHub] [airflow] jedcunningham commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r747005296



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -913,26 +913,23 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
                     creating_job_id=self.id,
                 )
                 active_runs_of_dags[dag.dag_id] += 1
-            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
+            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):

Review comment:
       I still wonder if having `data_interval` as an optional kwarg to `_update_dag_next_dagruns` would keep this cleaner. Then this case can pass it, the others that don't can get it from `get_run_data_interval`, and we don't have to add an extra conditional and call to `calculate_dagrun_date_fields` in more than 1 place.
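A rough standalone sketch of the shape suggested here, using toy stand-ins instead of the real Airflow classes. `ToyDagModel`, `update_dag_next_dagruns`, and `lookup_interval` are hypothetical names, the date arithmetic is simplified, and this is not the code that was merged. The point is only that one function handles both the max_active_runs check and the date update, with `data_interval` passed in when the caller already has it.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Optional, Tuple

Interval = Tuple[datetime, datetime]


@dataclass
class ToyDagModel:
    """Hypothetical stand-in for DagModel with only the fields this sketch needs."""

    max_active_runs: int
    next_dagrun: Optional[datetime] = None
    next_dagrun_create_after: Optional[datetime] = None

    def calculate_dagrun_date_fields(self, last_interval: Interval) -> None:
        # Toy rule: the next run covers the interval right after `last_interval`.
        start, end = last_interval
        self.next_dagrun = end
        self.next_dagrun_create_after = end + (end - start)


def update_dag_next_dagruns(
    dag_model: ToyDagModel,
    total_active_runs: int,
    lookup_interval: Callable[[], Interval],
    data_interval: Optional[Interval] = None,
) -> None:
    """Single entry point: use the caller's data_interval, or look it up if omitted."""
    if total_active_runs >= dag_model.max_active_runs:
        # At the limit: stop creating runs but leave next_dagrun untouched.
        dag_model.next_dagrun_create_after = None
        return
    if data_interval is None:
        data_interval = lookup_interval()
    dag_model.calculate_dagrun_date_fields(data_interval)


model = ToyDagModel(max_active_runs=3)
just_created = (datetime(2016, 1, 1), datetime(2016, 1, 2))
# Caller that has just created a run passes its interval explicitly.
update_dag_next_dagruns(model, 1, lookup_interval=lambda: just_created, data_interval=just_created)
```

Callers that do not have an interval at hand would omit `data_interval` and rely on the lookup, so the conditional lives in one place.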







[GitHub] [airflow] kaxil commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r747123879



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2874,6 +2874,66 @@ def test_more_runs_are_not_created_when_max_active_runs_is_reached(self, dag_mak
             == 0
         )
 
+    def test_max_active_runs_creation_phasing(self, dag_maker, session):
+        """
+        Test that when creating runs once max_active_runs is reached that the runs come in the right order
+        without gaps
+        """
+
+        def complete_one_dagrun():
+            ti = (
+                session.query(TaskInstance)
+                .join(TaskInstance.dag_run)
+                .filter(TaskInstance.state != State.SUCCESS)
+                .order_by(DagRun.execution_date)
+                .first()
+            )
+            if ti:
+                ti.state = State.SUCCESS
+                session.flush()
+
+        with dag_maker(max_active_runs=3, session=session) as dag:
+            # Need to use something that doesn't immediately get marked as success by the scheduler
+            BashOperator(task_id='task', bash_command='true')
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=True)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        DagModel.dags_needing_dagruns(session).all()
+        for _ in range(3):
+            self.scheduler_job._do_scheduling(session)
+
+        model: DagModel = session.query(DagModel).get(dag.dag_id)
+
+        # Pre-condition
+        assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}
+
+        assert model.next_dagrun == timezone.convert_to_utc(
+            timezone.DateTime(
+                2016,
+                1,
+                3,
+            )
+        )
+        assert model.next_dagrun_create_after is None
+
+        complete_one_dagrun()
+
+        assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}
+
+        for _ in range(5):
+            self.scheduler_job._do_scheduling(session)
+            complete_one_dagrun()
+            print(session.query(DagRun).order_by(DagRun.execution_date).all())

Review comment:
       ```suggestion
   ```







[GitHub] [airflow] ashb commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r746969082



##########
File path: airflow/models/dag.py
##########
@@ -630,6 +630,7 @@ def get_next_data_interval(self, dag_model: "DagModel") -> Optional[DataInterval
         data_interval = dag_model.next_dagrun_data_interval
         if data_interval is not None:
             return data_interval
+        self.log.warning("%s needed to infer interval", self.dag_id)

Review comment:
       This was just for my debugging/testing, and I don't think we need it here.
   
   It can also stay if we think it's useful.







[GitHub] [airflow] kaxil merged pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #19528:
URL: https://github.com/apache/airflow/pull/19528


   





[GitHub] [airflow] jedcunningham commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r747010338



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -913,26 +913,23 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
                     creating_job_id=self.id,
                 )
                 active_runs_of_dags[dag.dag_id] += 1
-            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
+            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):

Review comment:
       (I don't have a strong preference here though)







[GitHub] [airflow] kaxil commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r746975591



##########
File path: airflow/models/dag.py
##########
@@ -630,6 +630,7 @@ def get_next_data_interval(self, dag_model: "DagModel") -> Optional[DataInterval
         data_interval = dag_model.next_dagrun_data_interval
         if data_interval is not None:
             return data_interval
+        self.log.warning("%s needed to infer interval", self.dag_id)

Review comment:
       The message does not make sense.
   
   "'example_bash_operator' needed to infer interval"
   
   ```suggestion
   ```

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2874,6 +2875,66 @@ def test_more_runs_are_not_created_when_max_active_runs_is_reached(self, dag_mak
             == 0
         )
 
+    def test_max_active_runs_creation_phasing(self, dag_maker, session):
+        """
+        Test that when creating runs once max_active_runs is reached that the dags come in the right order
+        without gaps
+        """
+
+        def complete_one_dagrun():
+            ti = (
+                session.query(TaskInstance)
+                .join(TaskInstance.dag_run)
+                .filter(TaskInstance.state != State.SUCCESS)
+                .order_by(DagRun.execution_date)
+                .first()
+            )
+            if ti:
+                ti.state = State.SUCCESS
+                session.flush()
+
+        with dag_maker(max_active_runs=3, session=session) as dag:
+            # Need to use something that doesn't immediately get marked as success by the scheduler
+            BashOperator(task_id='task', bash_command='true')
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=True)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        DagModel.dags_needing_dagruns(session).all()
+        for _ in range(3):
+            self.scheduler_job._do_scheduling(session)
+
+        model: DagModel = session.query(DagModel).get(dag.dag_id)
+
+        # Pre-condition
+        assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}
+
+        assert model.next_dagrun == timezone.convert_to_utc(
+            timezone.DateTime(
+                2016,
+                1,
+                3,
+            )
+        )
+        assert model.next_dagrun_create_after is None
+
+        complete_one_dagrun()
+
+        assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}, "Test only. XXX Remove me"

Review comment:
       @ephraimbuddy  ^^

##########
File path: airflow/models/dag.py
##########
@@ -630,6 +630,7 @@ def get_next_data_interval(self, dag_model: "DagModel") -> Optional[DataInterval
         data_interval = dag_model.next_dagrun_data_interval
         if data_interval is not None:
             return data_interval
+        self.log.warning("%s needed to infer interval", self.dag_id)

Review comment:
       If, however, we want to add it, we should probably add it in `infer_automated_data_interval`, with something like:
   
   "Inferring Data Interval for self.dag_id"

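As a hedged illustration of the kind of message proposed above, here is a minimal sketch using only the standard `logging` module. The logger name, the function name, the exact wording, and the INFO level are assumptions made for the example, not what Airflow does.

```python
import logging

log = logging.getLogger("example.dag")  # hypothetical logger name


def log_interval_inference(dag_id: str) -> None:
    """Emit a message that names the DAG whose data interval is being inferred."""
    # %-style arguments are formatted lazily, only if the record is actually emitted.
    log.info("Inferring data interval for DAG %s", dag_id)
```

Unlike "'example_bash_operator' needed to infer interval", this wording states what is happening and to which DAG.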






[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r746945206



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1274,7 +1274,8 @@ def test_dagrun_timeout_fails_run_and_update_next_dagrun(self, dag_maker):
         session.flush()
         session.refresh(dr)
         assert dr.state == State.FAILED
-        # check that next_dagrun has been updated by Schedulerjob._update_dag_next_dagruns
+        self.scheduler_job._do_scheduling(session)

Review comment:
       I'm not sure whether this test is correct. The assert below was failing, which is why I made the change here. It could be another bug, so please review.







[GitHub] [airflow] massamany commented on pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
massamany commented on pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#issuecomment-977861481


   Hi. The start date is computed before startup and never changes afterwards.
   In fact, the DAG Python file is generated, and the start date is computed at generation time.
   So we can consider it static from Airflow's point of view.





[GitHub] [airflow] jedcunningham commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r746944369



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2874,6 +2875,66 @@ def test_more_runs_are_not_created_when_max_active_runs_is_reached(self, dag_mak
             == 0
         )
 
+    def test_max_active_runs_creation_phasing(self, dag_maker, session):
+        """
+        Test that when creating runs once max_active_runs is reached that the dags come in the right order

Review comment:
       ```suggestion
           Test that when creating runs once max_active_runs is reached that the runs come in the right order
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -913,7 +913,8 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
                     creating_job_id=self.id,
                 )
                 active_runs_of_dags[dag.dag_id] += 1
-            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
+            if self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):
+                dag_model.calculate_dagrun_date_fields(dag, data_interval)

Review comment:
       With this change I think the name `_update_dag_next_dagruns` is misleading. Either rename to `_should_update_dag_next_dagruns`, or keep calling `calculate_dagrun_date_fields` and get the interval from `dag.get_run_data_interval(dag_run)`?

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2874,6 +2875,66 @@ def test_more_runs_are_not_created_when_max_active_runs_is_reached(self, dag_mak
             == 0
         )
 
+    def test_max_active_runs_creation_phasing(self, dag_maker, session):
+        """
+        Test that when creating runs once max_active_runs is reached that the dags come in the right order
+        without gaps
+        """
+
+        def complete_one_dagrun():
+            ti = (
+                session.query(TaskInstance)
+                .join(TaskInstance.dag_run)
+                .filter(TaskInstance.state != State.SUCCESS)
+                .order_by(DagRun.execution_date)
+                .first()
+            )
+            if ti:
+                ti.state = State.SUCCESS
+                session.flush()
+
+        with dag_maker(max_active_runs=3, session=session) as dag:
+            # Need to use something that doesn't immediately get marked as success by the scheduler
+            BashOperator(task_id='task', bash_command='true')
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=True)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        DagModel.dags_needing_dagruns(session).all()
+        for _ in range(3):
+            self.scheduler_job._do_scheduling(session)
+
+        model: DagModel = session.query(DagModel).get(dag.dag_id)
+
+        # Pre-condition
+        assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}
+
+        assert model.next_dagrun == timezone.convert_to_utc(
+            timezone.DateTime(
+                2016,
+                1,
+                3,
+            )
+        )
+        assert model.next_dagrun_create_after is None
+
+        complete_one_dagrun()
+
+        assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}, "Test only. XXX Remove me"

Review comment:
       ```suggestion
           assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}
   ```







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r747250661



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -913,26 +913,23 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
                     creating_job_id=self.id,
                 )
                 active_runs_of_dags[dag.dag_id] += 1
-            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
+            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):
+                dag_model.calculate_dagrun_date_fields(dag, data_interval)
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> None:
-        """
-        Update the next_dagrun, next_dagrun_data_interval_start/end
-        and next_dagrun_create_after for this dag.
-        """
-        if total_active_runs >= dag_model.max_active_runs:
+    def _should_update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> bool:
+        """Check if the dag's next_dagruns_create_after should be updated."""
+        if total_active_runs >= dag.max_active_runs:
             self.log.info(
                 "DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
                 dag_model.dag_id,
                 total_active_runs,
-                dag_model.max_active_runs,
+                dag.max_active_runs,

Review comment:
       It's been worked on at https://github.com/apache/airflow/pull/19367. I decided to update it here as well, since the other PR deals with some other things and is not included in this release.







[GitHub] [airflow] massamany commented on pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
massamany commented on pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#issuecomment-977917817


   If you need the airflow config file I can provide it.





[GitHub] [airflow] jedcunningham commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r746971960



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -913,26 +913,26 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
                     creating_job_id=self.id,
                 )
                 active_runs_of_dags[dag.dag_id] += 1
-            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
+            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):
+                dag_model.calculate_dagrun_date_fields(dag, data_interval)
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> None:
+    def _should_update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> None:

Review comment:
       ```suggestion
       def _should_update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> bool:
   ```







[GitHub] [airflow] kaxil commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r747133986



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -913,26 +913,23 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
                     creating_job_id=self.id,
                 )
                 active_runs_of_dags[dag.dag_id] += 1
-            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
+            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):

Review comment:
       No strong preference either, but I might merge it tomorrow to cut rc2, and we can refactor afterwards if need be.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r746974506



##########
File path: airflow/models/dag.py
##########
@@ -630,6 +630,7 @@ def get_next_data_interval(self, dag_model: "DagModel") -> Optional[DataInterval
         data_interval = dag_model.next_dagrun_data_interval
         if data_interval is not None:
             return data_interval
+        self.log.warning("%s needed to infer interval", self.dag_id)

Review comment:
       Happy to remove it then!







[GitHub] [airflow] uranusjr commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r747240771



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -913,26 +913,23 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
                     creating_job_id=self.id,
                 )
                 active_runs_of_dags[dag.dag_id] += 1
-            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
+            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):
+                dag_model.calculate_dagrun_date_fields(dag, data_interval)
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> None:
-        """
-        Update the next_dagrun, next_dagrun_data_interval_start/end
-        and next_dagrun_create_after for this dag.
-        """
-        if total_active_runs >= dag_model.max_active_runs:
+    def _should_update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> bool:
+        """Check if the dag's next_dagruns_create_after should be updated."""
+        if total_active_runs >= dag.max_active_runs:
             self.log.info(
                 "DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
                 dag_model.dag_id,
                 total_active_runs,
-                dag_model.max_active_runs,
+                dag.max_active_runs,

Review comment:
       I seem to recall there’s another issue on why `max_active_runs` (or something else) does not agree on `DAG` and `DagModel`. Is this change related to that? Did we ever figure out why those disagree?







[GitHub] [airflow] github-actions[bot] commented on pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#issuecomment-965897753


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r746957754



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -913,7 +913,8 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
                     creating_job_id=self.id,
                 )
                 active_runs_of_dags[dag.dag_id] += 1
-            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
+            if self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):
+                dag_model.calculate_dagrun_date_fields(dag, data_interval)

Review comment:
       I will go with renaming, because calling `calculate_dagrun_date_fields` alone won't handle `max_active_runs` for the dag.







[GitHub] [airflow] jedcunningham commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r747009307



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -913,26 +913,23 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
                     creating_job_id=self.id,
                 )
                 active_runs_of_dags[dag.dag_id] += 1
-            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
+            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):

Review comment:
       e.g:
   ```
   --- a/airflow/jobs/scheduler_job.py
   +++ b/airflow/jobs/scheduler_job.py
   @@ -913,11 +913,11 @@ class SchedulerJob(BaseJob):
                        creating_job_id=self.id,
                    )
                    active_runs_of_dags[dag.dag_id] += 1
   -            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
   +            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id], data_interval)
            # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
            # memory for larger dags? or expunge_all()
    
   -    def _update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> None:
   +    def _update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs, data_interval = None) -> None:
            """
            Update the next_dagrun, next_dagrun_data_interval_start/end
            and next_dagrun_create_after for this dag.
   @@ -931,7 +931,8 @@ class SchedulerJob(BaseJob):
                )
                dag_model.next_dagrun_create_after = None
            else:
   -            data_interval = dag.get_next_data_interval(dag_model)
   +            if not data_interval:
   +                data_interval = dag.get_next_data_interval(dag_model)
                dag_model.calculate_dagrun_date_fields(dag, data_interval)
   ```
   
   Probably safer with a sentinel instead of None, but I think it'd be okay.
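
To illustrate the sentinel remark: a minimal sketch, not Airflow code, of a default value that can tell "argument not supplied" apart from an explicit `None`. The names `_NOT_SET` and `resolve_data_interval` are hypothetical and exist only for this example. A check such as `if not data_interval:` would treat `None` and any other falsy value as "not supplied", which is the ambiguity a sentinel avoids.

```python
from typing import Any

# Hypothetical module-level sentinel: a unique object that no caller
# can pass by accident, unlike None.
_NOT_SET: Any = object()


def resolve_data_interval(data_interval: Any = _NOT_SET) -> str:
    """Report how the argument was supplied (illustrative helper only)."""
    if data_interval is _NOT_SET:
        # The caller did not pass anything, so the function is free to
        # compute a value itself.
        return "not supplied: compute the next interval"
    if data_interval is None:
        # The caller explicitly passed None, which may carry its own meaning.
        return "explicit None: caller says there is no interval"
    return f"use supplied interval: {data_interval!r}"
```

The identity check (`is _NOT_SET`) is what makes the three cases distinguishable; whether the extra case matters here depends on whether `None` can ever be a legitimate data interval at the call site.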







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r747235478



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -913,26 +913,23 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
                     creating_job_id=self.id,
                 )
                 active_runs_of_dags[dag.dag_id] += 1
-            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
+            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):

Review comment:
       This will still have the missing dagruns issue, because it calls `dag.get_next_data_interval` after a run has finished, and that skips a run. Taking the latest dagrun when a run has just finished and calling `dag.get_run_data_interval` on it resolves the issue. `get_next_data_interval` gives us the *next* dagrun's data interval, whereas `get_run_data_interval`, when given the most recent dagrun, returns that dagrun's data interval, from which the DagRunInfo is set correctly, in particular the `next_dagrun_create_after` for the dag. That way, the nullified `dag_model.next_dagrun_create_after` is set properly again.
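
The skip described in this comment can be pictured with a toy model in plain Python. This is only a sketch of the reasoning as stated above, not Airflow's implementation: `Interval`, `STEP` and `next_interval_after` are hypothetical names, and the fixed two-minute step is an assumption made for the example.

```python
from datetime import datetime, timedelta
from typing import NamedTuple


class Interval(NamedTuple):
    """A data interval [start, end) in the toy model."""
    start: datetime
    end: datetime


STEP = timedelta(minutes=2)  # assumed fixed schedule, for illustration only


def next_interval_after(last: Interval) -> Interval:
    """Return the interval that immediately follows `last`."""
    return Interval(last.end, last.end + STEP)


t0 = datetime(2021, 11, 24, 14, 0)
latest_run = Interval(t0, t0 + STEP)            # interval of the most recent run
stored_next = next_interval_after(latest_run)   # interval already stored as "next"

# Advancing from the stored "next" interval moves two steps past the latest
# run, so the run for `stored_next` is never created.
skipping = next_interval_after(stored_next)

# Advancing from the latest run's own interval moves exactly one step.
correct = next_interval_after(latest_run)
```

In this model `skipping` starts one full step later than `correct`, which corresponds to the single missing run per stall that the comment attributes to feeding the "next" interval back into the calculation.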







[GitHub] [airflow] kaxil commented on a change in pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r746974403



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -913,26 +913,26 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
                     creating_job_id=self.id,
                 )
                 active_runs_of_dags[dag.dag_id] += 1
-            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
+            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):
+                dag_model.calculate_dagrun_date_fields(dag, data_interval)
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> None:
+    def _should_update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> None:

Review comment:
       The docstring needs updating too in that case.







[GitHub] [airflow] ephraimbuddy commented on pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#issuecomment-977833937


   A dynamic start_date is discouraged and can cause unexpected behaviour. Can you switch to a static start_date and check whether the issue still occurs?
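
A small sketch of why a dynamic start_date is fragile, using only the standard library. It assumes the DAG file is re-evaluated repeatedly, so a value derived from the current time differs on each evaluation; `dynamic_start` and the timestamps are hypothetical and stand in for something like "now minus 10 minutes".

```python
from datetime import datetime, timedelta, timezone

# Static start_date: the same value on every evaluation of the DAG file.
STATIC_START = datetime(2021, 11, 24, 14, 0, tzinfo=timezone.utc)


def dynamic_start(parse_time: datetime) -> datetime:
    """Hypothetical helper mimicking `start_date = now - 10 minutes`."""
    return parse_time - timedelta(minutes=10)


# Two simulated evaluations of the same file, 30 seconds apart.
first_parse = datetime(2021, 11, 24, 14, 20, 0, tzinfo=timezone.utc)
second_parse = first_parse + timedelta(seconds=30)

# The dynamic value drifts by exactly the time between evaluations,
# while STATIC_START stays fixed.
drift = dynamic_start(second_parse) - dynamic_start(first_parse)
```

Because the schedule is anchored on the start date, a value that moves between evaluations gives the scheduler a moving reference point, which is the kind of unexpected behaviour the comment warns about.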





[GitHub] [airflow] massamany commented on pull request #19528: Fix missing dagruns when catchup=True

Posted by GitBox <gi...@apache.org>.
massamany commented on pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#issuecomment-977917530


   To keep things simple, I reproduced the problem with the very simple DAG below.
   The start date is static and should be set to, for instance, 10 minutes before startup.
   Everything works fine on Airflow 2.2.0, but scheduling is blocked on 2.2.2.
   
   test_dag.py:
   ```
   import dateutil.parser
   from airflow import DAG
   from airflow.settings import TIMEZONE
   from airflow.operators.latest_only import LatestOnlyOperator
   from airflow.operators.python import PythonOperator
   
   
   def create_test_dag():
       start_date = dateutil.parser.isoparse("2021-11-24T15:00:00+01:00").astimezone(tz=TIMEZONE)
   
       my_test_dag = DAG(
           dag_id="test_dag",
           start_date=start_date,
           schedule_interval="0/2 * * * *",
           is_paused_upon_creation=True,
           catchup=True,
           default_args={'owner': 'airflow', 'start_date': start_date},
       )
   
       latest_task = LatestOnlyOperator(task_id="test_latest_only", dag=my_test_dag)
       test_task = PythonOperator(task_id="test_task", dag=my_test_dag, python_callable=lambda: print("Hello guys !"))
   
       latest_task >> test_task
   
       return my_test_dag
   
   
   my_test_dag = create_test_dag()
   ```
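
For reference, the runs that catchup would be expected to create for a two-minute schedule like the one in the DAG above can be enumerated in plain Python. This is a rough sketch under stated assumptions, not Airflow code: it assumes the start date is aligned to the schedule, that each run covers one two-minute interval, and that a run becomes due once its interval has ended. `expected_catchup_intervals` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

STEP = timedelta(minutes=2)  # mirrors the "0/2 * * * *" schedule


def expected_catchup_intervals(
    start: datetime, now: datetime
) -> List[Tuple[datetime, datetime]]:
    """List every complete [start, end) interval between `start` and `now`."""
    intervals = []
    interval_start = start
    # A run is only due once its whole interval lies in the past.
    while interval_start + STEP <= now:
        intervals.append((interval_start, interval_start + STEP))
        interval_start += STEP
    return intervals


# Start date 10 minutes before "startup", as suggested in the message.
start = datetime(2021, 11, 24, 14, 0, tzinfo=timezone.utc)
now = start + timedelta(minutes=10)
runs = expected_catchup_intervals(start, now)
```

Comparing a list like this against the dag runs actually present is one way to check whether any run was skipped or whether scheduling stopped altogether.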

