Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/09 00:14:44 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #16343: Fix DAG run state not updated while DAG is paused

ephraimbuddy opened a new pull request #16343:
URL: https://github.com/apache/airflow/pull/16343


   Closes: https://github.com/apache/airflow/issues/15439
       
   The state of a DAG run does not update while the DAG is paused.
   If the DAG run was kicked off before the DAG was paused, its tasks
   continue to run, eventually finish, and are marked correctly.
   The DAG run itself, however, stays in the Running state until the DAG is unpaused.

   This change fixes it by running a check at intervals and updating the states (where possible)
   of DagRuns whose tasks have finished running while the DAG is paused.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16343:
URL: https://github.com/apache/airflow/pull/16343#issuecomment-858810600


   > > Nice but I think it may not work if the user disables mini scheduling?
   > 
   > Yes, but we'll likely remove that setting in a version or two -- it was mostly an escape hatch in case it had unforeseen bugs.
   
   Should I add it as a separate check outside the mini scheduling?





[GitHub] [airflow] ashb commented on a change in pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16343:
URL: https://github.com/apache/airflow/pull/16343#discussion_r651656037



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -686,6 +688,46 @@ def test_fast_follow(
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
+        """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
+        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+        op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
+
+        session = settings.Session()
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            has_task_concurrency_limits=False,
+            next_dagrun=dag.start_date,
+            next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
+            is_active=True,
+            is_paused=True,
+        )
+        session.add(orm_dag)
+        session.flush()
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            session=session,
+        )
+        ti = TaskInstance(op1, dr.execution_date)
+        ti.refresh_from_db()
+        ti.state = State.SUCCESS
+        session.commit()
+        assert dr.state == State.RUNNING
+
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        job1.run()
+        dr = session.query(DagRun).filter(DagRun.id == dr.id).first()

Review comment:
       ```suggestion
           session.refresh(dr)
   ```
   
   I think







[GitHub] [airflow] ephraimbuddy commented on pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16343:
URL: https://github.com/apache/airflow/pull/16343#issuecomment-858806028


   > I wonder what is more efficient: doing this periodically (for paused dags, where the state is likely to never change) or expanding on the "mini scheduler run" to do a simpler version of `dag_run.update_state()` when the task that just finished was one of the leaf tasks in the dag.
   
   Nice but I think it may not work if the user disables mini scheduling? 





[GitHub] [airflow] ashb commented on a change in pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16343:
URL: https://github.com/apache/airflow/pull/16343#discussion_r652475937



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -686,6 +687,47 @@ def test_fast_follow(
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
+        """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
+        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+        op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
+
+        session = settings.Session()
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            has_task_concurrency_limits=False,
+            next_dagrun=dag.start_date,
+            next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
+            is_active=True,
+            is_paused=True,
+        )
+        session.add(orm_dag)
+        session.flush()
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            session=session,
+        )
+        ti = TaskInstance(op1, dr.execution_date)
+        ti.refresh_from_db()
+        ti.state = State.SUCCESS

Review comment:
       Is this needed?
   
   
   ```suggestion
   ```







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16343:
URL: https://github.com/apache/airflow/pull/16343#discussion_r652483030



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -686,6 +687,44 @@ def test_fast_follow(
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
+        """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
+        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+        op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
+
+        session = settings.Session()
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            has_task_concurrency_limits=False,
+            next_dagrun=dag.start_date,
+            next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
+            is_active=True,
+            is_paused=True,
+        )
+        session.add(orm_dag)
+        session.flush()
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            session=session,
+        )
+        session.commit()

Review comment:
       ```suggestion
   ```







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16343:
URL: https://github.com/apache/airflow/pull/16343#discussion_r651566456



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -264,3 +265,15 @@ def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
                 exc_info=True,
             )
             session.rollback()
+
+    @provide_session
+    def _update_dagrun_state_for_paused_dag(self, session=None):
+        """
+        Checks for paused dags with DagRuns in the running state and
+        update the DagRun state if possible
+        """
+        dag_run = self.task_instance.get_dagrun()
+        if dag_run:
+            dag = dag_run.dag = self.task_instance.task.dag
+            if dag.get_is_paused():
+                dag_run.update_state(session=session, execute_callbacks=False)

Review comment:
       ```suggestion
                   dag_run.update_state(session=session, execute_callbacks=True)
   ```







[GitHub] [airflow] ephraimbuddy merged pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged pull request #16343:
URL: https://github.com/apache/airflow/pull/16343


   





[GitHub] [airflow] ephraimbuddy closed pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #16343:
URL: https://github.com/apache/airflow/pull/16343


   





[GitHub] [airflow] ashb commented on pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16343:
URL: https://github.com/apache/airflow/pull/16343#issuecomment-858808709


   > Nice but I think it may not work if the user disables mini scheduling?
   
   Yes, but we'll likely remove that setting in a version or two -- it was mostly an escape hatch in case it had unforeseen bugs.





[GitHub] [airflow] ashb commented on pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16343:
URL: https://github.com/apache/airflow/pull/16343#issuecomment-858660123


   I wonder what is more efficient: doing this periodically (for paused dags, where the state is likely to never change) or expanding on the "mini scheduler run" to do a simpler version of `dag_run.update_state()` when the task that just finished was one of the leaf tasks in the dag.
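
A minimal sketch of the second option, assuming a toy in-memory model rather than Airflow's real classes. `ToyDagRun`, `on_task_exit`, and the simplified success/failure rule are hypothetical names and logic introduced only for illustration; the point is that the run state is re-evaluated only when the task that just exited is a leaf task of a paused DAG.

```python
from dataclasses import dataclass, field
from typing import Dict, Set

# Task states treated as "finished" in this toy model (an assumption).
FINISHED = {"success", "failed", "skipped"}


@dataclass
class ToyDagRun:
    """Hypothetical stand-in for a DAG run; not Airflow's DagRun."""

    # task_id -> ids of the tasks that depend on it
    downstream: Dict[str, Set[str]]
    task_states: Dict[str, str] = field(default_factory=dict)
    state: str = "running"

    def leaf_tasks(self) -> Set[str]:
        """Tasks with no downstream dependants."""
        return {t for t, children in self.downstream.items() if not children}

    def update_state(self) -> None:
        """Simplified rule: the run finishes once every leaf task has finished."""
        leaf_states = [self.task_states.get(t) for t in self.leaf_tasks()]
        if any(s not in FINISHED for s in leaf_states):
            return  # at least one leaf is still pending; leave the run as running
        self.state = "failed" if "failed" in leaf_states else "success"


def on_task_exit(run: ToyDagRun, task_id: str, final_state: str, dag_is_paused: bool) -> None:
    """Record the task result; re-check the run only for leaf tasks of a paused DAG."""
    run.task_states[task_id] = final_state
    if dag_is_paused and task_id in run.leaf_tasks():
        run.update_state()


run = ToyDagRun(downstream={"extract": {"load"}, "load": set()})
on_task_exit(run, "extract", "success", dag_is_paused=True)
print(run.state)  # running: "extract" is not a leaf task
on_task_exit(run, "load", "success", dag_is_paused=True)
print(run.state)  # success: the only leaf task has finished
```

Compared with a periodic check, this does no work for paused DAGs whose runs never change; the cost is one extra state evaluation at the exit of each leaf task.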





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16343:
URL: https://github.com/apache/airflow/pull/16343#discussion_r649449578



##########
File path: airflow/config_templates/config.yml
##########
@@ -1704,6 +1704,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: update_dagrun_state_for_paused_dag_interval
+      description: |
+        How often (in seconds) to check paused DAGs for DagRuns in running state
+        and update states of DagRuns whose tasks finished the DagRun
+      version_added: 2.1.1
+      type: float
+      example: ~
+      default: "30.0"

Review comment:
       ```suggestion
   ```







[GitHub] [airflow] ephraimbuddy commented on pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16343:
URL: https://github.com/apache/airflow/pull/16343#issuecomment-862129045


   @ashb I have applied the latest review comments. Please review once more.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16343:
URL: https://github.com/apache/airflow/pull/16343#discussion_r649449800



##########
File path: airflow/config_templates/default_airflow.cfg
##########
@@ -849,6 +849,10 @@ job_heartbeat_sec = 5
 # that no longer have a matching DagRun
 clean_tis_without_dagrun_interval = 15.0
 
+# How often (in seconds) to check paused DAGs for DagRuns in running state
+# and update states of DagRuns whose tasks finished the DagRun
+update_dagrun_state_for_paused_dag_interval = 30.0
+

Review comment:
       ```suggestion
   ```







[GitHub] [airflow] github-actions[bot] commented on pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #16343:
URL: https://github.com/apache/airflow/pull/16343#issuecomment-858100921


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it onto the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16343:
URL: https://github.com/apache/airflow/pull/16343#discussion_r652495542



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -686,6 +687,47 @@ def test_fast_follow(
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
+        """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
+        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+        op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
+
+        session = settings.Session()
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            has_task_concurrency_limits=False,
+            next_dagrun=dag.start_date,
+            next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
+            is_active=True,
+            is_paused=True,
+        )
+        session.add(orm_dag)
+        session.flush()
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            session=session,
+        )
+        ti = TaskInstance(op1, dr.execution_date)
+        ti.refresh_from_db()
+        ti.state = State.SUCCESS

Review comment:
       Only `ti = TaskInstance(op1, dr.execution_date)` is needed; it is used by the job.










[GitHub] [airflow] ashb commented on a change in pull request #16343: Fix DAG run state not updated while DAG is paused

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16343:
URL: https://github.com/apache/airflow/pull/16343#discussion_r651557538



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1344,7 +1344,6 @@ def _run_scheduler_loop(self) -> None:
             conf.getfloat('scheduler', 'clean_tis_without_dagrun_interval', fallback=15.0),
             self._clean_tis_without_dagrun,
         )
-
         for loop_count in itertools.count(start=1):

Review comment:
       ```suggestion
   
           for loop_count in itertools.count(start=1):
   ```
   
   (Just so we avoid touching this file)

##########
File path: airflow/jobs/local_task_job.py
##########
@@ -264,3 +265,15 @@ def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
                 exc_info=True,
             )
             session.rollback()
+
+    @provide_session
+    def _update_dagrun_state_for_paused_dag(self, session=None):
+        """
+        Checks for paused dags with DagRuns in the running state and
+        update the DagRun state if possible
+        """
+        dag_run = self.task_instance.get_dagrun()
+        if dag_run:
+            dag = dag_run.dag = self.task_instance.task.dag
+            if dag.get_is_paused():
+                dag_run.update_state(session=session, execute_callbacks=False)

Review comment:
       If we pass `execute_callbacks=False` here, the callbacks are not run; they are handed back in the return value instead, and this code discards that value -- so I think we want `True` here.
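
The distinction can be sketched with a toy function. This is not Airflow's `DagRun.update_state`; `toy_update_state` and its single-callback return shape are assumptions made for illustration. It only mirrors the behaviour described in the comment: with the flag off, the callback is returned to the caller instead of being run, so a caller that ignores the return value silently drops it.

```python
from typing import Callable, List, Optional, Tuple

Callback = Callable[[], None]


def toy_update_state(
    task_states: List[str],
    on_finish: Callback,
    execute_callbacks: bool = True,
) -> Tuple[str, Optional[Callback]]:
    """Hypothetical model: return the run state plus any callback left for the caller."""
    if any(s not in ("success", "failed") for s in task_states):
        return "running", None  # nothing finished, so no callback is due
    state = "failed" if "failed" in task_states else "success"
    if execute_callbacks:
        on_finish()  # run the callback here; nothing is left for the caller
        return state, None
    return state, on_finish  # caller is now responsible for running it


fired: List[str] = []


def notify() -> None:
    fired.append("callback ran")


# Flag off and return value ignored: the callback never runs.
toy_update_state(["success"], notify, execute_callbacks=False)
print(fired)  # []

# Flag on: the callback runs inside the function.
toy_update_state(["success"], notify, execute_callbacks=True)
print(fired)  # ['callback ran']
```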

##########
File path: airflow/jobs/local_task_job.py
##########
@@ -264,3 +265,15 @@ def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
                 exc_info=True,
             )
             session.rollback()
+
+    @provide_session
+    def _update_dagrun_state_for_paused_dag(self, session=None):
+        """
+        Checks for paused dags with DagRuns in the running state and
+        update the DagRun state if possible
+        """
+        dag_run = self.task_instance.get_dagrun()

Review comment:
       ```suggestion
           dag_run = self.task_instance.get_dagrun(session=session)
   ```

##########
File path: airflow/jobs/local_task_job.py
##########
@@ -264,3 +265,15 @@ def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
                 exc_info=True,
             )
             session.rollback()
+
+    @provide_session
+    def _update_dagrun_state_for_paused_dag(self, session=None):
+        """
+        Checks for paused dags with DagRuns in the running state and
+        update the DagRun state if possible
+        """
+        dag_run = self.task_instance.get_dagrun()
+        if dag_run:
+            dag = dag_run.dag = self.task_instance.task.dag
+            if dag.get_is_paused():

Review comment:
       ```suggestion
               if dag.get_is_paused(session=session):
   ```



