Posted to users@airflow.apache.org by Daniel Standish <dp...@gmail.com> on 2020/02/23 21:56:59 UTC
random CalledProcessError on celery worker
Occasionally I am getting subprocess.CalledProcessError on a celery worker.
Here is a snippet from the celery worker log:
[2020-02-23 20:42:59,709: ERROR/ForkPoolWorker-7] execute_command
encountered a CalledProcessError
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py",
line 67, in execute_command
close_fds=True, env=env)
File "/usr/local/lib/python3.7/subprocess.py", line 363, in check_call
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['airflow', 'run',
'my_dag_name', 'my_task_id', '2020-02-23T16:15:00+00:00', '--local',
'--pool', 'default_pool', '-sd', '/path/to/my_dag_name.py']' returned
non-zero exit status 1.
[2020-02-23 20:42:59,777: ERROR/ForkPoolWorker-7] None
[2020-02-23 20:42:59,937: ERROR/ForkPoolWorker-7] Task
airflow.executors.celery_executor.execute_command[751ff496-f198-40d8-bed0-c89ee5bf94a5]
raised unexpected: AirflowException('Celery command failed')
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py",
line 67, in execute_command
close_fds=True, env=env)
File "/usr/local/lib/python3.7/subprocess.py", line 363, in check_call
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['airflow', 'run',
'my_dag_name', 'my_task_id', '2020-02-23T16:15:00+00:00', '--local',
'--pool', 'default_pool', '-sd', '/path/to/my_dag_name.py']' returned
non-zero exit status 1.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py",
line 385, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py",
line 648, in __protected_call__
return self.run(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py",
line 72, in execute_command
raise AirflowException('Celery command failed')
airflow.exceptions.AirflowException: Celery command failed
[2020-02-23 20:43:01,116] {cli.py:516} INFO - Running <TaskInstance:
my_dag_name.my_task_id 2020-02-23T16:15:00+00:00 [scheduled]> on host
nebular-parsec-0038-worker-75d5d64c6d-qw4kn
I could not find any more helpful info in the worker log, and there is
nothing in the task log because the task never runs.
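For what it's worth, the error above is just subprocess.check_call surfacing a non-zero exit status from the `airflow run` child process; the exception carries only the command and the return code, not the reason the child failed, which is why the worker log is so unhelpful. A minimal illustration of that wrapping:

```python
import subprocess
import sys

# check_call raises CalledProcessError whenever the child exits non-zero --
# the exception records the command and return code, nothing about *why*
# the child failed, so the real cause has to be found in the child's output.
try:
    subprocess.check_call([sys.executable, "-c", "raise SystemExit(1)"])
except subprocess.CalledProcessError as exc:
    print(exc.returncode)  # 1
```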
Anyone have any idea what might be causing this?
The DAG in question is relatively large, around 100-400 tasks.
The error happens only occasionally, and when it does (it doesn't happen
every DAG run), only 1-5 tasks will error out like this.
The dag is built dynamically like so:
with dag:
    for table_name in table_name_list:
        table = get_table_config(table_name)
        to_s3 = table.to_s3_operator
        to_snowflake = table.to_snowflake_operator
        to_s3 >> to_snowflake
The function `get_table_config` uses `import_string` to load a config class
from a file.
def get_table_config(table_name) -> TableConfig:
    return import_string(  # type: ignore
        f"module.containing.{table_name}.table_config"
    )
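For anyone unfamiliar with it: import_string resolves a dotted path to the object it names by importing the module and reading the attribute. This is a minimal standard-library stand-in, not Airflow's own implementation, and the dotted path in my DAG file is project-specific:

```python
import importlib

def import_string(dotted_path: str):
    """Resolve a dotted path like 'pkg.module.attr' to the named attribute."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)

# Resolving a stdlib name the same way the DAG file resolves
# 'module.containing.<table_name>.table_config':
join = import_string("os.path.join")
```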
Any ideas?
Thanks