You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/17 21:47:53 UTC
[airflow] 02/06: Add Pytest fixture to create dag and dagrun and
use it on local task job tests (#16889)
This is an automated email from the ASF dual-hosted git repository.
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 95e251af19e884ae249975111d9bedf33d8ca004
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Jul 20 18:48:35 2021 +0100
Add Pytest fixture to create dag and dagrun and use it on local task job tests (#16889)
This change adds a pytest fixture to create a DAG and DagRun, and uses it in the local task job tests.
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
(cherry picked from commit 7c0d8a2f83cc6db25bdddcf6cecb6fb56f05f02f)
---
tests/jobs/test_local_task_job.py | 77 ++++++++++++++++-----------------------
1 file changed, 32 insertions(+), 45 deletions(-)
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 060bce8..c6a92b2 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -786,55 +786,42 @@ class TestLocalTaskJob:
assert retry_callback_called.value == 1
assert task_terminated_externally.value == 1
- def test_process_sigterm_works_with_retries(self, dag_maker):
- """
- Test that ensures that task runner sets tasks to retry when they(task runner)
- receive sigterm
- """
- # use shared memory value so we can properly track value change even if
- # it's been updated across processes.
- retry_callback_called = Value('i', 0)
- task_terminated_externally = Value('i', 1)
- shared_mem_lock = Lock()
-
- def retry_callback(context):
- with shared_mem_lock:
- retry_callback_called.value += 1
- assert context['dag_run'].dag_id == 'test_mark_failure_2'
+ def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
+ """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
+ dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+ op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
- def task_function(ti):
- time.sleep(60)
- # This should not happen -- the state change should be noticed and the task should get killed
- with shared_mem_lock:
- task_terminated_externally.value = 0
+ session = settings.Session()
+ orm_dag = DagModel(
+ dag_id=dag.dag_id,
+ has_task_concurrency_limits=False,
+ next_dagrun=dag.start_date,
+ next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
+ is_active=True,
+ is_paused=True,
+ )
+ session.add(orm_dag)
+ session.flush()
+ # Write Dag to DB
+ dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+ dagbag.bag_dag(dag, root_dag=dag)
+ dagbag.sync_to_db()
- with dag_maker(dag_id='test_mark_failure_2'):
- task = PythonOperator(
- task_id='test_on_failure',
- python_callable=task_function,
- retries=1,
- retry_delay=timedelta(seconds=2),
- on_retry_callback=retry_callback,
- )
- ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
- ti.refresh_from_db()
+ dr = dag.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ state=State.RUNNING,
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE,
+ session=session,
+ )
+ assert dr.state == State.RUNNING
+ ti = TaskInstance(op1, dr.execution_date)
job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
job1.task_runner = StandardTaskRunner(job1)
- job1.task_runner.start()
- settings.engine.dispose()
- process = multiprocessing.Process(target=job1.run)
- process.start()
- for _ in range(0, 25):
- ti.refresh_from_db()
- if ti.state == State.RUNNING and ti.pid is not None:
- break
- time.sleep(0.2)
- os.kill(process.pid, signal.SIGTERM)
- process.join()
- ti.refresh_from_db()
- assert ti.state == State.UP_FOR_RETRY
- assert retry_callback_called.value == 1
- assert task_terminated_externally.value == 1
+ job1.run()
+ session.add(dr)
+ session.refresh(dr)
+ assert dr.state == State.SUCCESS
def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self, dag_maker):
"""Test that with DAG paused, DagRun state will update when the tasks finishes the run"""