Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/17 21:47:53 UTC

[airflow] 02/06: Add Pytest fixture to create dag and dagrun and use it on local task job tests (#16889)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 95e251af19e884ae249975111d9bedf33d8ca004
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Jul 20 18:48:35 2021 +0100

    Add Pytest fixture to create dag and dagrun and use it on local task job tests (#16889)
    
    This change adds a pytest fixture that creates a DAG and DagRun, and uses it in the local task job tests
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit 7c0d8a2f83cc6db25bdddcf6cecb6fb56f05f02f)
---
 tests/jobs/test_local_task_job.py | 77 ++++++++++++++++-----------------------
 1 file changed, 32 insertions(+), 45 deletions(-)
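
For context, below is a rough sketch of what a dag_maker-style fixture can look like. This is a simplified, hypothetical version, not the actual code: the real fixture in Airflow's tests/conftest.py also syncs the DAG to the database, supports serialized DAGs, and cleans up created rows after each test.

    import pytest

    from airflow.models.dag import DAG
    from airflow.utils.state import State
    from airflow.utils.types import DagRunType


    @pytest.fixture
    def dag_maker():
        class DagFactory:
            def __call__(self, dag_id='test_dag', **kwargs):
                # Build and remember the DAG so the test can create a run on it later.
                self.dag = DAG(dag_id, **kwargs)
                return self

            def __enter__(self):
                # Allow `with dag_maker(...):` so operators declared inside the
                # block attach to self.dag via the DAG context manager.
                return self.dag.__enter__()

            def __exit__(self, exc_type, exc_val, exc_tb):
                self.dag.__exit__(exc_type, exc_val, exc_tb)

            def create_dagrun(self, **kwargs):
                # Sensible defaults so most tests can call this with no arguments.
                kwargs.setdefault('run_type', DagRunType.SCHEDULED)
                kwargs.setdefault('state', State.RUNNING)
                kwargs.setdefault('execution_date', self.dag.start_date)
                kwargs.setdefault('start_date', self.dag.start_date)
                return self.dag.create_dagrun(**kwargs)

        return DagFactory()

The point of the fixture pattern is to keep per-test DAG/DagRun boilerplate out of each test body, which is what the diff below does for tests/jobs/test_local_task_job.py.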

diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 060bce8..c6a92b2 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -786,55 +786,42 @@ class TestLocalTaskJob:
         assert retry_callback_called.value == 1
         assert task_terminated_externally.value == 1
 
-    def test_process_sigterm_works_with_retries(self, dag_maker):
-        """
-        Test that ensures that task runner sets tasks to retry when they(task runner)
-         receive sigterm
-        """
-        # use shared memory value so we can properly track value change even if
-        # it's been updated across processes.
-        retry_callback_called = Value('i', 0)
-        task_terminated_externally = Value('i', 1)
-        shared_mem_lock = Lock()
-
-        def retry_callback(context):
-            with shared_mem_lock:
-                retry_callback_called.value += 1
-            assert context['dag_run'].dag_id == 'test_mark_failure_2'
+    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
+        """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
+        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+        op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
 
-        def task_function(ti):
-            time.sleep(60)
-            # This should not happen -- the state change should be noticed and the task should get killed
-            with shared_mem_lock:
-                task_terminated_externally.value = 0
+        session = settings.Session()
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            has_task_concurrency_limits=False,
+            next_dagrun=dag.start_date,
+            next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
+            is_active=True,
+            is_paused=True,
+        )
+        session.add(orm_dag)
+        session.flush()
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
 
-        with dag_maker(dag_id='test_mark_failure_2'):
-            task = PythonOperator(
-                task_id='test_on_failure',
-                python_callable=task_function,
-                retries=1,
-                retry_delay=timedelta(seconds=2),
-                on_retry_callback=retry_callback,
-            )
-        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
-        ti.refresh_from_db()
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            session=session,
+        )
+        assert dr.state == State.RUNNING
+        ti = TaskInstance(op1, dr.execution_date)
         job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
         job1.task_runner = StandardTaskRunner(job1)
-        job1.task_runner.start()
-        settings.engine.dispose()
-        process = multiprocessing.Process(target=job1.run)
-        process.start()
-        for _ in range(0, 25):
-            ti.refresh_from_db()
-            if ti.state == State.RUNNING and ti.pid is not None:
-                break
-            time.sleep(0.2)
-        os.kill(process.pid, signal.SIGTERM)
-        process.join()
-        ti.refresh_from_db()
-        assert ti.state == State.UP_FOR_RETRY
-        assert retry_callback_called.value == 1
-        assert task_terminated_externally.value == 1
+        job1.run()
+        session.add(dr)
+        session.refresh(dr)
+        assert dr.state == State.SUCCESS
 
     def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self, dag_maker):
         """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""