Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/08 09:00:02 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

ephraimbuddy opened a new pull request #16301:
URL: https://github.com/apache/airflow/pull/16301


   Closes: https://github.com/apache/airflow/issues/16285
   
   Currently, tasks are not retried when they receive SIGKILL or SIGTERM, even if the task has retries configured. This change fixes that
   and adds tests for both SIGTERM and SIGKILL to guard against regressions.
   
   Previously, SIGTERM set the task to failed and raised AirflowException, which the heartbeat sometimes interpreted as the task being externally set to failed,
   so failure_callbacks were not called. This commit also fixes that by calling handle_task_exit when a task gets SIGTERM.
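
To illustrate the distinction described above, here is a toy model contrasting "always mark a killed task as failed" with "route the failure through a retry-aware handler". It is only a sketch: `ToyTask`, `set_failed`, `retries_left` and the state strings are hypothetical stand-ins, not Airflow's actual `TaskInstance` API.

```python
from dataclasses import dataclass


@dataclass
class ToyTask:
    """Hypothetical stand-in for a task instance; not Airflow's TaskInstance."""

    retries_left: int
    state: str = "running"

    def set_failed(self) -> None:
        # Old behaviour in spirit: mark as failed without consulting retries.
        self.state = "failed"

    def handle_failure(self) -> None:
        # New behaviour in spirit: consume a retry if one remains.
        if self.retries_left > 0:
            self.retries_left -= 1
            self.state = "up_for_retry"
        else:
            self.state = "failed"


# Two identical tasks that were killed while still "running".
killed_old = ToyTask(retries_left=2)
killed_old.set_failed()

killed_new = ToyTask(retries_left=2)
killed_new.handle_failure()

print(killed_old.state, killed_new.state)
```

The point of the sketch is only that the retry decision has to live in the failure path; how Airflow actually decides retry eligibility is not shown here.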
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy closed pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #16301:
URL: https://github.com/apache/airflow/pull/16301


   








[GitHub] [airflow] kaxil commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r657184582



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -151,16 +147,16 @@ def signal_handler(signum, frame):
             self.on_kill()
 
     def handle_task_exit(self, return_code: int) -> None:
-        """Handle case where self.task_runner exits by itself"""
+        """Handle case where self.task_runner exits by itself or received a SIGKILL"""

Review comment:
       SIGKILL can't be caught though.
   
   >SIGKILL is an immediate termination signal and it cannot be caught or ignored by the process.
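
The point about SIGKILL can be checked from Python on a POSIX system: the standard `signal.signal` call accepts a handler for SIGTERM but refuses one for SIGKILL. A minimal sketch follows; `try_install_handler` is a hypothetical helper name, and the code assumes it runs in the main thread.

```python
import signal


def try_install_handler(signum: int) -> bool:
    """Return True if a Python-level handler can be installed for signum."""
    try:
        previous = signal.signal(signum, lambda received, frame: None)
    except (OSError, ValueError):
        # The OS rejects handlers for uncatchable signals such as SIGKILL.
        return False
    if previous is not None:
        # Restore whatever disposition was in place before the probe.
        signal.signal(signum, previous)
    return True


can_catch_sigterm = try_install_handler(signal.SIGTERM)
can_catch_sigkill = try_install_handler(signal.SIGKILL)

print("SIGTERM catchable:", can_catch_sigterm)
print("SIGKILL catchable:", can_catch_sigkill)
```

So any code path that reacts to a SIGKILL has to run in a different process from the one that was killed, typically the parent observing the exit.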







[GitHub] [airflow] ashb commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r649178181



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -154,11 +148,13 @@ def handle_task_exit(self, return_code: int) -> None:
         # in case it failed due to runtime exception/error

Review comment:
       This comment ("task exited by itself" part) is no longer true as it is called from the signal handler







[GitHub] [airflow] uranusjr commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r657228926



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -151,16 +147,16 @@ def signal_handler(signum, frame):
             self.on_kill()
 
     def handle_task_exit(self, return_code: int) -> None:
-        """Handle case where self.task_runner exits by itself"""
+        """Handle case where self.task_runner exits by itself or received a SIGKILL"""

Review comment:
       The wording is pretty inaccurate here (inherited from the previous implementation, see `on_kill` and line 161 below); `handle_task_exit()` is reached when the task is killed, but not necessarily by a SIGKILL. This should probably say something like “or is killed externally”.







[GitHub] [airflow] kaxil commented on pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#issuecomment-866372095


   Conflicts here too :(





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r650878310



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -151,16 +147,16 @@ def signal_handler(signum, frame):
             self.on_kill()
 
     def handle_task_exit(self, return_code: int) -> None:
-        """Handle case where self.task_runner exits by itself"""
+        """Handle case where self.task_runner exits by itself or received a sigkill/sigterm"""

Review comment:
       ```suggestion
           """Handle case where self.task_runner exits by itself or received a SIGKILL"""
   ```







[GitHub] [airflow] ephraimbuddy closed pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #16301:
URL: https://github.com/apache/airflow/pull/16301









[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r657468611



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -151,16 +147,16 @@ def signal_handler(signum, frame):
             self.on_kill()
 
     def handle_task_exit(self, return_code: int) -> None:
-        """Handle case where self.task_runner exits by itself"""
+        """Handle case where self.task_runner exits by itself or received a SIGKILL"""

Review comment:
       Yes. @kaxil, actually, it's the task that gets the SIGKILL (-9), and somehow it still gets to handle_task_exit. See https://github.com/apache/airflow/issues/11086
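
A small sketch of how a parent process can observe that its child received SIGKILL, using only the standard library on a POSIX system. `describe_exit` is a hypothetical helper for illustration; this is not how Airflow's task runner is implemented.

```python
import signal
import subprocess
import sys


def describe_exit(return_code: int) -> str:
    """Hypothetical helper: turn a subprocess return code into a label."""
    if return_code < 0:
        # subprocess reports death by signal N as return code -N.
        return f"killed by {signal.Signals(-return_code).name}"
    return f"exited with status {return_code}"


# Start a child that would otherwise sleep for a minute.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])

# The child cannot intercept this; only the parent sees the outcome.
child.send_signal(signal.SIGKILL)
return_code = child.wait(timeout=10)

summary = describe_exit(return_code)
print(return_code, summary)
```

The killed process never runs any cleanup of its own, so whatever bookkeeping is needed has to happen in the process that collects the return code.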







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r650878667



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -151,16 +147,16 @@ def signal_handler(signum, frame):
             self.on_kill()
 
     def handle_task_exit(self, return_code: int) -> None:
-        """Handle case where self.task_runner exits by itself"""
+        """Handle case where self.task_runner exits by itself or received a sigkill/sigterm"""
         self.log.info("Task exited with return code %s", return_code)
         self.task_instance.refresh_from_db()
-        # task exited by itself, so we need to check for error file
+        # We need to check for error file
         # in case it failed due to runtime exception/error
         error = None
         if self.task_instance.state == State.RUNNING:
-            # This is for a case where the task received a sigkill
+            # This is for a case where the task received a sigkill or sigterm

Review comment:
       ```suggestion
               # This is for a case where the task received a SIGKILL
   ```







[GitHub] [airflow] ashb commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r677261506



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -544,10 +535,53 @@ def test_process_kill_calls_on_failure_callback(self, signal_type, dag_maker):
         def failure_callback(context):
             with shared_mem_lock:
                 failure_callback_called.value += 1
-            assert context['dag_run'].dag_id == 'test_mark_failure'
+            assert context['dag_run'].dag_id == 'test_send_sigkill'
 
         def task_function(ti):
+            # pylint: disable=unused-argument
+            time.sleep(1)
+            os.kill(ti.pid, signal.SIGKILL)
+            # This should not happen -- the state change should be noticed and the task should get killed
+            with shared_mem_lock:
+                task_terminated_externally.value = 0
+
+        with dag_maker(dag_id='test_send_sigkill', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'}):

Review comment:
       ```suggestion
           with dag_maker(dag_id='test_send_sigkill'):
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r657184079



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -78,16 +79,11 @@ def _execute(self):
         def signal_handler(signum, frame):
             """Setting kill signal handler"""
             self.log.error("Received SIGTERM. Terminating subprocesses")
-            self.on_kill()
             self.task_instance.refresh_from_db()
-            if self.task_instance.state not in State.finished:
-                self.task_instance.set_state(State.FAILED)
-            self.task_instance._run_finished_callback(  # pylint: disable=protected-access
-                error="task received sigterm"
-            )
-            raise AirflowException("LocalTaskJob received SIGTERM signal")
+            os.kill(self.task_instance.pid, signum)

Review comment:
       Do we need L82 then? If so let's add a comment explaining why it is required.







[GitHub] [airflow] ephraimbuddy commented on pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#issuecomment-885436875


   Still flaky...





[GitHub] [airflow] kaxil commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r657184582



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -151,16 +147,16 @@ def signal_handler(signum, frame):
             self.on_kill()
 
     def handle_task_exit(self, return_code: int) -> None:
-        """Handle case where self.task_runner exits by itself"""
+        """Handle case where self.task_runner exits by itself or received a SIGKILL"""

Review comment:
       SIGKILL can't be caught though, agree with TP's comment below to update the docstrings
   
   >SIGKILL is an immediate termination signal and it cannot be caught or ignored by the process.







[GitHub] [airflow] kaxil commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r675181372



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -79,11 +79,7 @@ def signal_handler(signum, frame):
             """Setting kill signal handler"""
             self.log.error("Received SIGTERM. Terminating subprocesses")
             self.on_kill()
-            self.task_instance.refresh_from_db()
-            if self.task_instance.state not in State.finished:
-                self.task_instance.set_state(State.FAILED)
-            self.task_instance._run_finished_callback(error="task received sigterm")
-            raise AirflowException("LocalTaskJob received SIGTERM signal")
+            self.handle_task_exit(128 + signum)

Review comment:
       what's 128 + signum?







[GitHub] [airflow] kaxil commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r657084261



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -78,16 +79,11 @@ def _execute(self):
         def signal_handler(signum, frame):
             """Setting kill signal handler"""
             self.log.error("Received SIGTERM. Terminating subprocesses")
-            self.on_kill()
             self.task_instance.refresh_from_db()
-            if self.task_instance.state not in State.finished:
-                self.task_instance.set_state(State.FAILED)
-            self.task_instance._run_finished_callback(  # pylint: disable=protected-access
-                error="task received sigterm"
-            )
-            raise AirflowException("LocalTaskJob received SIGTERM signal")
+            os.kill(self.task_instance.pid, signum)

Review comment:
       But if `self.on_kill()`
   
   is not called we would be missing some cleanup steps, no?
   
   https://github.com/apache/airflow/blob/8217db8cb4b1ff302c5cf8662477ac00f701e78c/airflow/jobs/local_task_job.py#L172-L174

##########
File path: airflow/jobs/local_task_job.py
##########
@@ -78,16 +79,11 @@ def _execute(self):
         def signal_handler(signum, frame):
             """Setting kill signal handler"""
             self.log.error("Received SIGTERM. Terminating subprocesses")
-            self.on_kill()
             self.task_instance.refresh_from_db()
-            if self.task_instance.state not in State.finished:
-                self.task_instance.set_state(State.FAILED)
-            self.task_instance._run_finished_callback(  # pylint: disable=protected-access
-                error="task received sigterm"
-            )
-            raise AirflowException("LocalTaskJob received SIGTERM signal")
+            os.kill(self.task_instance.pid, signum)

Review comment:
       But if `self.on_kill()`
   
   is not called we would be missing some cleanup steps for Task Runners, no?
   
   https://github.com/apache/airflow/blob/8217db8cb4b1ff302c5cf8662477ac00f701e78c/airflow/jobs/local_task_job.py#L172-L174







[GitHub] [airflow] uranusjr commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r647378301



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -73,15 +73,8 @@ def _execute(self):
         # pylint: disable=unused-argument
         def signal_handler(signum, frame):
             """Setting kill signal handler"""
-            self.log.error("Received SIGTERM. Terminating subprocesses")
-            self.on_kill()
-            self.task_instance.refresh_from_db()
-            if self.task_instance.state not in State.finished:
-                self.task_instance.set_state(State.FAILED)
-            self.task_instance._run_finished_callback(  # pylint: disable=protected-access
-                error="task received sigterm"
-            )
-            raise AirflowException("LocalTaskJob received SIGTERM signal")
+            self.log.error("LocalTaskJob received SIGTERM signal")
+            self.handle_task_exit(143)

Review comment:
       A comment describing the significance of 143 would be nice 🙂 







[GitHub] [airflow] ashb commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r649175771



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -154,11 +148,13 @@ def handle_task_exit(self, return_code: int) -> None:
         # in case it failed due to runtime exception/error
         error = None
         if self.task_instance.state == State.RUNNING:
+            print(f"Running: state: {self.task_instance.state}")

Review comment:
       ```suggestion
   ```







[GitHub] [airflow] ashb commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r649176409



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -154,11 +148,13 @@ def handle_task_exit(self, return_code: int) -> None:
         # in case it failed due to runtime exception/error
         error = None
         if self.task_instance.state == State.RUNNING:
+            print(f"Running: state: {self.task_instance.state}")
             # This is for a case where the task received a sigkill
             # while running
-            self.task_instance.set_state(State.FAILED)
+            self.task_instance.handle_failure(error="Task received a SIGKILL signal")
         if self.task_instance.state != State.SUCCESS:
             error = self.task_runner.deserialize_run_error()
+        print(f"Exiting: state={self.task_instance.state}")

Review comment:
       ```suggestion
   ```







[GitHub] [airflow] kaxil closed pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
kaxil closed pull request #16301:
URL: https://github.com/apache/airflow/pull/16301


   





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r647443889



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -156,7 +150,7 @@ def handle_task_exit(self, return_code: int) -> None:
         if self.task_instance.state == State.RUNNING:
             # This is for a case where the task received a sigkill
             # while running
-            self.task_instance.set_state(State.FAILED)
+            self.task_instance.handle_failure(error="Task received a SIGKILL signal")

Review comment:
       ```suggestion
               self.task_instance.handle_failure(error="Task received termination signal")
   ```







[GitHub] [airflow] ephraimbuddy closed pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #16301:
URL: https://github.com/apache/airflow/pull/16301


   





[GitHub] [airflow] ashb commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r677284871



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -544,10 +535,53 @@ def test_process_kill_calls_on_failure_callback(self, signal_type, dag_maker):
         def failure_callback(context):
             with shared_mem_lock:
                 failure_callback_called.value += 1
-            assert context['dag_run'].dag_id == 'test_mark_failure'
+            assert context['dag_run'].dag_id == 'test_send_sigkill'
 
         def task_function(ti):
+            # pylint: disable=unused-argument
+            time.sleep(1)
+            os.kill(ti.pid, signal.SIGKILL)

Review comment:
       Let's be explicit: we are killing ourselves here
   ```suggestion
               os.kill(os.getpid(), signal.SIGKILL)
   ```







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r675323258



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -79,11 +79,7 @@ def signal_handler(signum, frame):
             """Setting kill signal handler"""
             self.log.error("Received SIGTERM. Terminating subprocesses")
             self.on_kill()
-            self.task_instance.refresh_from_db()
-            if self.task_instance.state not in State.finished:
-                self.task_instance.set_state(State.FAILED)
-            self.task_instance._run_finished_callback(error="task received sigterm")
-            raise AirflowException("LocalTaskJob received SIGTERM signal")
+            self.handle_task_exit(128 + signum)

Review comment:
       Instead of writing 143, which is the SIGTERM exit code, I split it up so I can use signum to show where the code comes from. Also see this
   https://tldp.org/LDP/abs/html/exitcodes.html
   
   I can write it as 143 if that's better
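
The `128 + signum` arithmetic can be checked with a short standard-library sketch on a POSIX system. It also shows that Python's `subprocess` reports the same event differently from a shell: as a negative signal number rather than `128 + signum`. The variable names are illustrative only.

```python
import signal
import subprocess
import sys

# The child restores the default SIGTERM disposition, then signals itself.
child_code = (
    "import os, signal; "
    "signal.signal(signal.SIGTERM, signal.SIG_DFL); "
    "os.kill(os.getpid(), signal.SIGTERM)"
)
proc = subprocess.run([sys.executable, "-c", child_code])

# Python's subprocess module reports death by signal N as -N.
raw_returncode = proc.returncode

# A POSIX shell would report the same termination as 128 + N.
shell_style_code = 128 + signal.SIGTERM

print(raw_returncode, shell_style_code)
```

Writing `128 + signum` rather than a literal keeps the origin of the number visible and stays correct if the handler is ever registered for a different signal.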







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r677309445



##########
File path: tests/models/test_taskinstance.py
##########
@@ -530,6 +531,37 @@ def raise_skip_exception():
         ti.run()
         assert State.SKIPPED == ti.state
 
+    def test_task_sigterm_works_with_retries(self):
+        """
+        Test that ensures that tasks are retried when they receive sigterm
+        """
+        dag = DAG(dag_id='test_mark_failure_2', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        def task_function(ti):
+            # pylint: disable=unused-argument
+            os.kill(ti.pid, signal.SIGTERM)

Review comment:
       ```suggestion
               os.kill(os.getpid(), signal.SIGTERM)
   ```







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r650882901



##########
File path: tests/models/test_taskinstance.py
##########
@@ -514,6 +515,37 @@ def raise_skip_exception():
         ti.run()
         assert State.SKIPPED == ti.state
 
+    def test_task_sigkill_sigterm_works_with_retries(self):

Review comment:
       ```suggestion
       def test_task_sigterm_works_with_retries(self):
   ```







[GitHub] [airflow] ephraimbuddy commented on pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#issuecomment-885586906


   The flaky test will be stable when https://github.com/apache/airflow/pull/17187 is merged





[GitHub] [airflow] ashb commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r649180426



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -571,6 +572,79 @@ def task_function(ti):
         assert task_terminated_externally.value == 1
         assert not process.is_alive()
 
+    @parameterized.expand(
+        [
+            (signal.SIGTERM,),
+            (signal.SIGKILL,),
+        ]
+    )
+    def test_process_kill_calls_works_with_retries(self, signal_type):
+        """
+        Test that ensures that tasks are set for up-for-retry when they receive
+        sigkill or sigterm and failure_callback is not called on getting a sigterm
+        """
+        # use shared memory value so we can properly track value change even if
+        # it's been updated across processes.
+        failure_callback_called = Value('i', 0)
+        task_terminated_externally = Value('i', 1)
+        shared_mem_lock = Lock()
+
+        def failure_callback(context):
+            with shared_mem_lock:
+                failure_callback_called.value += 1
+            assert context['dag_run'].dag_id == 'test_mark_failure_2'
+
+        dag = DAG(dag_id='test_mark_failure_2', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        def task_function(ti):
+            # pylint: disable=unused-argument
+            time.sleep(60)
+            # This should not happen -- the state change should be noticed and the task should get killed
+            with shared_mem_lock:
+                task_terminated_externally.value = 0
+
+        task = PythonOperator(
+            task_id='test_on_failure',
+            python_callable=task_function,
+            retries=1,
+            retry_delay=timedelta(seconds=3),
+            on_failure_callback=failure_callback,
+            dag=dag,
+        )
+
+        session = settings.Session()
+
+        dag.clear()
+        dag.create_dagrun(
+            run_id="test",
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            session=session,
+        )
+        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti.refresh_from_db()
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+
+        settings.engine.dispose()
+        process = multiprocessing.Process(target=job1.run)
+        process.start()
+
+        for _ in range(0, 20):
+            ti.refresh_from_db()
+            if ti.state == State.RUNNING and ti.pid is not None:
+                break
+            time.sleep(0.2)
+        assert ti.state == State.RUNNING
+        assert ti.pid is not None
+        os.kill(ti.pid, signal_type)

Review comment:
       I think this is killing the "wrong" process, and is not testing the new code you wrote.
   
   You want to send the signal to `process` -- and `SIGKILL` cannot be caught, so you don't need to test that.
   
   (It's only working now because you are sending it to one process further down, not the one running the LocalTaskJob code you have edited)
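
To make the point about `SIGKILL` concrete, here is a minimal standalone sketch. It is not Airflow code: the helper names `sigkill_handler_is_refused` and `exit_status_after` are made up for illustration, and a POSIX system is assumed. It shows that the OS refuses a `SIGKILL` handler, and that a child killed this way simply reports the signal in its exit status.

```python
import signal
import subprocess
import sys


def sigkill_handler_is_refused() -> bool:
    """Return True if installing a SIGKILL handler is rejected (hypothetical helper)."""
    probe = (
        "import signal, sys\n"
        "try:\n"
        "    signal.signal(signal.SIGKILL, lambda signum, frame: None)\n"
        "except OSError:\n"
        "    sys.exit(0)  # the handler was refused\n"
        "sys.exit(1)\n"
    )
    # Run the probe in a fresh interpreter so it executes in that process's main thread.
    return subprocess.run([sys.executable, "-c", probe]).returncode == 0


def exit_status_after(signum: int) -> int:
    """Start a sleeping child, send it `signum`, and return its exit status."""
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    child.send_signal(signum)
    # subprocess reports "terminated by signal N" as a return code of -N.
    return child.wait(timeout=30)
```

Because the child never gets a chance to run any cleanup code on `SIGKILL`, only the parent can observe what happened, which is why a test of handler logic has to use a catchable signal such as `SIGTERM`.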







[GitHub] [airflow] ephraimbuddy closed pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #16301:
URL: https://github.com/apache/airflow/pull/16301


   





[GitHub] [airflow] kaxil commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r657470849



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -151,16 +147,16 @@ def signal_handler(signum, frame):
             self.on_kill()
 
     def handle_task_exit(self, return_code: int) -> None:
-        """Handle case where self.task_runner exits by itself"""
+        """Handle case where self.task_runner exits by itself or received a SIGKILL"""

Review comment:
       Yup, just updating docstrings as TP mentioned though
   
   







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r657509122



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -78,16 +79,11 @@ def _execute(self):
         def signal_handler(signum, frame):
             """Setting kill signal handler"""
             self.log.error("Received SIGTERM. Terminating subprocesses")
-            self.on_kill()
             self.task_instance.refresh_from_db()
-            if self.task_instance.state not in State.finished:
-                self.task_instance.set_state(State.FAILED)
-            self.task_instance._run_finished_callback(  # pylint: disable=protected-access
-                error="task received sigterm"
-            )
-            raise AirflowException("LocalTaskJob received SIGTERM signal")
+            os.kill(self.task_instance.pid, signum)

Review comment:
       Added a comment because we need to refresh it to get the PID







[GitHub] [airflow] ashb commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r658614278



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -78,16 +79,11 @@ def _execute(self):
         def signal_handler(signum, frame):
             """Setting kill signal handler"""
             self.log.error("Received SIGTERM. Terminating subprocesses")
-            self.on_kill()
             self.task_instance.refresh_from_db()
-            if self.task_instance.state not in State.finished:
-                self.task_instance.set_state(State.FAILED)
-            self.task_instance._run_finished_callback(  # pylint: disable=protected-access
-                error="task received sigterm"
-            )
-            raise AirflowException("LocalTaskJob received SIGTERM signal")
+            os.kill(self.task_instance.pid, signum)

Review comment:
       I don't understand this change.
   
   `on_kill` called `task_runner.terminate`, which terminates the process and tidies up afterwards.
   
   In the case of `run_as_user`, the process at `task_instance.pid` will be owned by a different user, and thus _we won't be able to send it the kill signal_.
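
The permission concern can be illustrated with a small sketch, assuming a POSIX system. The helper `try_signal` is a hypothetical name, not part of Airflow. Python's `os.kill` raises `PermissionError` when the caller may not signal the target process, and `ProcessLookupError` when the PID does not exist.

```python
import os


def try_signal(pid: int, signum: int) -> str:
    """Send `signum` to `pid` and report the outcome instead of raising (hypothetical helper)."""
    try:
        os.kill(pid, signum)
    except ProcessLookupError:
        # No process with this PID exists any more.
        return "no such process"
    except PermissionError:
        # The target belongs to another user and we are not privileged.
        return "not permitted"
    return "delivered"


# Signal 0 performs only the existence and permission checks; nothing is delivered.
print(try_signal(os.getpid(), 0))
```

An unprivileged process calling this on a PID owned by another user would be expected to get "not permitted", which is the failure mode described above for `run_as_user`.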







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r657091158



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -78,16 +79,11 @@ def _execute(self):
         def signal_handler(signum, frame):
             """Setting kill signal handler"""
             self.log.error("Received SIGTERM. Terminating subprocesses")
-            self.on_kill()
             self.task_instance.refresh_from_db()
-            if self.task_instance.state not in State.finished:
-                self.task_instance.set_state(State.FAILED)
-            self.task_instance._run_finished_callback(  # pylint: disable=protected-access
-                error="task received sigterm"
-            )
-            raise AirflowException("LocalTaskJob received SIGTERM signal")
+            os.kill(self.task_instance.pid, signum)

Review comment:
       It will now exit by itself and not terminate immediately.
   
   The SIGTERM will kill the task and run the `task.on_kill` method; the task then exits through `LocalTaskJob.handle_task_exit`, and the task_runner terminates. Right?
   







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r650880747



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -78,16 +79,11 @@ def _execute(self):
         def signal_handler(signum, frame):
             """Setting kill signal handler"""
             self.log.error("Received SIGTERM. Terminating subprocesses")
-            self.on_kill()
             self.task_instance.refresh_from_db()
-            if self.task_instance.state not in State.finished:
-                self.task_instance.set_state(State.FAILED)
-            self.task_instance._run_finished_callback(  # pylint: disable=protected-access
-                error="task received sigterm"
-            )
-            raise AirflowException("LocalTaskJob received SIGTERM signal")
+            os.kill(self.task_instance.pid, signum)

Review comment:
       Passed this to TI so we can properly handle the callbacks for sigterm







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r650880028



##########
File path: airflow/models/taskinstance.py
##########
@@ -1311,12 +1311,6 @@ def _run_finished_callback(self, error: Optional[Union[str, Exception]] = None)
             if task.on_success_callback is not None:
                 context = self.get_template_context()
                 task.on_success_callback(context)
-        elif self.state == State.UP_FOR_RETRY:
-            task = self.task
-            if task.on_retry_callback is not None:
-                context = self.get_template_context()
-                context["exception"] = error
-                task.on_retry_callback(context)

Review comment:
       I have moved this to the handle_failure method, since UP_FOR_RETRY is not a finished state, which meant on_retry_callback was never called from here even when it was set.
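
As a rough illustration of the idea (not the actual Airflow implementation), the sketch below shows a failure handler that decides between retrying and failing, and fires the matching callback at that point. The function signature, the string states, and the `try_number <= max_tries` condition are simplifying assumptions made for this example.

```python
from typing import Callable, Optional

Callback = Optional[Callable[[dict], None]]


def handle_failure(
    error: str,
    try_number: int,
    max_tries: int,
    on_retry_callback: Callback = None,
    on_failure_callback: Callback = None,
) -> str:
    """Pick the next state for a failed attempt and call the matching callback."""
    context = {"exception": error}
    if try_number <= max_tries:
        # Attempts remain: the task is retried, so the retry callback applies.
        state, callback = "up_for_retry", on_retry_callback
    else:
        # No attempts left: the task is failed for good.
        state, callback = "failed", on_failure_callback
    if callback is not None:
        callback(context)
    return state
```

Handling both callbacks where the state is decided avoids depending on a later "finished" hook that only runs for terminal states.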







[GitHub] [airflow] uranusjr closed pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
uranusjr closed pull request #16301:
URL: https://github.com/apache/airflow/pull/16301


   





[GitHub] [airflow] ephraimbuddy merged pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged pull request #16301:
URL: https://github.com/apache/airflow/pull/16301


   





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r675172756



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -78,16 +79,11 @@ def _execute(self):
         def signal_handler(signum, frame):
             """Setting kill signal handler"""
             self.log.error("Received SIGTERM. Terminating subprocesses")
-            self.on_kill()
             self.task_instance.refresh_from_db()
-            if self.task_instance.state not in State.finished:
-                self.task_instance.set_state(State.FAILED)
-            self.task_instance._run_finished_callback(  # pylint: disable=protected-access
-                error="task received sigterm"
-            )
-            raise AirflowException("LocalTaskJob received SIGTERM signal")
+            os.kill(self.task_instance.pid, signum)

Review comment:
       I have updated this. I had misunderstood what triggers this area of the code.
   It is triggered by a SIGTERM sent to the task runner, not by a SIGTERM sent to the task.
   When the task runner gets a SIGTERM, we exit the task and handle the failure with its callback. This ensures tasks are retried.







[GitHub] [airflow] ashb commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r649174921



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -73,15 +73,9 @@ def _execute(self):
         # pylint: disable=unused-argument
         def signal_handler(signum, frame):
             """Setting kill signal handler"""
-            self.log.error("Received SIGTERM. Terminating subprocesses")
-            self.on_kill()
-            self.task_instance.refresh_from_db()
-            if self.task_instance.state not in State.finished:
-                self.task_instance.set_state(State.FAILED)
-            self.task_instance._run_finished_callback(  # pylint: disable=protected-access
-                error="task received sigterm"
-            )
-            raise AirflowException("LocalTaskJob received SIGTERM signal")
+            self.log.error("LocalTaskJob received SIGTERM signal")
+            # Call the handle_task_exit method with SIGTERM exit code 143:
+            self.handle_task_exit(143)

Review comment:
       ```suggestion
               self.handle_task_exit(128 + signum)
   ```
   
   would be better I think (this is the default behaviour)







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r650882443



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -686,6 +679,71 @@ def test_fast_follow(
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    @parameterized.expand([(signal.SIGKILL,), (signal.SIGTERM,)])
+    def test_task_sigkill_sigterm_works_with_retries(self, sigtype):
+        """
+        Test that ensures that task runner retries tasks when they receive sigkill

Review comment:
       ```suggestion
           Test that ensures that task runner retries tasks when they receive sigkill or sigterm
   ```







[GitHub] [airflow] kaxil commented on pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#issuecomment-887446672


   All the MSSQL tests are failing, not sure if they are related or not. The static check failure is unrelated and caused by https://github.com/apache/airflow/pull/17237 - @ashb is fixing it





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r649558802



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -80,14 +80,15 @@ def signal_handler(signum, frame):
             self.log.error("Received SIGTERM. Terminating subprocesses")
             self.on_kill()
             self.task_instance.refresh_from_db()
-            if self.task_instance.state not in State.finished:
-                self.task_instance.set_state(State.FAILED)
-            self.task_instance._run_finished_callback(  # pylint: disable=protected-access
-                error="task received sigterm"
-            )
+            if self.task_instance.state == State.RUNNING:
+                # This only happens when a pod is deleted
+                # killing the task_runner by sending sigterm.
+                # TI sigterm is handled at taskinstance.py
+                self.handle_task_exit(128 + signum)

Review comment:
       @ashb The test I had previously for sigterm was not hitting this code path. I'm only able to hit it when I delete a running Kubernetes pod. I wonder how we can test this?







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r650881572



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -513,16 +514,10 @@ def task_function(ti):
         assert task_terminated_externally.value == 1
         assert not process.is_alive()
 
-    @parameterized.expand(
-        [
-            (signal.SIGTERM,),
-            (signal.SIGKILL,),
-        ]
-    )
-    @pytest.mark.quarantined
-    def test_process_kill_calls_on_failure_callback(self, signal_type):
+    @parameterized.expand([(signal.SIGKILL,), (signal.SIGTERM,)])
+    def test_task_sigkill_sigterm_calls_on_failure_callback(self, sigtype):
         """
-        Test that ensures that when a task is killed with sigterm or sigkill
+        Test that ensures that when a task is killed with sigkill

Review comment:
       ```suggestion
           Test that ensures that when a task is killed with sigterm or sigkill 
   ```







[GitHub] [airflow] ashb commented on a change in pull request #16301: Fix task retries when they receive sigkill and have retries. Properly handle sigterm too

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16301:
URL: https://github.com/apache/airflow/pull/16301#discussion_r649174921



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -73,15 +73,9 @@ def _execute(self):
         # pylint: disable=unused-argument
         def signal_handler(signum, frame):
             """Setting kill signal handler"""
-            self.log.error("Received SIGTERM. Terminating subprocesses")
-            self.on_kill()
-            self.task_instance.refresh_from_db()
-            if self.task_instance.state not in State.finished:
-                self.task_instance.set_state(State.FAILED)
-            self.task_instance._run_finished_callback(  # pylint: disable=protected-access
-                error="task received sigterm"
-            )
-            raise AirflowException("LocalTaskJob received SIGTERM signal")
+            self.log.error("LocalTaskJob received SIGTERM signal")
+            # Call the handle_task_exit method with SIGTERM exit code 143:
+            self.handle_task_exit(143)

Review comment:
       ```suggestion
               self.handle_task_exit(128 + signum)
   ```
   
   would be better I think (this is the same behaviour, but just has a little bit less magic constant/shows how 143 comes about)
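
For readers wondering where 143 comes from: shells such as bash conventionally report a process killed by signal N with exit status 128 + N. A tiny sketch follows; `shell_exit_code` is a hypothetical helper name, and the numeric values assume a platform where SIGTERM is 15 and SIGKILL is 9, as on Linux.

```python
import signal


def shell_exit_code(signum: int) -> int:
    """Exit status a shell conventionally reports for a process killed by `signum`."""
    return 128 + signum


# SIGTERM is 15 on Linux, so this yields the 143 used in the diff above.
print(shell_exit_code(signal.SIGTERM))
```

Deriving the value from `signum` also keeps the handler correct if it is ever registered for a signal other than SIGTERM.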









