You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/08/17 08:29:31 UTC

[airflow] 03/07: Fix task retries when they receive sigkill and have retries and properly handle sigterm (#16301)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1a264a4dca4282bc5af74012e6f157c4cace0062
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Jul 28 15:57:35 2021 +0100

    Fix task retries when they receive sigkill and have retries and properly handle sigterm (#16301)
    
    Currently, tasks are not retried when they receive SIGKILL or SIGTERM, even if the task has retries configured. This change fixes that
    and adds tests for both SIGTERM and SIGKILL so we don't experience a regression.
    
    Also, SIGTERM sets the task as failed and raises AirflowException, which the heartbeat sometimes sees as the task being externally set to failed
    and does not call failure_callbacks. This commit also fixes this by calling handle_task_exit when a task gets SIGTERM.
    
    Co-authored-by: Ash Berlin-Taylor <as...@firemirror.com>
    (cherry picked from commit 4e2a94c6d1bde5ddf2aa0251190c318ac22f3b17)
---
 tests/jobs/test_local_task_job.py | 50 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 50 insertions(+)

diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index c6a92b2..2e28332 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -786,6 +786,56 @@ class TestLocalTaskJob:
         assert retry_callback_called.value == 1
         assert task_terminated_externally.value == 1
 
+    def test_process_sigterm_works_with_retries(self, dag_maker):
+        """
+        Test that a task with retries configured is set to UP_FOR_RETRY, and its
+        on_retry_callback is called once, when the LocalTaskJob process receives SIGTERM.
+        """
+        # Use shared-memory values so that changes made in other processes are
+        # visible to the assertions in this process.
+        retry_callback_called = Value('i', 0)
+        task_terminated_externally = Value('i', 1)  # stays 1 unless task_function runs to completion
+        shared_mem_lock = Lock()
+
+        def retry_callback(context):
+            with shared_mem_lock:
+                retry_callback_called.value += 1
+            assert context['dag_run'].dag_id == 'test_mark_failure_2'
+
+        def task_function(ti):
+            time.sleep(60)
+            # This should not be reached -- the task should be terminated before the sleep finishes
+            with shared_mem_lock:
+                task_terminated_externally.value = 0
+
+        with dag_maker(dag_id='test_mark_failure_2'):
+            task = PythonOperator(
+                task_id='test_on_failure',
+                python_callable=task_function,
+                retries=1,
+                retry_delay=timedelta(seconds=2),
+                on_retry_callback=retry_callback,
+            )
+        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti.refresh_from_db()
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        job1.task_runner.start()
+        settings.engine.dispose()
+        process = multiprocessing.Process(target=job1.run)
+        process.start()
+        for _ in range(0, 25):  # poll for up to ~5s (25 x 0.2s) until the task is running with a pid
+            ti.refresh_from_db()
+            if ti.state == State.RUNNING and ti.pid is not None:
+                break
+            time.sleep(0.2)
+        os.kill(process.pid, signal.SIGTERM)
+        process.join()
+        ti.refresh_from_db()
+        assert ti.state == State.UP_FOR_RETRY
+        assert retry_callback_called.value == 1
+        assert task_terminated_externally.value == 1
+
     def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
         """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
         dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)