Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/25 01:14:50 UTC

[GitHub] [airflow] csm10495 opened a new issue #21078: DAG with multiple async tasks leads to MySQL errors... which lead to failed tasks

csm10495 opened a new issue #21078:
URL: https://github.com/apache/airflow/issues/21078


   ### Apache Airflow version
   
   2.2.2
   
   ### What happened
   
We have a DAG with multiple (10ish) tasks that run at the same time, using the Celery executor all on one host. A bunch of the tasks that were supposed to run in parallel failed... though they didn't produce any logs for their runs.
   
Upon some log digging, we found that the Celery command had failed:
   
   ```
   [2022-01-24 23:16:18,329: ERROR/ForkPoolWorker-32] Task airflow.executors.celery_executor.execute_command[5ef18d0b-4f6f-425e-bf4c-27604f940361] raised unexpected: AirflowException('Celery command failed on host: test-host.test-host.com')
   Traceback (most recent call last):
     File "/python/site-packages/celery/app/trace.py", line 450, in trace_task
       R = retval = fun(*args, **kwargs)
     File "/python/site-packages/celery/app/trace.py", line 731, in __protected_call__
       return self.run(*args, **kwargs)
     File "/python/site-packages/airflow/executors/celery_executor.py", line 90, in execute_command
       _execute_in_fork(command_to_exec, celery_task_id)
     File "/python/site-packages/airflow/executors/celery_executor.py", line 101, in _execute_in_fork
       raise AirflowException('Celery command failed on host: ' + get_hostname())
   airflow.exceptions.AirflowException: Celery command failed on host: test-host.test-host.com
   ```
   
   Above that, we saw something like this:
   
   ```
   [2022-01-24 23:16:18,274: ERROR/ForkPoolWorker-32] Failed to execute task (_mysql_exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   [SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash
   FROM task_instance INNER JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
   WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s
    LIMIT %s FOR UPDATE]
   [parameters: ('parent_dag_for_creation', 'the-task-name', 'manual__2022-01-24T22:31:28.381676+00:00', 1)]
   (Background on this error at: http://sqlalche.me/e/13/e3q8).
   Traceback (most recent call last):
     File "/python/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
       cursor, statement, parameters, context
     File "/python/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
       cursor.execute(statement, parameters)
     File "/python/site-packages/MySQLdb/cursors.py", line 250, in execute
       self.errorhandler(self, exc, value)
     File "/python/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
       raise errorvalue
     File "/python/site-packages/MySQLdb/cursors.py", line 247, in execute
       res = self._query(query)
     File "/export/content/lid/apps/in-airflow/i001/libexec/in-airflow_c964742ba5a71c8345b94ca6cb1f81724db7d4201485540f4b5ca637adb3351e/site-packages/MySQLdb/cursors.py", line 412, in _query
       rowcount = self._do_query(q)
     File "/python/site-packages/MySQLdb/cursors.py", line 375, in _do_query
       db.query(q)
     File "/python/site-packages/MySQLdb/connections.py", line 276, in query
       _mysql.connection.query(self, query)
   _mysql_exceptions.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/python/site-packages/airflow/executors/celery_executor.py", line 121, in _execute_in_fork
       args.func(args)
     File "/python/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/python/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/python/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/python/site-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/python/site-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
       run_job.run()
     File "/python/site-packages/airflow/jobs/base_job.py", line 245, in run
       self._execute()
     File "/python/site-packages/airflow/jobs/local_task_job.py", line 97, in _execute
       external_executor_id=self.external_executor_id,
     File "/python/site-packages/airflow/utils/session.py", line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File "/python/site-packages/airflow/models/taskinstance.py", line 1176, in check_and_change_state_before_execution
       self.refresh_from_db(session=session, lock_for_update=True)
     File "/python/site-packages/airflow/utils/session.py", line 67, in wrapper
       return func(*args, **kwargs)
     File "/python/site-packages/airflow/models/taskinstance.py", line 729, in refresh_from_db
       ti: Optional[TaskInstance] = qry.with_for_update().first()
     File "/python/site-packages/sqlalchemy/orm/query.py", line 3429, in first
       ret = list(self[0:1])
     File "/python/site-packages/sqlalchemy/orm/query.py", line 3203, in __getitem__
       return list(res)
     File "/python/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
       return self._execute_and_instances(context)
     File "/python/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
       result = conn.execute(querycontext.statement, self._params)
     File "/python/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
       return meth(self, multiparams, params)
     File "/python/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
       return connection._execute_clauseelement(self, multiparams, params)
     File "/python/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
       distilled_params,
     File "/python/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
       e, statement, parameters, cursor, context
     File "/python/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
       sqlalchemy_exception, with_traceback=exc_info[2], from_=e
     File "/python/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/python/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
       cursor, statement, parameters, context
     File "/python/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
       cursor.execute(statement, parameters)
     File "/python/site-packages/MySQLdb/cursors.py", line 250, in execute
       self.errorhandler(self, exc, value)
     File "/python/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
       raise errorvalue
     File "/python/site-packages/MySQLdb/cursors.py", line 247, in execute
       res = self._query(query)
     File "/python/site-packages/MySQLdb/cursors.py", line 412, in _query
       rowcount = self._do_query(q)
     File "/python/site-packages/MySQLdb/cursors.py", line 375, in _do_query
       db.query(q)
     File "/python/site-packages/MySQLdb/connections.py", line 276, in query
       _mysql.connection.query(self, query)
   sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   [SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash
   FROM task_instance INNER JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
   WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s
    LIMIT %s FOR UPDATE]
   [parameters: ('parent_dag_for_creation', 'the-task-name', 'manual__2022-01-24T22:31:28.381676+00:00', 1)]
   (Background on this error at: http://sqlalche.me/e/13/e3q8)
   ```
   
   So it seems that a database error occurred while the task was being started, which led to the task never actually running and then being marked as failed.
   
   ### What you expected to happen
   
   If a MySQL error like this happens, the operation should be retried (if appropriate).
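   
   To sketch the kind of retrying we mean (purely illustrative, not Airflow's actual code; the `run_with_deadlock_retry` helper, retry count, and backoff are made up):
   
   ```python
   import time
   
   from sqlalchemy.exc import OperationalError
   
   MYSQL_DEADLOCK_ERRNO = 1213  # 'Deadlock found when trying to get lock'
   
   def run_with_deadlock_retry(session, fn, max_attempts=3, backoff=0.5):
       """Hypothetical helper: run fn(session), retrying MySQL deadlocks."""
       for attempt in range(1, max_attempts + 1):
           try:
               return fn(session)
           except OperationalError as exc:
               session.rollback()  # a deadlocked transaction must be restarted
               errno = exc.orig.args[0] if exc.orig and exc.orig.args else None
               if errno != MYSQL_DEADLOCK_ERRNO or attempt == max_attempts:
                   raise
               time.sleep(backoff * attempt)  # brief linear backoff, then retry
   ```
   
   Something along those lines around the task-start query would presumably have let these tasks proceed instead of failing outright.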
   
   In the worst case, if the task never starts, it should be rescheduled instead of being marked as failed (with no logs).
   
   Ideally, any task failure should show a reason in the web UI explaining why/how it was marked as failed. For this particular one, we had to dig into the Airflow process logs to find any info.
   
   ### How to reproduce
   
   _No response_
   
   ### Operating System
   
   CentOS 7.9.2009
   
   ### Versions of Apache Airflow Providers
   
   NA
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   PyMySQL=1.0.2
   redis=1.7.1
   SQLAlchemy=1.3.24
   celery=5.1.2
   mysqlclient=1.3.13
   
   ```
   mysql> SHOW VARIABLES LIKE "%version%";
   +--------------------------+------------------------------+
   | Variable_name            | Value                        |
   +--------------------------+------------------------------+
   | admin_tls_version        | TLSv1,TLSv1.1,TLSv1.2        |
   | immediate_server_version | 999999                       |
   | innodb_version           | 8.0.21                       |
   | original_server_version  | 999999                       |
   | protocol_version         | 10                           |
   | slave_type_conversions   |                              |
   | tls_version              | TLSv1,TLSv1.1,TLSv1.2        |
   | version                  | 8.0.21                       |
   | version_comment          | MySQL Community Server - GPL |
   | version_compile_machine  | x86_64                       |
   | version_compile_os       | Linux                        |
   | version_compile_zlib     | 1.2.11                       |
   +--------------------------+------------------------------+
   ```
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   





[GitHub] [airflow] csm10495 commented on issue #21078: DAG with multiple async tasks leads to MySQL errors... which lead to failed tasks

Posted by GitBox <gi...@apache.org>.
csm10495 commented on issue #21078:
URL: https://github.com/apache/airflow/issues/21078#issuecomment-1027317538


   So there were several tasks that all fanned out in parallel from a single start task:
   
   ```
           /- Task1
   Start --+- Task2
           \- Task...n
   ```
   Each task was just a simple PythonOperator doing something external.
   
   This was easier to reproduce if we set AIRFLOW__CORE__PARALLELISM and AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG to something high, like 128, and then had a DAG with 200+ parallel tasks. Though we had also seen it in a case with neither set (so they fell back to the defaults).
   





[GitHub] [airflow] boring-cyborg[bot] commented on issue #21078: DAG with multiple async tasks leads to MySQL errors... which lead to failed tasks

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #21078:
URL: https://github.com/apache/airflow/issues/21078#issuecomment-1020710146


   Thanks for opening your first issue here! Be sure to follow the issue template!
   





[GitHub] [airflow] potiuk commented on issue #21078: DAG with multiple async tasks leads to MySQL errors... which lead to failed tasks

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #21078:
URL: https://github.com/apache/airflow/issues/21078#issuecomment-1027121528


   I think what might be really helpful here is a description of the DAGs/operators you had - your issue title says "async", but nothing in the description points to that, so I wonder what this all really means.





[GitHub] [airflow] csm10495 commented on issue #21078: DAG with multiple async tasks leads to MySQL errors... which lead to failed tasks

Posted by GitBox <gi...@apache.org>.
csm10495 commented on issue #21078:
URL: https://github.com/apache/airflow/issues/21078#issuecomment-1021644583


   So it seems that if we set the task's retries to something > 0, it'll retry and may or may not pass later.
   
   Should it count as a retry if the task didn't even start?
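   
   For reference, the workaround amounts to something like this (the DAG id, callable, and retry values here are placeholders, not our real DAG):
   
   ```python
   import time
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   
   def do_something_external():
       time.sleep(30)  # placeholder for the real external work
   
   with DAG(
       dag_id="retry_workaround_example",  # placeholder name
       start_date=datetime(2022, 1, 1),
       schedule_interval=None,
       catchup=False,
   ) as dag:
       PythonOperator(
           task_id="do_something_external",
           python_callable=do_something_external,
           retries=2,  # > 0, so a run that dies before starting gets retried
           retry_delay=timedelta(minutes=1),
       )
   ```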





[GitHub] [airflow] csm10495 edited a comment on issue #21078: DAG with multiple async tasks leads to MySQL errors... which lead to failed tasks

Posted by GitBox <gi...@apache.org>.
csm10495 edited a comment on issue #21078:
URL: https://github.com/apache/airflow/issues/21078#issuecomment-1027317538


   So there were several tasks that all fanned out in parallel from a single start task:
   
   ```
           /- Task1
   Start --+- Task2
           \- Task...n
   ```
   Each task was just a simple PythonOperator doing something external. (Or, for testing, it can just be a sleep.)
   
   This was easier to reproduce if we set AIRFLOW__CORE__PARALLELISM and AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG to something high, like 128, and then had a DAG with 200+ parallel tasks. Though we had also seen it in a case with neither set (so they fell back to the defaults). A rough reproducer is sketched below.
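   
   For concreteness, a minimal reproducer along those lines could look like the following; the DAG id, task count, and sleep duration are placeholders, not our real DAG:
   
   ```python
   import time
   from datetime import datetime
   
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   
   def just_sleep():
       time.sleep(60)  # each "task" only sleeps, standing in for external work
   
   # Run with e.g. AIRFLOW__CORE__PARALLELISM=128 and
   # AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=128 to widen the race window.
   with DAG(
       dag_id="parallel_deadlock_repro",  # placeholder name
       start_date=datetime(2022, 1, 1),
       schedule_interval=None,
       catchup=False,
   ) as dag:
       start = PythonOperator(task_id="start", python_callable=just_sleep)
       for i in range(200):  # 200+ parallel tasks made the deadlock likelier
           start >> PythonOperator(task_id=f"task_{i}", python_callable=just_sleep)
   ```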
   

