Posted to users@airflow.apache.org by Daniel Standish <dp...@gmail.com> on 2020/02/23 21:56:59 UTC

random CalledProcessError on celery worker

Occasionally I am getting a subprocess.CalledProcessError.

Here is a snippet from the celery worker log:

[2020-02-23 20:42:59,709: ERROR/ForkPoolWorker-7] execute_command
encountered a CalledProcessError
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py",
line 67, in execute_command
    close_fds=True, env=env)
  File "/usr/local/lib/python3.7/subprocess.py", line 363, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['airflow', 'run',
'my_dag_name', 'my_task_id', '2020-02-23T16:15:00+00:00', '--local',
'--pool', 'default_pool', '-sd', '/path/to/my_dag_name.py']' returned
non-zero exit status 1.
[2020-02-23 20:42:59,777: ERROR/ForkPoolWorker-7] None
[2020-02-23 20:42:59,937: ERROR/ForkPoolWorker-7] Task
airflow.executors.celery_executor.execute_command[751ff496-f198-40d8-bed0-c89ee5bf94a5]
raised unexpected: AirflowException('Celery command failed')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py",
line 67, in execute_command
    close_fds=True, env=env)
  File "/usr/local/lib/python3.7/subprocess.py", line 363, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['airflow', 'run',
'my_dag_name', 'my_task_id', '2020-02-23T16:15:00+00:00', '--local',
'--pool', 'default_pool', '-sd', '/path/to/my_dag_name.py']' returned
non-zero exit status 1.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py",
line 385, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py",
line 648, in __protected_call__
    return self.run(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py",
line 72, in execute_command
    raise AirflowException('Celery command failed')
airflow.exceptions.AirflowException: Celery command failed
[2020-02-23 20:43:01,116] {cli.py:516} INFO - Running <TaskInstance:
my_dag_name.my_task_id 2020-02-23T16:15:00+00:00 [scheduled]> on host
nebular-parsec-0038-worker-75d5d64c6d-qw4kn


I could not find any more helpful info in the worker log, and there is
nothing in the task log because the task never runs.
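
To dig further, here is a minimal repro sketch I can run by hand on the
worker, re-running the exact command from the log above (the DAG name,
task id, and path are the same placeholders used throughout this post):

import subprocess

# Re-run the command the celery executor invoked, capturing stderr,
# since no task log is ever written when this failure occurs.
cmd = [
    "airflow", "run", "my_dag_name", "my_task_id",
    "2020-02-23T16:15:00+00:00", "--local",
    "--pool", "default_pool", "-sd", "/path/to/my_dag_name.py",
]
result = subprocess.run(cmd, capture_output=True, text=True)
print(result.returncode)
print(result.stderr)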

Anyone have any idea what might be causing this?

The DAG in question is relatively large, roughly 100-400 tasks.

The error happens only occasionally; when it does (not every DAG run),
only 1-5 tasks error out like this.

The DAG is built dynamically like so:

with dag:
    for table_name in table_name_list:
        table = get_table_config(table_name)
        to_s3 = table.to_s3_operator
        to_snowflake = table.to_snowflake_operator
        to_s3 >> to_snowflake

The function `get_table_config` uses `import_string` to load a config
object from a dotted module path:

# assuming Airflow's own helper here; a Django-style import_string behaves the same
from airflow.utils.module_loading import import_string

def get_table_config(table_name: str) -> TableConfig:
    return import_string(  # type: ignore
        f"module.containing.{table_name}.table_config"
    )
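
In case the intermittent failures come from flaky imports (an import
error while parsing the DAG on the worker would make `airflow run` exit
non-zero before any task log exists), here is a hypothetical sanity
check using the same `table_name_list` and `get_table_config` as above:

# Confirm every table config resolves cleanly via import_string.
for table_name in table_name_list:
    try:
        get_table_config(table_name)
    except ImportError as exc:
        print(f"{table_name}: {exc}")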

Any ideas?

Thanks