You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/13 18:20:56 UTC
[airflow] 36/38: Revert "Fix DAG run state not updated while DAG is
paused (#16343)"
This is an automated email from the ASF dual-hosted git repository.
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit effbee143fb5f1afdff25b6ec6065808297ad04b
Author: James Timmins <ja...@astronomer.io>
AuthorDate: Fri Aug 13 11:01:58 2021 -0700
Revert "Fix DAG run state not updated while DAG is paused (#16343)"
This reverts commit 446e66b052a116f8371be331624c1e1b03299380.
---
airflow/jobs/local_task_job.py | 15 --------------
tests/jobs/test_local_task_job.py | 42 ++-------------------------------------
2 files changed, 2 insertions(+), 55 deletions(-)
diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index 3afc801..3dca97e 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -158,8 +158,6 @@ class LocalTaskJob(BaseJob):
if self.task_instance.state != State.SUCCESS:
error = self.task_runner.deserialize_run_error()
self.task_instance._run_finished_callback(error=error)
- if not self.task_instance.test_mode:
- self._update_dagrun_state_for_paused_dag()
def on_kill(self):
self.task_runner.terminate()
@@ -213,16 +211,3 @@ class LocalTaskJob(BaseJob):
error = self.task_runner.deserialize_run_error() or "task marked as failed externally"
ti._run_finished_callback(error=error)
self.terminating = True
-
- @provide_session
- def _update_dagrun_state_for_paused_dag(self, session=None):
- """
- Checks for paused dags with DagRuns in the running state and
- update the DagRun state if possible
- """
- dag = self.task_instance.task.dag
- if dag.get_is_paused():
- dag_run = self.task_instance.get_dagrun(session=session)
- if dag_run:
- dag_run.dag = dag
- dag_run.update_state(session=session, execute_callbacks=True)
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index ed43198..17a7285 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -44,7 +44,6 @@ from airflow.utils.net import get_hostname
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.timeout import timeout
-from airflow.utils.types import DagRunType
from tests.test_utils.asserts import assert_queries_count
from tests.test_utils.db import clear_db_jobs, clear_db_runs
from tests.test_utils.mock_executor import MockExecutor
@@ -668,43 +667,6 @@ class TestLocalTaskJob(unittest.TestCase):
assert task_terminated_externally.value == 1
assert not process.is_alive()
- def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
- """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
- dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
- op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
-
- session = settings.Session()
- orm_dag = DagModel(
- dag_id=dag.dag_id,
- has_task_concurrency_limits=False,
- next_dagrun=dag.start_date,
- next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
- is_active=True,
- is_paused=True,
- )
- session.add(orm_dag)
- session.flush()
- # Write Dag to DB
- dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
- dagbag.bag_dag(dag, root_dag=dag)
- dagbag.sync_to_db()
-
- dr = dag.create_dagrun(
- run_type=DagRunType.SCHEDULED,
- state=State.RUNNING,
- execution_date=DEFAULT_DATE,
- start_date=DEFAULT_DATE,
- session=session,
- )
- assert dr.state == State.RUNNING
- ti = TaskInstance(op1, dr.execution_date)
- job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
- job1.task_runner = StandardTaskRunner(job1)
- job1.run()
- session.add(dr)
- session.refresh(dr)
- assert dr.state == State.SUCCESS
-
@pytest.fixture()
def clean_db_helper():
@@ -723,12 +685,12 @@ class TestLocalTaskJobPerformance:
task = DummyOperator(task_id='test_state_succeeded1', dag=dag)
dag.clear()
- dag.create_dagrun(run_id=unique_prefix, execution_date=DEFAULT_DATE, state=State.NONE)
+ dag.create_dagrun(run_id=unique_prefix, state=State.NONE)
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
mock_get_task_runner.return_value.return_code.side_effects = return_codes
job = LocalTaskJob(task_instance=ti, executor=MockExecutor())
- with assert_queries_count(16):
+ with assert_queries_count(15):
job.run()