Posted to commits@airflow.apache.org by "Qian Yu (Jira)" <ji...@apache.org> on 2020/01/10 04:30:00 UTC

[jira] [Commented] (AIRFLOW-6527) Error sending Celery task:Timeout in send_task_to_executor

    [ https://issues.apache.org/jira/browse/AIRFLOW-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17012447#comment-17012447 ] 

Qian Yu commented on AIRFLOW-6527:
----------------------------------

Tagging some people who might know more about this:
[~kamil.bregula] [~potiuk] [~kaxilnaik] [~yrqls21]

> Error sending Celery task:Timeout in send_task_to_executor
> ----------------------------------------------------------
>
>                 Key: AIRFLOW-6527
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6527
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.10.7
>            Reporter: Qian Yu
>            Priority: Major
>
> We use Airflow with the CeleryExecutor and a Redis broker. Our Airflow scheduler often encounters this {{AirflowTaskTimeout}} error.
> - This happens in {{send_task_to_executor()}}.
> - It only happens occasionally. 
> - Retrying the failed task a few times always works.
> - This affects at least 1.10.6 and 1.10.7 and possibly other versions too. 
> Possible cause:
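> Our Airflow venv and dags_folder are on an NFS mount so that the Airflow installation and DAG files stay in sync across the hosts running the different Airflow services.
> The NFS mount is sometimes slow. When it is, the imports that {{apply_async()}} triggers lazily (e.g. {{celery.app.amqp}} in the traceback below) are slow too, and {{send_task_to_executor()}} takes longer than its hardcoded 2-second timeout.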
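One way to test this hypothesis is to time a cold import, from the NFS-mounted venv, of a module that appears in the traceback, such as {{celery.app.amqp}}. A minimal sketch, assuming it is run in a fresh Python process on the scheduler host ({{time_first_import}} is a hypothetical helper, not part of Airflow or Celery):

```python
import importlib
import sys
import time


def time_first_import(module_name):
    """Return (seconds, already_loaded) for importing module_name.

    A module already present in sys.modules is returned from that cache
    almost instantly, so the timing only reflects filesystem cost (path
    lookups, stat() calls, reading files) on the first import in a process.
    """
    already_loaded = module_name in sys.modules
    start = time.perf_counter()
    importlib.import_module(module_name)
    elapsed = time.perf_counter() - start
    return elapsed, already_loaded


if __name__ == "__main__":
    # On the scheduler host, "celery.app.amqp" (the module being imported in
    # the traceback) is the interesting one; "json" is used here only so the
    # sketch runs without Celery installed.
    name = "json"
    seconds, cached = time_first_import(name)
    print("{}: {:.3f}s (already loaded: {})".format(name, seconds, cached))
```

Running it a few times while the NFS mount is under load would show whether a cold import can come close to the 2-second limit.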
> Other people with similar-looking problems:
> The following issue is now closed, but it's not clear to me whether or how the user resolved it.
> https://github.com/bitnami/bitnami-docker-airflow-scheduler/issues/1
> Another user asked a similar question on the mailing list; it has not been answered.
> https://www.mail-archive.com/dev@airflow.apache.org/msg01093.html
> Proposed workarounds:
> - Make this {{timeout(seconds=2)}} configurable, e.g. by adding a {{send_task_timeout}} option to the {{[celery]}} section of airflow.cfg. Since 2 seconds seems too short, we could then set it to something like 15 seconds, which would make the timeout much less likely.
> - Move the Airflow venv to local disk. This makes it inconvenient to keep the Airflow installation in sync across multiple hosts, though.
> {code}
> Jan 09 22:46:59 scheduler_host airflow[18882]: [2020-01-09 22:46:59,763] {celery_executor.py:224} ERROR - Error sending Celery task:Timeout, PID: 27724
> Jan 09 22:46:59 scheduler_host airflow[18882]: Celery Task ID: ('example_daily', 'example_sensor1', datetime.datetime(2020, 1, 9, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1)
> Jan 09 22:46:59 scheduler_host airflow[18882]: Traceback (most recent call last):
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
> Jan 09 22:46:59 scheduler_host airflow[18882]: return obj.__dict__[self.__name__]
> Jan 09 22:46:59 scheduler_host airflow[18882]: KeyError: 'amqp'
> Jan 09 22:46:59 scheduler_host airflow[18882]: During handling of the above exception, another exception occurred:
> Jan 09 22:46:59 scheduler_host airflow[18882]: Traceback (most recent call last):
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 118, in send_task_to_executor
> Jan 09 22:46:59 scheduler_host airflow[18882]: result = task.apply_async(args=[command], queue=queue)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/task.py", line 570, in apply_async
> Jan 09 22:46:59 scheduler_host airflow[18882]: **options
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/base.py", line 712, in send_task
> Jan 09 22:46:59 scheduler_host airflow[18882]: amqp = self.amqp
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
> Jan 09 22:46:59 scheduler_host airflow[18882]: value = obj.__dict__[self.__name__] = self.__get(obj)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/base.py", line 1202, in amqp
> Jan 09 22:46:59 scheduler_host airflow[18882]: return instantiate(self.amqp_cls, app=self)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/utils/imports.py", line 55, in instantiate
> Jan 09 22:46:59 scheduler_host airflow[18882]: return symbol_by_name(name)(*args, **kwargs)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/kombu/utils/imports.py", line 57, in symbol_by_name
> Jan 09 22:46:59 scheduler_host airflow[18882]: module = imp(module_name, package=package, **kwargs)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/usr/lib/python3.6/importlib/__init__.py", line 126, in import_module
> Jan 09 22:46:59 scheduler_host airflow[18882]: return _bootstrap._gcd_import(name[level:], package, level)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 994, in _gcd_import
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 971, in _find_and_load
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 955, in _find_and_load_unlocked
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>", line 678, in exec_module
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/amqp.py", line 23, in <module>
> Jan 09 22:46:59 scheduler_host airflow[18882]: from . import routes as _routes
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 971, in _find_and_load
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 951, in _find_and_load_unlocked
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 894, in _find_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>", line 1157, in find_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>", line 1129, in _get_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>", line 1271, in find_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>", line 96, in _path_isfile
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>", line 88, in _path_is_mode_type
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>", line 82, in _path_stat
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/airflow/utils/timeout.py", line 43, in handle_timeout
> Jan 09 22:46:59 scheduler_host airflow[18882]: raise AirflowTaskTimeout(self.error_message)
> Jan 09 22:46:59 scheduler_host airflow[18882]: airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 27724
> Jan 09 22:46:59 scheduler_host airflow[18882]: [2020-01-09 22:46:59,764] {celery_executor.py:224} ERROR - Error sending Celery task:Timeout, PID: 27725
> {code}
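The traceback ends in {{handle_timeout()}} in airflow/utils/timeout.py while importlib is in the middle of a stat() call. That pattern suggests an alarm-signal timeout: the handler raises wherever the main thread happens to be when the timer expires. A minimal sketch of that mechanism, assuming a SIGALRM-based design (Unix only, main thread only); {{alarm_timeout}}, {{SendTimeout}} and {{slow_call}} are hypothetical names, not Airflow's code, and {{time.sleep}} stands in for the slow NFS import:

```python
import signal
import time


class SendTimeout(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowTaskTimeout."""


class alarm_timeout:
    """Raise SendTimeout if the with-block runs longer than `seconds`."""

    def __init__(self, seconds, error_message="Timeout"):
        self.seconds = seconds
        self.error_message = error_message
        self.old_handler = None

    def handle_timeout(self, signum, frame):
        # Runs in the main thread at whatever point the code has reached,
        # e.g. a stat() call inside importlib in the reported traceback.
        raise SendTimeout(self.error_message)

    def __enter__(self):
        self.old_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
        # setitimer accepts fractional seconds; signal.alarm() only takes ints.
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, exc, tb):
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending alarm
        restore = self.old_handler if self.old_handler is not None else signal.SIG_DFL
        signal.signal(signal.SIGALRM, restore)
        return False  # never suppress exceptions


def slow_call(delay):
    """Hypothetical stand-in for apply_async() hitting a slow import."""
    time.sleep(delay)
    return "sent"


if __name__ == "__main__":
    with alarm_timeout(0.5):
        print(slow_call(0.1))  # completes before the alarm fires
    try:
        with alarm_timeout(0.2):
            slow_call(1.0)
    except SendTimeout as e:
        print("timed out:", e)
```

With the 0.2-second limit the one-second call is cut off and {{SendTimeout}} propagates out of the with-block, much as {{AirflowTaskTimeout}} propagates out of {{apply_async()}} in the log above; {{send_task_to_executor()}} then catches it and the scheduler logs "Error sending Celery task".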
> This is the code that triggers the error; note that {{timeout(seconds=2)}} is hardcoded:
> {code:python}
> def send_task_to_executor(task_tuple):
>     key, simple_ti, command, queue, task = task_tuple
>     try:
>         with timeout(seconds=2):
>             result = task.apply_async(args=[command], queue=queue)
>     except Exception as e:
>         exception_traceback = "Celery Task ID: {}\n{}".format(key,
>                                                               traceback.format_exc())
>         result = ExceptionWithTraceback(e, exception_traceback)
>     return key, command, result
> {code}
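The first proposed workaround could look roughly like this. It is a sketch, assuming the option is named {{send_task_timeout}} as proposed in the issue (it is not an existing Airflow setting) and using the standard-library {{configparser}} rather than Airflow's own config object so that it runs on its own; {{load_send_task_timeout}} is a hypothetical helper:

```python
import configparser

# The value currently hardcoded in send_task_to_executor().
DEFAULT_SEND_TASK_TIMEOUT = 2


def load_send_task_timeout(parser):
    """Return the proposed [celery] send_task_timeout option, in seconds.

    send_task_timeout is hypothetical (proposed in this issue, not an
    existing Airflow setting). If the section or option is missing, the
    current 2-second default is returned.
    """
    value = parser.getint("celery", "send_task_timeout",
                          fallback=DEFAULT_SEND_TASK_TIMEOUT)
    if value <= 0:
        # signal.alarm(0) cancels an alarm instead of setting one, so with an
        # alarm-based timeout a zero value would silently disable the guard.
        raise ValueError("send_task_timeout must be a positive number of seconds")
    return value


if __name__ == "__main__":
    cfg = configparser.ConfigParser()
    cfg.read_string("[celery]\nsend_task_timeout = 15\n")
    print(load_send_task_timeout(cfg))
```

In {{send_task_to_executor()}}, {{with timeout(seconds=2):}} would then become {{with timeout(seconds=SEND_TASK_TIMEOUT):}}, where {{SEND_TASK_TIMEOUT}} (also a hypothetical name) is read once at module import rather than on every task send. The 2-second fallback keeps the current behaviour for anyone who does not set the option.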



--
This message was sent by Atlassian Jira
(v8.3.4#803005)