You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2021/08/03 08:54:18 UTC

[airflow] branch main updated: Fix quarantined/flaky tests in test_local_task_job.py (#17385)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6895075  Fix quarantined/flaky tests in test_local_task_job.py (#17385)
6895075 is described below

commit 6895075d560e1a35d0d56f983963f8afb96301fe
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Aug 3 09:54:02 2021 +0100

    Fix quarantined/flaky tests in test_local_task_job.py (#17385)
    
    This PR attempts to fix some flaky/quarantined tests in test_local_task_job.py
    by removing `assert not process.is_alive()` from the tests and making sure
    `process.join` is called with a timeout.
---
 tests/jobs/test_local_task_job.py | 18 ++++++------------
 1 file changed, 6 insertions(+), 12 deletions(-)

diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 1d6d572..7b05f9b 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -306,9 +306,7 @@ class TestLocalTaskJob:
         ti.state = State.SUCCESS
         session.merge(ti)
         session.commit()
-
-        process.join()
-        assert not process.is_alive()
+        process.join(timeout=10)
         ti.refresh_from_db()
         assert State.SUCCESS == ti.state
 
@@ -501,7 +499,6 @@ class TestLocalTaskJob:
         assert ti.state == State.FAILED  # task exits with failure state
         assert failure_callback_called.value == 1
 
-    @pytest.mark.quarantined
     def test_mark_success_on_success_callback(self, dag_maker):
         """
         Test that ensures that where a task is marked success in the UI
@@ -554,10 +551,9 @@ class TestLocalTaskJob:
         session.merge(ti)
         session.commit()
         ti.refresh_from_db()
-        process.join()
+        process.join(timeout=10)
         assert success_callback_called.value == 1
         assert task_terminated_externally.value == 1
-        assert not process.is_alive()
 
     def test_task_sigkill_calls_on_failure_callback(self, dag_maker):
         """
@@ -595,10 +591,9 @@ class TestLocalTaskJob:
         process = multiprocessing.Process(target=job1.run)
         process.start()
         time.sleep(0.3)
-        process.join()
+        process.join(timeout=10)
         assert failure_callback_called.value == 1
         assert task_terminated_externally.value == 1
-        assert not process.is_alive()
 
     def test_process_sigterm_calls_on_failure_callback(self, dag_maker):
         """
@@ -641,10 +636,9 @@ class TestLocalTaskJob:
             time.sleep(0.2)
         os.kill(process.pid, signal.SIGTERM)
         ti.refresh_from_db()
-        process.join()
+        process.join(timeout=10)
         assert failure_callback_called.value == 1
         assert task_terminated_externally.value == 1
-        assert not process.is_alive()
 
     @pytest.mark.parametrize(
         "conf, dependencies, init_state, first_run_state, second_run_state, error_message",
@@ -781,7 +775,7 @@ class TestLocalTaskJob:
         process = multiprocessing.Process(target=job1.run)
         process.start()
         time.sleep(0.4)
-        process.join()
+        process.join(timeout=10)
         ti.refresh_from_db()
         assert ti.state == State.UP_FOR_RETRY
         assert retry_callback_called.value == 1
@@ -831,7 +825,7 @@ class TestLocalTaskJob:
                 break
             time.sleep(0.2)
         os.kill(process.pid, signal.SIGTERM)
-        process.join()
+        process.join(timeout=10)
         ti.refresh_from_db()
         assert ti.state == State.UP_FOR_RETRY
         assert retry_callback_called.value == 1