Posted to commits@airflow.apache.org by "Piotr Pekala (JIRA)" <ji...@apache.org> on 2019/06/12 16:59:00 UTC

[jira] [Created] (AIRFLOW-4786) Task execution fails when Celery is used.

Piotr Pekala created AIRFLOW-4786:
-------------------------------------

             Summary: Task execution fails when Celery is used.
                 Key: AIRFLOW-4786
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4786
             Project: Apache Airflow
          Issue Type: Bug
          Components: celery
    Affects Versions: 1.10.3
            Reporter: Piotr Pekala


I'm using airflow 1.10.3 with LocalExecutor and everything works properly. I want to switch to CeleryExecutor, but tasks are failing (or are not executed at all).

I've tried this in two separate clusters (2 machines each: airflow + worker), both with a similar configuration (redis as broker and mysql as backend). In both clusters a similar exception appears (below). In the first one, tasks end up with a null status and the exception appears in the scheduler, while in the second cluster tasks are started by the worker but fail with the same exception (on the worker side):
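For reference, the relevant configuration follows the standard 1.10.x airflow.cfg keys; hosts and credentials below are placeholders, not the exact values from my clusters:

{code}
[core]
executor = CeleryExecutor
# metadata DB ("mysql for backend")
sql_alchemy_conn = mysql://airflow:***@mysql-host:3306/airflow

[celery]
# "redis for broker"
broker_url = redis://redis-host:6379/0
result_backend = db+mysql://airflow:***@mysql-host:3306/airflow
{code}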

 
{code:java}
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 32, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/cli.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 498, in run
    _run(args, dag, ti)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 397, in _run
    run_job.run()
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 202, in run
    self._execute()
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 2598, in _execute
    pool=self.pool):
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1557, in _check_and_change_state_before_execution
    session.commit()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 927, in commit
    self.transaction.commit()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 467, in commit
    self._prepare_impl()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 447, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2209, in flush
    self._flush(objects)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2329, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2293, in _flush
    flush_context.execute()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 389, in execute
    rec.execute(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 548, in execute
    uow
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 177, in save_obj
    mapper, table, update)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 760, in _emit_update_statements
    (table.description, len(records), rows))
sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'task_instance' expected to update 1 row(s); 0 were matched.
[2019-06-12 16:41:31,167: ERROR/ForkPoolWorker-4] execute_command encountered a CalledProcessError
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 60, in execute_command
    close_fds=True, env=env)
  File "/usr/lib/python2.7/subprocess.py", line 186, in check_call
    raise CalledProcessError(retcode, cmd)
CalledProcessError: Command 'airflow run hello_world dummy_task 2019-06-12T15:05:00+00:00 --local -sd /data/airflow/dags/hello_world.py' returned non-zero exit status 1
[2019-06-12 16:41:31,168: ERROR/ForkPoolWorker-4] None
[2019-06-12 16:41:31,172: ERROR/ForkPoolWorker-4] Task airflow.executors.celery_executor.execute_command[dc2d3451-1d86-4097-8ded-6fd1aacd1de1] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 375, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 632, in __protected_call__
    return self.run(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 65, in execute_command
    raise AirflowException('Celery command failed')
AirflowException: Celery command failed{code}
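For context on the first exception: StaleDataError means the ORM flushed an UPDATE against task_instance that matched zero rows, i.e. the row it had loaded was changed or deleted by some other session before the commit. Here it is raised inside _check_and_change_state_before_execution, which suggests something else (scheduler vs. worker) is touching the same task_instance row concurrently. A minimal standalone sketch (hypothetical demo table, not Airflow's real model; 1.3-era SQLAlchemy style to match the stack above) that triggers the same error:

{code:python}
# Minimal sketch of the failure mode behind StaleDataError: a row is
# loaded into a session, removed out from under it, and the flush-time
# UPDATE then matches 0 rows. Table/model names here are hypothetical.
from sqlalchemy import Column, Integer, String, create_engine, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class DemoTaskInstance(Base):
    __tablename__ = 'demo_task_instance'
    id = Column(Integer, primary_key=True)
    state = Column(String(20))

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add(DemoTaskInstance(id=1, state='queued'))
session.commit()

ti = session.query(DemoTaskInstance).get(1)               # ORM now tracks this row
session.execute(text('DELETE FROM demo_task_instance'))   # row vanishes mid-transaction
ti.state = 'running'
session.commit()  # UPDATE ... WHERE id = 1 matches 0 rows -> StaleDataError
{code}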
 

I've tested different configuration variations with different celery and airflow versions. I was able to make it work only on airflow 1.8.2.

There is either a bug in airflow 1.10+ or some missing / new configuration that I'm not aware of.
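Note that the CalledProcessError is just the wrapper: per the celery_executor.py frames above, the worker simply shells out to `airflow run`, roughly like the sketch below (same command as in the log), so the underlying failure can also be reproduced by running that command by hand on a worker to see the task's own log:

{code:python}
# Rough sketch of what execute_command in celery_executor.py does, per
# the traceback above (the real code raises AirflowException instead of
# RuntimeError and passes an env dict to the subprocess).
import subprocess

command = ('airflow run hello_world dummy_task 2019-06-12T15:05:00+00:00 '
           '--local -sd /data/airflow/dags/hello_world.py')
try:
    subprocess.check_call(command, shell=True, close_fds=True)
except subprocess.CalledProcessError as e:
    raise RuntimeError('Celery command failed (exit status %d)' % e.returncode)
{code}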

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)