You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/20 18:32:28 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #15172: Execute on failure callback when a sigterm is received

kaxil commented on a change in pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#discussion_r616938204



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -495,6 +495,69 @@ def task_function(ti):
         assert task_terminated_externally.value == 1
         assert not process.is_alive()
 
+    def test_process_kill_call_on_failure_callback(self):
+        """
+        Test that ensures that where a task is killed with sigterm
+        on_failure_callback gets executed

Review comment:
   ```suggestion
           Test that ensures that when a task is killed with sigterm
           on_failure_callback gets executed
   ```

##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -495,6 +495,69 @@ def task_function(ti):
         assert task_terminated_externally.value == 1
         assert not process.is_alive()
 
+    def test_process_kill_call_on_failure_callback(self):
+        """
+        Test that ensures that where a task is killed with sigterm
+        on_failure_callback gets executed
+        """
+        # use shared memory value so we can properly track value change even if
+        # it's been updated across processes.
+        failure_callback_called = Value('i', 0)
+        task_terminated_externally = Value('i', 1)
+        shared_mem_lock = Lock()
+
+        def failure_callback(context):
+            with shared_mem_lock:
+                failure_callback_called.value += 1
+            assert context['dag_run'].dag_id == 'test_mark_failure'
+
+        dag = DAG(dag_id='test_mark_failure', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        def task_function(ti):
+            # pylint: disable=unused-argument
+            time.sleep(60)
+            # This should not happen -- the state change should be noticed and the task should get killed
+            with shared_mem_lock:
+                task_terminated_externally.value = 0
+
+        task = PythonOperator(
+            task_id='test_on_failure',
+            python_callable=task_function,
+            on_failure_callback=failure_callback,
+            dag=dag,
+        )
+
+        session = settings.Session()
+
+        dag.clear()
+        dag.create_dagrun(
+            run_id="test",
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            session=session,
+        )
+        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti.refresh_from_db()
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+
+        settings.engine.dispose()
+        process = multiprocessing.Process(target=job1.run)
+        process.start()
+
+        for _ in range(0, 10):
+            ti.refresh_from_db()
+            if ti.state == State.RUNNING:
+                break
+            time.sleep(0.2)
+        assert ti.state == State.RUNNING
+        os.kill(ti.pid, 15)

Review comment:
   ```suggestion
           os.kill(ti.pid, signal.SIGTERM)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org