You are viewing a plain text version of this content. (The canonical link to the original message was not preserved in this plain-text copy.)
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/13 18:20:56 UTC

[airflow] 36/38: Revert "Fix DAG run state not updated while DAG is paused (#16343)"

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit effbee143fb5f1afdff25b6ec6065808297ad04b
Author: James Timmins <ja...@astronomer.io>
AuthorDate: Fri Aug 13 11:01:58 2021 -0700

    Revert "Fix DAG run state not updated while DAG is paused (#16343)"
    
    This reverts commit 446e66b052a116f8371be331624c1e1b03299380.
---
 airflow/jobs/local_task_job.py    | 15 --------------
 tests/jobs/test_local_task_job.py | 42 ++-------------------------------------
 2 files changed, 2 insertions(+), 55 deletions(-)

diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index 3afc801..3dca97e 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -158,8 +158,6 @@ class LocalTaskJob(BaseJob):
         if self.task_instance.state != State.SUCCESS:
             error = self.task_runner.deserialize_run_error()
         self.task_instance._run_finished_callback(error=error)
-        if not self.task_instance.test_mode:
-            self._update_dagrun_state_for_paused_dag()
 
     def on_kill(self):
         self.task_runner.terminate()
@@ -213,16 +211,3 @@ class LocalTaskJob(BaseJob):
                 error = self.task_runner.deserialize_run_error() or "task marked as failed externally"
             ti._run_finished_callback(error=error)
             self.terminating = True
-
-    @provide_session
-    def _update_dagrun_state_for_paused_dag(self, session=None):
-        """
-        Checks for paused dags with DagRuns in the running state and
-        update the DagRun state if possible
-        """
-        dag = self.task_instance.task.dag
-        if dag.get_is_paused():
-            dag_run = self.task_instance.get_dagrun(session=session)
-            if dag_run:
-                dag_run.dag = dag
-                dag_run.update_state(session=session, execute_callbacks=True)
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index ed43198..17a7285 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -44,7 +44,6 @@ from airflow.utils.net import get_hostname
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
-from airflow.utils.types import DagRunType
 from tests.test_utils.asserts import assert_queries_count
 from tests.test_utils.db import clear_db_jobs, clear_db_runs
 from tests.test_utils.mock_executor import MockExecutor
@@ -668,43 +667,6 @@ class TestLocalTaskJob(unittest.TestCase):
         assert task_terminated_externally.value == 1
         assert not process.is_alive()
 
-    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
-        """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
-        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
-        op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
-
-        session = settings.Session()
-        orm_dag = DagModel(
-            dag_id=dag.dag_id,
-            has_task_concurrency_limits=False,
-            next_dagrun=dag.start_date,
-            next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
-            is_active=True,
-            is_paused=True,
-        )
-        session.add(orm_dag)
-        session.flush()
-        # Write Dag to DB
-        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
-        dagbag.bag_dag(dag, root_dag=dag)
-        dagbag.sync_to_db()
-
-        dr = dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            state=State.RUNNING,
-            execution_date=DEFAULT_DATE,
-            start_date=DEFAULT_DATE,
-            session=session,
-        )
-        assert dr.state == State.RUNNING
-        ti = TaskInstance(op1, dr.execution_date)
-        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
-        job1.task_runner = StandardTaskRunner(job1)
-        job1.run()
-        session.add(dr)
-        session.refresh(dr)
-        assert dr.state == State.SUCCESS
-
 
 @pytest.fixture()
 def clean_db_helper():
@@ -723,12 +685,12 @@ class TestLocalTaskJobPerformance:
         task = DummyOperator(task_id='test_state_succeeded1', dag=dag)
 
         dag.clear()
-        dag.create_dagrun(run_id=unique_prefix, execution_date=DEFAULT_DATE, state=State.NONE)
+        dag.create_dagrun(run_id=unique_prefix, state=State.NONE)
 
         ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
 
         mock_get_task_runner.return_value.return_code.side_effects = return_codes
 
         job = LocalTaskJob(task_instance=ti, executor=MockExecutor())
-        with assert_queries_count(16):
+        with assert_queries_count(15):
             job.run()