Posted to commits@airflow.apache.org by "PP (JIRA)" <ji...@apache.org> on 2019/06/17 15:35:00 UTC

[jira] [Comment Edited] (AIRFLOW-4786) Task execution fails when Celery is used.

    [ https://issues.apache.org/jira/browse/AIRFLOW-4786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16865700#comment-16865700 ] 

PP edited comment on AIRFLOW-4786 at 6/17/19 3:34 PM:
------------------------------------------------------

The problem was a wrong MySQL client installed on the worker machine. I think the error should be more descriptive in this case.
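A mismatch like this can be caught by comparing the MySQL driver version reported on the scheduler and the worker. A minimal sketch of such a check (hypothetical helper, not part of the original report; the version strings are assumed to come from something like `python -c "import MySQLdb; print(MySQLdb.__version__)"` run on each machine):

```python
def versions_match(scheduler_ver: str, worker_ver: str) -> bool:
    """Return True when the MySQL client driver versions on two
    machines agree. Hypothetical parity check, not Airflow code."""
    def parse(v: str):
        # Compare numeric components only, so "1.4.6" == "1.4.6".
        return tuple(int(p) for p in v.split(".") if p.isdigit())
    return parse(scheduler_ver) == parse(worker_ver)


# A disagreement here is the kind of silent skew that produced
# the confusing StaleDataError below.
print(versions_match("1.4.6", "1.3.14"))
```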


was (Author: pp_):
The problem was a wrong MySQL client installed on the worker machine.

> Task execution fails when Celery is used.
> -----------------------------------------
>
>                 Key: AIRFLOW-4786
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4786
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery
>    Affects Versions: 1.10.3
>            Reporter: PP
>            Priority: Minor
>
> I'm using Airflow 1.10.3 with LocalExecutor and everything works properly. I want to switch to CeleryExecutor, but tasks are failing (or are not executed at all).
> I've tried two separate clusters (2 machines each: airflow + worker), both with a similar configuration (Redis for the broker and MySQL for the backend). A similar exception appears in both clusters (below). In the first one, tasks have status null and the exception appears in the scheduler, while in the second cluster tasks are started by the worker but fail with the same exception (on the worker side):
>  
> {code:java}
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 32, in <module>
>     args.func(args)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/utils/cli.py", line 74, in wrapper
>     return f(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 498, in run
>     _run(args, dag, ti)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 397, in _run
>     run_job.run()
>   File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 202, in run
>     self._execute()
>   File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 2598, in _execute
>     pool=self.pool):
>   File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 74, in wrapper
>     return func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1557, in _check_and_change_state_before_execution
>     session.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 927, in commit
>     self.transaction.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 467, in commit
>     self._prepare_impl()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 447, in _prepare_impl
>     self.session.flush()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2209, in flush
>     self._flush(objects)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2329, in _flush
>     transaction.rollback(_capture_exception=True)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
>     compat.reraise(exc_type, exc_value, exc_tb)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2293, in _flush
>     flush_context.execute()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 389, in execute
>     rec.execute(self)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 548, in execute
>     uow
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 177, in save_obj
>     mapper, table, update)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 760, in _emit_update_statements
>     (table.description, len(records), rows))
> sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'task_instance' expected to update 1 row(s); 0 were matched.
> [2019-06-12 16:41:31,167: ERROR/ForkPoolWorker-4] execute_command encountered a CalledProcessError
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 60, in execute_command
>     close_fds=True, env=env)
>   File "/usr/lib/python2.7/subprocess.py", line 186, in check_call
>     raise CalledProcessError(retcode, cmd)
> CalledProcessError: Command 'airflow run hello_world dummy_task 2019-06-12T15:05:00+00:00 --local -sd /data/airflow/dags/hello_world.py' returned non-zero exit status 1
> [2019-06-12 16:41:31,168: ERROR/ForkPoolWorker-4] None
> [2019-06-12 16:41:31,172: ERROR/ForkPoolWorker-4] Task airflow.executors.celery_executor.execute_command[dc2d3451-1d86-4097-8ded-6fd1aacd1de1] raised unexpected: AirflowException('Celery command failed',)
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 375, in trace_task
>     R = retval = fun(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 632, in __protected_call__
>     return self.run(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 65, in execute_command
>     raise AirflowException('Celery command failed')
> AirflowException: Celery command failed{code}
>  
> I've tested different configuration variations with different Celery and Airflow versions. I was able to make it work only on Airflow 1.8.2.
> There is either a bug in Airflow 1.10+ or some missing / new configuration that I'm not aware of.
>  
>  
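For context on the traceback above: `StaleDataError` is SQLAlchemy's ORM reporting that an UPDATE on `task_instance` matched zero rows, which typically means another process (or a connection using a different client) already changed the row's state out from under the optimistic update. The underlying condition can be sketched with plain sqlite3 (an illustration of the general pattern, not Airflow code):

```python
import sqlite3

# Sketch of the condition behind StaleDataError: an optimistic UPDATE
# keyed on the previously-read state matches zero rows because another
# writer changed the row first.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (id INTEGER PRIMARY KEY, state TEXT)")
conn.execute("INSERT INTO task_instance VALUES (1, 'queued')")

# A concurrent writer flips the state before our UPDATE runs.
conn.execute("UPDATE task_instance SET state = 'running' WHERE id = 1")

# Our UPDATE still assumes state = 'queued', so it matches 0 rows.
# SQLAlchemy's ORM turns exactly this into StaleDataError
# ("expected to update 1 row(s); 0 were matched").
cur = conn.execute(
    "UPDATE task_instance SET state = 'running' "
    "WHERE id = 1 AND state = 'queued'"
)
print(cur.rowcount)  # 0
```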



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)