You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/13 18:20:58 UTC

[airflow] 38/38: Fix DAG run state not updated while DAG is paused (#16343)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7e79747f53d5501a3786639336abda6ca9d104d3
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Fri Jun 18 00:29:00 2021 +0100

    Fix DAG run state not updated while DAG is paused (#16343)
    
    While a DAG is paused, the state of its DAG runs is not updated.
    If a DAG run was started before the DAG was paused, its tasks keep
    running, eventually finish, and are marked correctly, but the DAG run
    itself stays in the running state until the DAG is unpaused.
    
    This change fixes it by adding a check on task exit: if the DAG is paused,
    the state of the task's DagRun is updated (where possible), so a DagRun
    finished by that task no longer remains stuck in the running state.
    
    Co-authored-by: Ash Berlin-Taylor <as...@firemirror.com>
    (cherry picked from commit 3834df6ade22b33addd47e3ab2165a0b282926fa)
---
 airflow/jobs/local_task_job.py    | 14 +++++++++++++
 tests/jobs/test_local_task_job.py | 42 +++++++++++++++++++++++++++++++++++++--
 2 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index 33c2f59..cce4e64 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -164,6 +164,7 @@ class LocalTaskJob(BaseJob):
         if not self.task_instance.test_mode:
             if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True):
                 self._run_mini_scheduler_on_child_tasks()
+            self._update_dagrun_state_for_paused_dag()
 
     def on_kill(self):
         self.task_runner.terminate()
@@ -268,3 +269,16 @@ class LocalTaskJob(BaseJob):
                 exc_info=True,
             )
             session.rollback()
+
+    @provide_session
+    def _update_dagrun_state_for_paused_dag(self, session=None):
+        """
+        Checks for paused dags with DagRuns in the running state and
+        update the DagRun state if possible
+        """
+        dag = self.task_instance.task.dag
+        if dag.get_is_paused():
+            dag_run = self.task_instance.get_dagrun(session=session)
+            if dag_run:
+                dag_run.dag = dag
+                dag_run.update_state(session=session, execute_callbacks=True)
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 48047ad..11e9adf 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -45,6 +45,7 @@ from airflow.utils.net import get_hostname
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
+from airflow.utils.types import DagRunType
 from tests.test_utils import db
 from tests.test_utils.asserts import assert_queries_count
 from tests.test_utils.config import conf_vars
@@ -781,6 +782,43 @@ class TestLocalTaskJob(unittest.TestCase):
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
+        """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
+        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+        op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
+
+        session = settings.Session()
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            has_task_concurrency_limits=False,
+            next_dagrun=dag.start_date,
+            next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
+            is_active=True,
+            is_paused=True,
+        )
+        session.add(orm_dag)
+        session.flush()
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            session=session,
+        )
+        assert dr.state == State.RUNNING
+        ti = TaskInstance(op1, dr.execution_date)
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        job1.run()
+        session.add(dr)
+        session.refresh(dr)
+        assert dr.state == State.SUCCESS
+
 
 @pytest.fixture()
 def clean_db_helper():
@@ -799,12 +837,12 @@ class TestLocalTaskJobPerformance:
         task = DummyOperator(task_id='test_state_succeeded1', dag=dag)
 
         dag.clear()
-        dag.create_dagrun(run_id=unique_prefix, state=State.NONE)
+        dag.create_dagrun(run_id=unique_prefix, execution_date=DEFAULT_DATE, state=State.NONE)
 
         ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
 
         mock_get_task_runner.return_value.return_code.side_effects = return_codes
 
         job = LocalTaskJob(task_instance=ti, executor=MockExecutor())
-        with assert_queries_count(15):
+        with assert_queries_count(16):
             job.run()