You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2021/08/03 08:54:18 UTC
[airflow] branch main updated: Fix quarantined/flaky tests in test_local_task_job.py (#17385)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6895075 Fix quarantined/flaky tests in test_local_task_job.py (#17385)
6895075 is described below
commit 6895075d560e1a35d0d56f983963f8afb96301fe
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Aug 3 09:54:02 2021 +0100
Fix quarantined/flaky tests in test_local_task_job.py (#17385)
This PR attempts to fix some flaky/quarantined tests in test_local_task_job.py
by removing the `assert not process.is_alive()` checks from the tests and making
sure `process.join()` is always called with a timeout.
---
tests/jobs/test_local_task_job.py | 18 ++++++------------
1 file changed, 6 insertions(+), 12 deletions(-)
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 1d6d572..7b05f9b 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -306,9 +306,7 @@ class TestLocalTaskJob:
ti.state = State.SUCCESS
session.merge(ti)
session.commit()
-
- process.join()
- assert not process.is_alive()
+ process.join(timeout=10)
ti.refresh_from_db()
assert State.SUCCESS == ti.state
@@ -501,7 +499,6 @@ class TestLocalTaskJob:
assert ti.state == State.FAILED # task exits with failure state
assert failure_callback_called.value == 1
- @pytest.mark.quarantined
def test_mark_success_on_success_callback(self, dag_maker):
"""
Test that ensures that where a task is marked success in the UI
@@ -554,10 +551,9 @@ class TestLocalTaskJob:
session.merge(ti)
session.commit()
ti.refresh_from_db()
- process.join()
+ process.join(timeout=10)
assert success_callback_called.value == 1
assert task_terminated_externally.value == 1
- assert not process.is_alive()
def test_task_sigkill_calls_on_failure_callback(self, dag_maker):
"""
@@ -595,10 +591,9 @@ class TestLocalTaskJob:
process = multiprocessing.Process(target=job1.run)
process.start()
time.sleep(0.3)
- process.join()
+ process.join(timeout=10)
assert failure_callback_called.value == 1
assert task_terminated_externally.value == 1
- assert not process.is_alive()
def test_process_sigterm_calls_on_failure_callback(self, dag_maker):
"""
@@ -641,10 +636,9 @@ class TestLocalTaskJob:
time.sleep(0.2)
os.kill(process.pid, signal.SIGTERM)
ti.refresh_from_db()
- process.join()
+ process.join(timeout=10)
assert failure_callback_called.value == 1
assert task_terminated_externally.value == 1
- assert not process.is_alive()
@pytest.mark.parametrize(
"conf, dependencies, init_state, first_run_state, second_run_state, error_message",
@@ -781,7 +775,7 @@ class TestLocalTaskJob:
process = multiprocessing.Process(target=job1.run)
process.start()
time.sleep(0.4)
- process.join()
+ process.join(timeout=10)
ti.refresh_from_db()
assert ti.state == State.UP_FOR_RETRY
assert retry_callback_called.value == 1
@@ -831,7 +825,7 @@ class TestLocalTaskJob:
break
time.sleep(0.2)
os.kill(process.pid, signal.SIGTERM)
- process.join()
+ process.join(timeout=10)
ti.refresh_from_db()
assert ti.state == State.UP_FOR_RETRY
assert retry_callback_called.value == 1