Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/05/19 11:18:55 UTC

[GitHub] [airflow] yuqian90 opened a new issue #15938: celery_executor becomes stuck if child process receives signal before reset_signals is called

yuqian90 opened a new issue #15938:
URL: https://github.com/apache/airflow/issues/15938


   **Apache Airflow version**: 1.10.13 onwards (Any version that picked up #11278, including Airflow 2.0.* and 2.1.*)
   
   
   **Environment**:
   - **Cloud provider or hardware configuration**: Any
   - **OS** (e.g. from /etc/os-release): Only tested on Debian Linux, but others may be affected too
   - **Kernel** (e.g. `uname -a`): Any
   - **Install tools**: Any
   - **Others**: Only celery_executor is affected
   
   **What happened**:
   This was first reported [here](https://github.com/apache/airflow/issues/7935#issuecomment-839656436).
   airflow-scheduler sometimes stops heartbeating and stops scheduling any tasks, with this as the last line in its log. This happens at random times, a few times a week, and it happens more often if the scheduler machine is slow.
   
   ```
   {scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15
   ```
   
   Related to #7935
   Most likely caused by #11278
   
   **What you expected to happen**: 
   Scheduler should not become stuck
   
   **How to reproduce it**:
   
   Here's a small example that reproduces the problem. There's roughly a 1-in-25 chance that any given run gets stuck, so run it many times to see it happen (a small driver sketch for doing that follows the example).
   
   ```python
   #!/usr/bin/env python3.8
   import os
   import random
   import signal
   import time
   from multiprocessing import Pool
   
   
   def send_task_to_executor(arg):
       pass
   
   
   def _exit_gracefully(signum, frame):
       print(f"{os.getpid()} Exiting gracefully upon receiving signal {signum}")
   
   
   def register_signals():
       print(f"{os.getpid()} register_signals()")
       signal.signal(signal.SIGINT, _exit_gracefully)
       signal.signal(signal.SIGTERM, _exit_gracefully)
       signal.signal(signal.SIGUSR2, _exit_gracefully)
   
   
   def reset_signals():
       if random.randint(0, 500) == 0:
           # This sleep statement here simulates the machine being busy
           print(f"{os.getpid()} is slow")
           time.sleep(0.1)
       signal.signal(signal.SIGINT, signal.SIG_DFL)
       signal.signal(signal.SIGTERM, signal.SIG_DFL)
       signal.signal(signal.SIGUSR2, signal.SIG_DFL)
   
   
   if __name__ == "__main__":
       register_signals()
   
       task_tuples_to_send = list(range(20))
       sync_parallelism = 15
       chunksize = 5
   
       with Pool(processes=sync_parallelism, initializer=reset_signals) as pool:
           pool.map(
               send_task_to_executor,
               task_tuples_to_send,
               chunksize=chunksize,
           )
   
   
   ```
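   To observe the hang without babysitting the script, a small driver like the one below can be used. This is a hypothetical harness, not part of the original report; it assumes the example above is saved next to it as `repro.py`.
   
   ```python
   #!/usr/bin/env python3.8
   # Hypothetical harness: run the reproducing script repeatedly and flag any
   # run that never finishes (a healthy run completes in well under 30 seconds).
   import subprocess
   import sys
   
   SCRIPT = "repro.py"  # assumed filename of the reproducing example above
   
   for attempt in range(200):
       try:
           subprocess.run([sys.executable, SCRIPT], timeout=30, check=True)
       except subprocess.TimeoutExpired:
           print(f"attempt {attempt}: hung (no exit within 30 seconds)")
           break
   else:
       print("no hang observed in 200 attempts")
   ```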
   
   The reproducing example above can become stuck with a `py-spy dump` that looks exactly like the one from a hung airflow scheduler:
   
   `py-spy dump` for the parent `airflow scheduler` process
   ```
   Python v3.8.7
   
   Thread 0x7FB54794E740 (active): "MainThread"
       poll (multiprocessing/popen_fork.py:27)
       wait (multiprocessing/popen_fork.py:47)
       join (multiprocessing/process.py:149)
       _terminate_pool (multiprocessing/pool.py:729)
       __call__ (multiprocessing/util.py:224)
       terminate (multiprocessing/pool.py:654)
       __exit__ (multiprocessing/pool.py:736)
       _send_tasks_to_celery (airflow/executors/celery_executor.py:331)
       _process_tasks (airflow/executors/celery_executor.py:272)
       trigger_tasks (airflow/executors/celery_executor.py:263)
       heartbeat (airflow/executors/base_executor.py:158)
       _run_scheduler_loop (airflow/jobs/scheduler_job.py:1388)
       _execute (airflow/jobs/scheduler_job.py:1284)
       run (airflow/jobs/base_job.py:237)
       scheduler (airflow/cli/commands/scheduler_command.py:63)
       wrapper (airflow/utils/cli.py:89)
       command (airflow/cli/cli_parser.py:48)
       main (airflow/__main__.py:40)
       <module> (airflow:8)
   ```
   
   `py-spy dump` for the child `airflow scheduler` process
   
   ```
   Python v3.8.7
   
   Thread 16232 (idle): "MainThread"
       __enter__ (multiprocessing/synchronize.py:95)
       get (multiprocessing/queues.py:355)
       worker (multiprocessing/pool.py:114)
       run (multiprocessing/process.py:108)
       _bootstrap (multiprocessing/process.py:315)
       _launch (multiprocessing/popen_fork.py:75)
       __init__ (multiprocessing/popen_fork.py:19)
       _Popen (multiprocessing/context.py:277)
       start (multiprocessing/process.py:121)
       _repopulate_pool_static (multiprocessing/pool.py:326)
       _repopulate_pool (multiprocessing/pool.py:303)
       __init__ (multiprocessing/pool.py:212)
       Pool (multiprocessing/context.py:119)
       _send_tasks_to_celery (airflow/executors/celery_executor.py:330)
       _process_tasks (airflow/executors/celery_executor.py:272)
       trigger_tasks (airflow/executors/celery_executor.py:263)
       heartbeat (airflow/executors/base_executor.py:158)
       _run_scheduler_loop (airflow/jobs/scheduler_job.py:1388)
       _execute (airflow/jobs/scheduler_job.py:1284)
       run (airflow/jobs/base_job.py:237)
       scheduler (airflow/cli/commands/scheduler_command.py:63)
       wrapper (airflow/utils/cli.py:89)
       command (airflow/cli/cli_parser.py:48)
       main (airflow/__main__.py:40)
       <module> (airflow:8)
   ```
   








[GitHub] [airflow] gdevanla commented on issue #15938: celery_executor becomes stuck if child process receives signal before reset_signals is called

Posted by GitBox <gi...@apache.org>.
gdevanla commented on issue #15938:
URL: https://github.com/apache/airflow/issues/15938#issuecomment-845692411


   On a related note, we used to have the `-r` option to recycle the scheduler. It looks like that option is not available anymore; it would have been a nice workaround for this kind of issue.





[GitHub] [airflow] gdevanla edited a comment on issue #15938: celery_executor becomes stuck if child process receives signal before reset_signals is called

Posted by GitBox <gi...@apache.org>.
gdevanla edited a comment on issue #15938:
URL: https://github.com/apache/airflow/issues/15938#issuecomment-845671073


   I just upgraded to 2.0.2 and this happens within about a minute of a scheduler restart.  We are on Python 3.8 and using RabbitMQ/Celery.
   
   ```
   [2021-05-21 05:40:15,996] {channel.py:446} DEBUG - Channel open
   [2021-05-21 05:40:16,000] {channel.py:105} DEBUG - using channel_id: 1
   [2021-05-21 05:40:16,005] {channel.py:446} DEBUG - Channel open
   [2021-05-21 05:40:16,053] {scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15
   [2021-05-21 05:40:16,053] {scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15
   [2021-05-21 05:40:16,060] {scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15
   [2021-05-21 05:40:16,067] {scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15
   [2021-05-21 05:40:16,076] {scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15
   [2021-05-21 05:43:34,715] {serialized_dag.py:197} DEBUG - Deleting Serialized DAGs (for which DAG files are deleted) from serialized_dag table 
   [2021-05-21 05:43:34,716] {dag.py:2213} DEBUG - Deactivating DAGs (for which DAG files are deleted) from dag table 
   [2021-05-21 05:43:34,723] {dagcode.py:135} DEBUG - Deleting code from dag_code table 
   [2021-05-21 05:43:48,746] {settings.py:292} DEBUG - Disposing DB connection pool (PID 6758)
   [2021-05-21 05:43:48,760] {scheduler_job.py:310} DEBUG - Waiting for <ForkProcess name='DagFileProcessor1-Process' pid=6758 parent=1204 stopped exitcode=0>
   ```
   
   And here are the corresponding logs in the `error` file (I am not sure if this is related, but thought they might be useful):
   
   ```
   Process ForkPoolWorker-622:
   Traceback (most recent call last):
     File "/home/foo/py_src/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
       self.run()
     File "/home/foo/py_src/lib/python3.8/multiprocessing/process.py", line 108, in run
       self._target(*self._args, **self._kwargs)
     File "/home/foo/py_src/lib/python3.8/multiprocessing/pool.py", line 114, in worker
       task = get()
     File "/home/foo/py_src/lib/python3.8/multiprocessing/queues.py", line 355, in get
       with self._rlock:
     File "/home/foo/py_src/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
       return self._semlock.__enter__()
     File "/home/foo/rapc/.env38/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 748, in _exit_gracefully
       self.processor_agent.end()
     File "/home/foo/rapc/.env38/lib/python3.8/site-packages/airflow/utils/dag_processing.py", line 458, in end
       self._process.join(timeout=1.0)
     File "/home/foo/py_src/lib/python3.8/multiprocessing/process.py", line 147, in join
       assert self._parent_pid == os.getpid(), 'can only join a child process'
   AssertionError: can only join a child process
   ```
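   The `AssertionError` at the bottom is multiprocessing's own guard: only the process that started a child may `join()` it, and here the parent's `_exit_gracefully` handler is running inside a forked pool worker, which never started the DAG processor it tries to join. A minimal, Airflow-free sketch that triggers the same assertion (hypothetical, Unix-only since it relies on `os.fork`):
   
   ```python
   import multiprocessing
   import os
   import time
   
   if __name__ == "__main__":
       # The "real" parent starts a child process.
       child = multiprocessing.Process(target=time.sleep, args=(2,))
       child.start()
   
       # Fork a second process that merely inherits the `child` handle.
       pid = os.fork()
       if pid == 0:
           try:
               # This process did not start `child`, so join() refuses.
               child.join(timeout=1.0)
           except AssertionError as exc:
               print(f"{os.getpid()}: {exc}")  # -> can only join a child process
           os._exit(0)
   
       os.waitpid(pid, 0)
       child.join()
   ```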
   
   
    





[GitHub] [airflow] ashb commented on issue #15938: celery_executor becomes stuck if child process receives signal before reset_signals is called

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #15938:
URL: https://github.com/apache/airflow/issues/15938#issuecomment-846143436


   I have _no_ problem with us not using multiprocessing - this isn't our first problem like this.





[GitHub] [airflow] ashb commented on issue #15938: celery_executor becomes stuck if child process receives signal before reset_signals is called

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #15938:
URL: https://github.com/apache/airflow/issues/15938#issuecomment-844143373


   Just _how_ slow does it have to be to happen?
   
   Nice digging.





[GitHub] [airflow] ashb commented on issue #15938: celery_executor becomes stuck if child process receives signal before reset_signals is called

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #15938:
URL: https://github.com/apache/airflow/issues/15938#issuecomment-844144705


   We can probably guard this by closing over the current pid when we register the signal handlers, and checking that the signal is received by the same pid.
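   For illustration, a minimal sketch of that guard (an assumption about the shape of the fix, not the actual Airflow change): close over `os.getpid()` when the handlers are registered and bail out if the handler fires in a different, i.e. forked, process.
   
   ```python
   import os
   import signal
   import sys
   
   
   def register_signals():
       # Close over the pid of the process that installs the handlers.
       handler_pid = os.getpid()
   
       def _exit_gracefully(signum, frame):
           if os.getpid() != handler_pid:
               # A forked child inherited this handler before it could call
               # reset_signals(); do nothing rather than run the parent's
               # shutdown logic inside the worker.
               return
           print(f"{handler_pid} Exiting gracefully upon receiving signal {signum}")
           sys.exit(0)
   
       signal.signal(signal.SIGINT, _exit_gracefully)
       signal.signal(signal.SIGTERM, _exit_gracefully)
       signal.signal(signal.SIGUSR2, _exit_gracefully)
   ```
   
   Note that yuqian90 reports below that making `_exit_gracefully` a no-op in child processes was not sufficient on its own in his tests.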





[GitHub] [airflow] potiuk commented on issue #15938: celery_executor becomes stuck if child process receives signal before reset_signals is called

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #15938:
URL: https://github.com/apache/airflow/issues/15938#issuecomment-844139146


   Whoa!








[GitHub] [airflow] yuqian90 edited a comment on issue #15938: celery_executor becomes stuck if child process receives signal before reset_signals is called

Posted by GitBox <gi...@apache.org>.
yuqian90 edited a comment on issue #15938:
URL: https://github.com/apache/airflow/issues/15938#issuecomment-845994258


   > Just how slow does it have to be to happen?
   > We can probably guard this by closing over the current pid when we register the signal handlers, and checking that the signal is received by the same pid
   
   Hi @ashb, it's not clear to me exactly how slow it must be for this to happen. It looks like as long as some child processes are a fraction of a second slower than the others, they easily get into a deadlock when a SIGTERM is received. So even transient slowness on a beefy machine can cause this to happen.
   
   Here's what I tried so far. Only the last method seems to fix the issue completely (i.e. we have to stop using `multiprocessing.Pool`):
   - Tried to reset the signal handler to `signal.SIG_DFL` in `register_signals` if the current process is a child process. This doesn't help because the child process inherits the parent's signal handler when it's forked. Still hangs occasionally.
   - Tried to make `_exit_gracefully` a no-op if the current process is a child process. This isn't sufficient. Still hangs occasionally.
   - Tried changing multiprocessing to use "spawn" instead of "fork", as some people suggest [on the internet](https://pythonspeed.com/articles/python-multiprocessing/). This greatly reduced the chance of the issue happening, but after running the reproducing example about 8000 times it still happened, so it doesn't fix the issue completely.
   - **Replaced `multiprocessing.Pool` with `concurrent.futures.process.ProcessPoolExecutor`. Once this is done, the reproducing example no longer hangs even after running it tens of thousands of times.** So I put up PR #15989, which fixes the issue using this method (a minimal sketch of the swap is included below).
   
   From experience, `multiprocessing.Pool` is notorious for causing mysterious hangs like this, while `ProcessPoolExecutor` does not cause the same problems even though it has a similar interface and uses similar underlying machinery. I don't understand exactly why the change fixes the issue, but in practice it always seems to help.
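   For illustration, here is roughly what that swap looks like in terms of the reproducing example above (a sketch under the same toy setup, not the actual PR #15989 diff):
   
   ```python
   #!/usr/bin/env python3.8
   import os
   import random
   import signal
   import time
   from concurrent.futures import ProcessPoolExecutor
   
   
   def send_task_to_executor(arg):
       pass
   
   
   def _exit_gracefully(signum, frame):
       print(f"{os.getpid()} Exiting gracefully upon receiving signal {signum}")
   
   
   def register_signals():
       signal.signal(signal.SIGINT, _exit_gracefully)
       signal.signal(signal.SIGTERM, _exit_gracefully)
       signal.signal(signal.SIGUSR2, _exit_gracefully)
   
   
   def reset_signals():
       # Same artificial slowness as in the original reproducing example.
       if random.randint(0, 500) == 0:
           print(f"{os.getpid()} is slow")
           time.sleep(0.1)
       signal.signal(signal.SIGINT, signal.SIG_DFL)
       signal.signal(signal.SIGTERM, signal.SIG_DFL)
       signal.signal(signal.SIGUSR2, signal.SIG_DFL)
   
   
   if __name__ == "__main__":
       register_signals()
       # ProcessPoolExecutor accepts the same initializer/chunksize knobs.
       with ProcessPoolExecutor(max_workers=15, initializer=reset_signals) as pool:
           list(pool.map(send_task_to_executor, range(20), chunksize=5))
   ```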





[GitHub] [airflow] yuqian90 closed issue #15938: celery_executor becomes stuck if child process receives signal before reset_signals is called

Posted by GitBox <gi...@apache.org>.
yuqian90 closed issue #15938:
URL: https://github.com/apache/airflow/issues/15938


   





[GitHub] [airflow] BwL1289 commented on issue #15938: celery_executor becomes stuck if child process receives signal before reset_signals is called

Posted by GitBox <gi...@apache.org>.
BwL1289 commented on issue #15938:
URL: https://github.com/apache/airflow/issues/15938#issuecomment-849693450


   Commenting to stay up to date on this issue.

