You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2020/05/25 15:29:36 UTC

[airflow] branch v1-10-test updated: Fix timing-based flakey test in TestLocalTaskJob (#8405)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new a09d5ec  Fix timing-based flakey test in TestLocalTaskJob (#8405)
a09d5ec is described below

commit a09d5ec68afb550acaf56293a13c3c46e3347132
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Thu Apr 16 16:11:47 2020 +0100

    Fix timing-based flakey test in TestLocalTaskJob (#8405)
    
    This test suffered from timing-based failures: if the "main" process
    took even fractionally too long then the task process would have already
    cleaned up its subprocess, so the expected callback in the main/test
    process would never be run.
    
    This change makes it so that the callback _will always be called_ in the
    test process if it is called at all.
    
    (cherry picked from commit d06d3165ff7df8ceeb52f8f18154f5f27d83355b)
---
 tests/jobs/test_local_task_job.py | 57 ++++++++++++++++++++-------------------
 1 file changed, 30 insertions(+), 27 deletions(-)

diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 63c7c86..69a4369 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -29,12 +29,14 @@ from airflow.executors import SequentialExecutor
 from airflow.jobs import LocalTaskJob
 from airflow.models import DAG, TaskInstance as TI
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import PythonOperator
 from airflow.utils import timezone
 from airflow.utils.db import create_session
 from airflow.utils.net import get_hostname
 from airflow.utils.state import State
 from tests.compat import patch
 from tests.test_core import TEST_DAG_FOLDER
+from airflow.utils.timeout import timeout
 from tests.test_utils.db import clear_db_runs
 from tests.test_utils.mock_executor import MockExecutor
 
@@ -251,18 +253,27 @@ class LocalTaskJobTest(unittest.TestCase):
         data = {'called': False}
 
         def check_failure(context):
-            self.assertEqual(context['dag_run'].dag_id,
-                             'test_mark_failure')
+            self.assertEqual(context['dag_run'].dag_id, 'test_mark_failure')
             data['called'] = True
 
-        dag = DAG(dag_id='test_mark_failure',
-                  start_date=DEFAULT_DATE,
-                  default_args={'owner': 'owner1'})
-
-        task = DummyOperator(
-            task_id='test_state_succeeded1',
-            dag=dag,
-            on_failure_callback=check_failure)
+        def task_function(ti, **context):
+            with create_session() as session:
+                self.assertEqual(State.RUNNING, ti.state)
+                ti.log.info("Marking TI as failed 'externally'")
+                ti.state = State.FAILED
+                session.merge(ti)
+                session.commit()
+
+            time.sleep(60)
+            # This should not happen -- the state change should be noticed and the task should get killed
+            data['reached_end_of_sleep'] = True
+
+        with DAG(dag_id='test_mark_failure', start_date=DEFAULT_DATE) as dag:
+            task = PythonOperator(
+                task_id='test_state_succeeded1',
+                python_callable=task_function,
+                provide_context=True,
+                on_failure_callback=check_failure)
 
         session = settings.Session()
 
@@ -274,28 +285,20 @@ class LocalTaskJobTest(unittest.TestCase):
                           session=session)
         ti = TI(task=task, execution_date=DEFAULT_DATE)
         ti.refresh_from_db()
+
         job1 = LocalTaskJob(task_instance=ti,
                             ignore_ti_state=True,
                             executor=SequentialExecutor())
-        from airflow.task.task_runner.standard_task_runner import StandardTaskRunner
-        job1.task_runner = StandardTaskRunner(job1)
-        process = multiprocessing.Process(target=job1.run)
-        process.start()
-        ti.refresh_from_db()
-        for _ in range(0, 50):
-            if ti.state == State.RUNNING:
-                break
-            time.sleep(0.1)
-            ti.refresh_from_db()
-        self.assertEqual(State.RUNNING, ti.state)
-        ti.state = State.FAILED
-        session.merge(ti)
-        session.commit()
+        with timeout(30):
+            # This should be _much_ shorter to run.
+            # If you change this limit, make the sleep in the callable above longer
+            job1.run()
 
-        job1.heartbeat_callback(session=None)
+        ti.refresh_from_db()
+        self.assertEqual(ti.state, State.FAILED)
         self.assertTrue(data['called'])
-        process.join(timeout=10)
-        self.assertFalse(process.is_alive())
+        self.assertNotIn('reached_end_of_sleep', data,
+                         'Task should not have been allowed to run to completion')
 
     @pytest.mark.quarantined
     def test_mark_success_on_success_callback(self):