You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/05/27 15:20:38 UTC

[GitHub] [airflow] ashb commented on a change in pull request #15989: Fix Celery executor getting stuck randomly because of reset_signals in multiprocessing

ashb commented on a change in pull request #15989:
URL: https://github.com/apache/airflow/pull/15989#discussion_r640727665



##########
File path: tests/executors/test_celery_executor.py
##########
@@ -484,3 +485,57 @@ def test_should_support_base_backend(self):
         assert [
             'DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)'
         ] == cm.output
+
+
+class MockTask:
+    """
+    A picklable object used to mock tasks sent to Celery. Can't use the mock library
+    here because it's not picklable.
+    """
+
+    def apply_async(self, *args, **kwargs):
+        return 1
+
+
+def _exit_gracefully(signum, _):
+    print(f"{os.getpid()} Exiting gracefully upon receiving signal {signum}")
+    sys.exit(signum)
+
+
+@pytest.fixture
+def register_signals():
+    """
+    Register the same signals as scheduler does to test celery_executor to make sure it does not
+    hang.
+    """
+
+    print(f"{os.getpid()} register_signals()")
+

Review comment:
       ```suggestion
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -745,21 +753,24 @@ def register_signals(self) -> None:
 
     def _exit_gracefully(self, signum, frame) -> None:  # pylint: disable=unused-argument
         """Helper method to clean up processor_agent to avoid leaving orphan processes."""
-        self.log.info("Exiting gracefully upon receiving signal %s", signum)
-        if self.processor_agent:
-            self.processor_agent.end()
-        sys.exit(os.EX_OK)
+        if _is_parent_process():
+            # Only the parent process should perform the cleanup.
+            self.log.info("Exiting gracefully upon receiving signal %s", signum)
+            if self.processor_agent:
+                self.processor_agent.end()
+            sys.exit(os.EX_OK)
 
     def _debug_dump(self, signum, frame):  # pylint: disable=unused-argument
-        try:
-            sig_name = signal.Signals(signum).name  # pylint: disable=no-member
-        except Exception:  # pylint: disable=broad-except
-            sig_name = str(signum)
+        if _is_parent_process():

Review comment:
       ```suggestion
           if not _is_parent_process():
               return
   ```
   
   Then we don't need to indent or change the rest of this function.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org