You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2020/05/25 15:29:36 UTC
[airflow] branch v1-10-test updated: Fix timing-based flakey test
in TestLocalTaskJob (#8405)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new a09d5ec Fix timing-based flakey test in TestLocalTaskJob (#8405)
a09d5ec is described below
commit a09d5ec68afb550acaf56293a13c3c46e3347132
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Thu Apr 16 16:11:47 2020 +0100
Fix timing-based flakey test in TestLocalTaskJob (#8405)
This test suffered from timing-based failures: if the "main" process
took even fractionally too long, then the task process would have already
cleaned up its subprocess, so the expected callback in the main/test
process would never be run.
This change makes it so that the callback _will always be called_ in the test
process if it is called at all.
(cherry picked from commit d06d3165ff7df8ceeb52f8f18154f5f27d83355b)
---
tests/jobs/test_local_task_job.py | 57 ++++++++++++++++++++-------------------
1 file changed, 30 insertions(+), 27 deletions(-)
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 63c7c86..69a4369 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -29,12 +29,14 @@ from airflow.executors import SequentialExecutor
from airflow.jobs import LocalTaskJob
from airflow.models import DAG, TaskInstance as TI
from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import PythonOperator
from airflow.utils import timezone
from airflow.utils.db import create_session
from airflow.utils.net import get_hostname
from airflow.utils.state import State
from tests.compat import patch
from tests.test_core import TEST_DAG_FOLDER
+from airflow.utils.timeout import timeout
from tests.test_utils.db import clear_db_runs
from tests.test_utils.mock_executor import MockExecutor
@@ -251,18 +253,27 @@ class LocalTaskJobTest(unittest.TestCase):
data = {'called': False}
def check_failure(context):
- self.assertEqual(context['dag_run'].dag_id,
- 'test_mark_failure')
+ self.assertEqual(context['dag_run'].dag_id, 'test_mark_failure')
data['called'] = True
- dag = DAG(dag_id='test_mark_failure',
- start_date=DEFAULT_DATE,
- default_args={'owner': 'owner1'})
-
- task = DummyOperator(
- task_id='test_state_succeeded1',
- dag=dag,
- on_failure_callback=check_failure)
+ def task_function(ti, **context):
+ with create_session() as session:
+ self.assertEqual(State.RUNNING, ti.state)
+ ti.log.info("Marking TI as failed 'externally'")
+ ti.state = State.FAILED
+ session.merge(ti)
+ session.commit()
+
+ time.sleep(60)
+ # This should not happen -- the state change should be noticed and the task should get killed
+ data['reached_end_of_sleep'] = True
+
+ with DAG(dag_id='test_mark_failure', start_date=DEFAULT_DATE) as dag:
+ task = PythonOperator(
+ task_id='test_state_succeeded1',
+ python_callable=task_function,
+ provide_context=True,
+ on_failure_callback=check_failure)
session = settings.Session()
@@ -274,28 +285,20 @@ class LocalTaskJobTest(unittest.TestCase):
session=session)
ti = TI(task=task, execution_date=DEFAULT_DATE)
ti.refresh_from_db()
+
job1 = LocalTaskJob(task_instance=ti,
ignore_ti_state=True,
executor=SequentialExecutor())
- from airflow.task.task_runner.standard_task_runner import StandardTaskRunner
- job1.task_runner = StandardTaskRunner(job1)
- process = multiprocessing.Process(target=job1.run)
- process.start()
- ti.refresh_from_db()
- for _ in range(0, 50):
- if ti.state == State.RUNNING:
- break
- time.sleep(0.1)
- ti.refresh_from_db()
- self.assertEqual(State.RUNNING, ti.state)
- ti.state = State.FAILED
- session.merge(ti)
- session.commit()
+ with timeout(30):
+ # This should be _much_ shorter to run.
+ # If you change this limit, make the timeout in the callbable above bigger
+ job1.run()
- job1.heartbeat_callback(session=None)
+ ti.refresh_from_db()
+ self.assertEqual(ti.state, State.FAILED)
self.assertTrue(data['called'])
- process.join(timeout=10)
- self.assertFalse(process.is_alive())
+ self.assertNotIn('reached_end_of_sleep', data,
+ 'Task should not have been allowed to run to completion')
@pytest.mark.quarantined
def test_mark_success_on_success_callback(self):