Posted to commits@airflow.apache.org by "hterik (via GitHub)" <gi...@apache.org> on 2023/03/01 10:42:53 UTC

[GitHub] [airflow] hterik opened a new issue, #29833: Celery tasks stuck in queued state after worker crash (Set changed size during iteration)

hterik opened a new issue, #29833:
URL: https://github.com/apache/airflow/issues/29833

   ### Apache Airflow version
   
   2.5.1
   
   ### What happened
   
   1. Task was started and sent to celery by scheduler
   2. Celery worker picked up the task
   3. Celery worker lost database connection
   4. Celery worker crashes with error below (`RuntimeError: Set changed size during iteration`)
   5. Task is stuck in queued state for over 14 hours.
   
   The following log and screenshot show a more recent example of the situation above, in which step 5 has not yet reached 14 hours. We have observed several such cases recently.
   
   ```
   
   [2023-03-01 09:55:27,163: INFO/MainProcess] Task airflow.executors.celery_executor.execute_command[6e43c201-b325-4538-ae94-3cf9a583f138] received
   [2023-03-01 09:55:27,364: INFO/ForkPoolWorker-1] [6e43c201-b325-4538-ae94-3cf9a583f138] Executing command in Celery: ['airflow', 'tasks', 'run', XXXXX
   [2023-03-01 09:55:28,263: INFO/ForkPoolWorker-1] Filling up the DagBag from /XXXXX
   [2023-03-01 09:55:30,265: INFO/ForkPoolWorker-1] Running <TaskInstance: XXXXX [queued]> on host worker4
   [2023-03-01 10:00:23,487: ERROR/ForkPoolWorker-1] [6e43c201-b325-4538-ae94-3cf9a583f138] Failed to execute task (psycopg2.OperationalError) connection to server at "XXXXXXXXX failed: Connection timed out
   	Is the server running on that host and accepting TCP/IP connections?
   
   (Background on this error at: https://sqlalche.me/e/14/e3q8).
   Traceback (most recent call last):
     File "sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
       return fn()
     File "sqlalchemy/pool/base.py", line 325, in connect
       return _ConnectionFairy._checkout(self)
     File "sqlalchemy/pool/base.py", line 888, in _checkout
       fairy = _ConnectionRecord.checkout(pool)
     File "sqlalchemy/pool/base.py", line 491, in checkout
       rec = pool._do_get()
     File "sqlalchemy/pool/impl.py", line 256, in _do_get
       return self._create_connection()
     File "sqlalchemy/pool/base.py", line 271, in _create_connection
       return _ConnectionRecord(self)
     File "sqlalchemy/pool/base.py", line 386, in __init__
       self.__connect()
     File "sqlalchemy/pool/base.py", line 684, in __connect
       with util.safe_reraise():
     File "sqlalchemy/util/langhelpers.py", line 70, in __exit__
       compat.raise_(
     File "sqlalchemy/util/compat.py", line 210, in raise_
       raise exception
     File "sqlalchemy/pool/base.py", line 680, in __connect
       self.dbapi_connection = connection = pool._invoke_creator(self)
     File "sqlalchemy/engine/create.py", line 578, in connect
       return dialect.connect(*cargs, **cparams)
     File "sqlalchemy/engine/default.py", line 598, in connect
       return self.dbapi.connect(*cargs, **cparams)
     File "psycopg2/__init__.py", line 122, in connect
       conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
   psycopg2.OperationalError: connection to server at "xxxxxx port 5432 failed: Connection timed out
   	Is the server running on that host and accepting TCP/IP connections?
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "airflow/executors/celery_executor.py", line 130, in _execute_in_fork
       args.func(args)
     File "airflow/cli/cli_parser.py", line 52, in command
       return func(*args, **kwargs)
     File "airflow/utils/cli.py", line 108, in wrapper
       return f(*args, **kwargs)
     File "airflow/cli/commands/task_command.py", line 396, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "airflow/cli/commands/task_command.py", line 193, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "airflow/cli/commands/task_command.py", line 252, in _run_task_by_local_task_job
       run_job.run()
     File "airflow/jobs/base_job.py", line 259, in run
       session.merge(self)
     File "sqlalchemy/orm/session.py", line 3051, in merge
       return self._merge(
     File "sqlalchemy/orm/session.py", line 3131, in _merge
       merged = self.get(
     File "sqlalchemy/orm/session.py", line 2848, in get
       return self._get_impl(
     File "sqlalchemy/orm/session.py", line 2970, in _get_impl
       return db_load_fn(
     File "sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
       session.execute(
     File "sqlalchemy/orm/session.py", line 1713, in execute
       conn = self._connection_for_bind(bind)
     File "sqlalchemy/orm/session.py", line 1552, in _connection_for_bind
       return self._transaction._connection_for_bind(
     File "sqlalchemy/orm/session.py", line 747, in _connection_for_bind
       conn = bind.connect()
     File "sqlalchemy/engine/base.py", line 3315, in connect
       return self._connection_cls(self, close_with_result=close_with_result)
     File "sqlalchemy/engine/base.py", line 96, in __init__
       else engine.raw_connection()
     File "sqlalchemy/engine/base.py", line 3394, in raw_connection
       return self._wrap_pool_connect(self.pool.connect, _connection)
     File "sqlalchemy/engine/base.py", line 3364, in _wrap_pool_connect
       Connection._handle_dbapi_exception_noconnection(
     File "sqlalchemy/engine/base.py", line 2198, in _handle_dbapi_exception_noconnection
       util.raise_(
     File "sqlalchemy/util/compat.py", line 210, in raise_
       raise exception
     File "sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
       return fn()
     File "sqlalchemy/pool/base.py", line 325, in connect
       return _ConnectionFairy._checkout(self)
     File "sqlalchemy/pool/base.py", line 888, in _checkout
       fairy = _ConnectionRecord.checkout(pool)
     File "sqlalchemy/pool/base.py", line 491, in checkout
       rec = pool._do_get()
     File "sqlalchemy/pool/impl.py", line 256, in _do_get
       return self._create_connection()
     File "sqlalchemy/pool/base.py", line 271, in _create_connection
       return _ConnectionRecord(self)
     File "sqlalchemy/pool/base.py", line 386, in __init__
       self.__connect()
     File "sqlalchemy/pool/base.py", line 684, in __connect
       with util.safe_reraise():
     File "sqlalchemy/util/langhelpers.py", line 70, in __exit__
       compat.raise_(
     File "sqlalchemy/util/compat.py", line 210, in raise_
       raise exception
     File "sqlalchemy/pool/base.py", line 680, in __connect
       self.dbapi_connection = connection = pool._invoke_creator(self)
     File "sqlalchemy/engine/create.py", line 578, in connect
       return dialect.connect(*cargs, **cparams)
     File "sqlalchemy/engine/default.py", line 598, in connect
       return self.dbapi.connect(*cargs, **cparams)
     File "psycopg2/__init__.py", line 122, in connect
       conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
   sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "XXXX, port 5432 failed: Connection timed out
   	Is the server running on that host and accepting TCP/IP connections?
   
   (Background on this error at: https://sqlalche.me/e/14/e3q8)
   [2023-03-01 10:00:23,705: ERROR/ForkPoolWorker-1] Task airflow.executors.celery_executor.execute_command[6e43c201-b325-4538-ae94-3cf9a583f138] raised unexpected: AirflowException('Celery command failed on host: worker4 with celery_task_id 6e43c201-b325-4538-ae94-3cf9a583f138')
   Traceback (most recent call last):
     File "celery/app/trace.py", line 451, in trace_task
       R = retval = fun(*args, **kwargs)
     File "celery/app/trace.py", line 734, in __protected_call__
       return self.run(*args, **kwargs)
     File "airflow/executors/celery_executor.py", line 96, in execute_command
       _execute_in_fork(command_to_exec, celery_task_id)
     File "airflow/executors/celery_executor.py", line 111, in _execute_in_fork
       raise AirflowException(msg)
   airflow.exceptions.AirflowException: Celery command failed on host: worker4 with celery_task_id 6e43c201-b325-4538-ae94-3cf9a583f138
   [2023-03-01 10:00:24,109: CRITICAL/MainProcess] Unrecoverable error: RuntimeError('Set changed size during iteration')
   Traceback (most recent call last):
     File "celery/worker/worker.py", line 203, in start
       self.blueprint.start(self)
     File "celery/bootsteps.py", line 116, in start
       step.start(parent)
     File "celery/bootsteps.py", line 365, in start
       return self.obj.start()
     File "celery/worker/consumer/consumer.py", line 332, in start
       blueprint.start(self)
     File "celery/bootsteps.py", line 116, in start
       step.start(parent)
     File "celery/worker/consumer/consumer.py", line 628, in start
       c.loop(*c.loop_args())
     File "celery/worker/loops.py", line 97, in asynloop
       next(loop)
     File "kombu/asynchronous/hub.py", line 294, in create_loop
       for tick_callback in on_tick:
   RuntimeError: Set changed size during iteration
   [2023-03-01 10:00:25 +0000] [14] [INFO] Handling signal: term
   [2023-03-01 10:00:25 +0000] [15] [INFO] Worker exiting (pid: 15)
   [2023-03-01 10:00:25 +0000] [16] [INFO] Worker exiting (pid: 16)
   [2023-03-01 10:00:25 +0000] [14] [INFO] Shutting down: Master
   
   ```
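For reference, the final `RuntimeError` in the log is the error Python raises when a set is modified while a `for` loop is iterating over it. The sketch below reproduces only that error class in isolation; it is not kombu's code, and the helper names (`run_callbacks_unsafe`, `run_callbacks_safe`, `make_callbacks`) are hypothetical.

```python
# Minimal reproduction of the error class; unrelated to kombu's actual code.
def run_callbacks_unsafe(callbacks):
    """Iterate the live set; a callback that mutates it triggers RuntimeError."""
    for cb in callbacks:
        cb()


def run_callbacks_safe(callbacks):
    """Iterate a snapshot, so callbacks may add or remove entries freely."""
    for cb in list(callbacks):
        cb()


def make_callbacks():
    """Build a set holding one callback that registers another callback."""
    callbacks = set()

    def register_another():
        # Mutates the set while the caller's loop may still be iterating it.
        callbacks.add(lambda: None)

    callbacks.add(register_another)
    return callbacks


def demo():
    """Return the error text from the unsafe loop and the set size after the safe loop."""
    try:
        run_callbacks_unsafe(make_callbacks())
    except RuntimeError as exc:
        unsafe_error = str(exc)
    else:
        unsafe_error = None
    safe = make_callbacks()
    run_callbacks_safe(safe)
    return unsafe_error, len(safe)
```

Iterating over a copy is one common way to avoid the error; whether that is the appropriate fix for the loop in the traceback is not established here.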
   
   ![image](https://user-images.githubusercontent.com/89977373/222112114-eee187a9-1e80-4f7a-9470-9538d0212dfb.png)
   
   
   ### What you think should happen instead
   
   A. Celery worker should reconnect to the database in case of intermittent network errors
   B. In case of unrecoverable errors, scheduler should eventually retry or fail the task.
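Expectation A could look roughly like the retry loop below. This is only a sketch of the general technique (retry with exponential backoff), not how Airflow or Celery handle database connections; `TransientConnectionError`, `call_with_retry`, and `make_flaky` are hypothetical names, and the exception class stands in for a driver error such as `psycopg2.OperationalError`.

```python
import time


class TransientConnectionError(Exception):
    """Hypothetical stand-in for a driver error such as psycopg2.OperationalError."""


def call_with_retry(fn, attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call fn(); on a transient error wait base_delay * 2**n and try again."""
    for attempt in range(attempts):
        try:
            return fn()
        except TransientConnectionError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error to the caller
            sleep(base_delay * 2 ** attempt)


def make_flaky(failures):
    """Return a callable that fails `failures` times and then succeeds."""
    state = {"calls": 0}

    def connect():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise TransientConnectionError("connection timed out")
        return "connected"

    return connect, state
```

Once the attempts are exhausted the error is re-raised, which corresponds to expectation B: something upstream still has to retry or fail the task.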
   
   ### How to reproduce
   
   Difficult, happens intermittently.
   
   ### Operating System
   
   Ubuntu 22.04
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow==2.5.0
   apache-airflow-providers-celery==3.1.0
   redis==3.5.3
   celery==5.2.7
   kombu==5.2.4
   
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   airflow.cfg, celery options
   ```
   [celery]
   worker_concurrency = 1
   worker_prefetch_multiplier = 1
   worker_autoscale = 1,1
   celery_config_options = ...see below
   
   [celery_broker_transport_options]
   socket_connect = 240
   socket_keepalive = True
   socket_connect_timeout = 240
   retry_on_timeout = True
   ```
   ```
   celery_config_options = {
       **airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG,
       "result_backend_always_retry": True,
       "result_backend_max_retries": 20,
       "redis_socket_keepalive": True,
       "redis_retry_on_timeout": True,
       "redis_socket_connect_timeout": 240,
       "worker_deduplicate_successful_tasks": True,
   }
   ```
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] hterik commented on issue #29833: Celery tasks stuck in queued state after worker crash (Set changed size during iteration)

Posted by "hterik (via GitHub)" <gi...@apache.org>.
hterik commented on issue #29833:
URL: https://github.com/apache/airflow/issues/29833#issuecomment-1453440071

   @hussein-awala here is the DB extract.  
   Note that it is from a different instance than the original log, but that instance produces the same type of failure log on the worker (`Set changed size during iteration`).
   
   ```
   task_id     |dag_id                |run_id                              |start_date|end_date|duration|state |try_number|hostname|unixname|job_id|pool        |queue           |priority_weight|operator                 |queued_dttm                  |pid|max_tries|executor_config|pool_slots|queued_by_job_id|external_executor_id                |trigger_id|trigger_timeout|next_method|next_kwargs|map_index|updated_at                   |
   ------------+----------------------+------------------------------------+----------+--------+--------+------+----------+--------+--------+------+------------+----------------+---------------+-------------------------+-----------------------------+---+---------+---------------+----------+----------------+------------------------------------+----------+---------------+-----------+-----------+---------+-----------------------------+
   test_device0|xxxxxxxx              |scheduled__2023-03-02T22:00:00+00:00|          |        |        |queued|         0|        |airflow |      |default_pool|yyyyy           |              1|PythonOperatorInWorkspace|2023-03-03 00:00:01.210 +0100|   |        0|  } .          |         1|          196211|5a2f3e09-5a73-46c7-a6a6-4f3c09431aae|          |               |           |           |       -1|2023-03-03 00:00:02.394 +0100|
   ```




[GitHub] [airflow] potiuk commented on issue #29833: Celery tasks stuck in queued state after worker crash (Set changed size during iteration)

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on issue #29833:
URL: https://github.com/apache/airflow/issues/29833#issuecomment-1455194597

   > I'm not sure whether we check for worker heartbeats; I didn't find such a check in the method `_process_executor_events`. @potiuk, can you please confirm this or send a link to the part that checks whether the worker is still alive?
   
   I am not THAT knowledgeable about this part, so take it with a grain of salt, but let me explain how I understand what's going on. @ephraimbuddy @ashb - maybe you can take a look and confirm whether my understanding is correct?
   
   Everything related to managing Celery task state happens in the Celery Executor. 
   I don't think we are monitoring workers in any way. Each executor monitors tasks for their state and checks whether they have stalled or whether they need adoption (when they were being monitored by another executor).
   
   Eventually, if the task does not update its state (for example, when the worker has crashed), it should be rescheduled as stalled (by its own executor) or adopted (by another one). That's as much detail as I know off the top of my head. 
   
   There are a few race conditions that might occur (no distributed system is ever foolproof), and I think the original design intent is that even if a very nasty race condition happens, the tasks will eventually be rescheduled. 
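The "stalled" check described above can be pictured with the sketch below. It illustrates the idea only (flag tasks that have sat in `queued` for longer than a timeout) and does not reflect the Celery Executor's code; `QueuedTask`, `find_stalled`, and the 10 minute default are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class QueuedTask:
    """Hypothetical, simplified view of a task instance row."""
    task_id: str
    state: str
    queued_dttm: datetime


def find_stalled(tasks, now, timeout=timedelta(minutes=10)):
    """Return ids of tasks still in 'queued' for longer than `timeout`."""
    return [
        t.task_id
        for t in tasks
        if t.state == "queued" and now - t.queued_dttm > timeout
    ]
```

A caller would then reschedule or fail the returned tasks. Which of the two is appropriate, and what timeout to use, are policy choices this sketch leaves open.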




[GitHub] [airflow] hussein-awala commented on issue #29833: Celery tasks stuck in queued state after worker crash (Set changed size during iteration)

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on issue #29833:
URL: https://github.com/apache/airflow/issues/29833#issuecomment-1455158100

   I think this can happen when the Airflow worker completely loses access to the metadata DB. In that case it cannot update the task state, and if the metadata DB is also used as the result backend for Celery commands, the Celery workers cannot send the events to the DB that the scheduler processes to re-queue failed tasks.
   
   I'm not sure whether we check for worker heartbeats; I didn't find such a check in the method `_process_executor_events`. @potiuk, can you please confirm this or send a link to the part that checks whether the worker is still alive?




Re: [I] Celery tasks stuck in queued state after worker crash (Set changed size during iteration) [airflow]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #29833:
URL: https://github.com/apache/airflow/issues/29833#issuecomment-2055719148

   This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Please recheck the report against the latest Airflow version and let us know if the issue is still reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.




[GitHub] [airflow] hussein-awala commented on issue #29833: Celery tasks stuck in queued state after worker crash (Set changed size during iteration)

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on issue #29833:
URL: https://github.com/apache/airflow/issues/29833#issuecomment-1452785595

   Can you extract the ti state from the DB for a task stuck in queued state?
   ```sql
   SELECT *
   FROM task_instance
   WHERE dag_id='<dag id>' AND task_id='<task id>' AND run_id='<run id>';
   ```




[GitHub] [airflow] hterik commented on issue #29833: Celery tasks stuck in queued state after worker crash (Set changed size during iteration)

Posted by "hterik (via GitHub)" <gi...@apache.org>.
hterik commented on issue #29833:
URL: https://github.com/apache/airflow/issues/29833#issuecomment-1451351005

   After digging further, I found the following possibly related issues:
   https://github.com/apache/airflow/discussions/28022
   https://github.com/apache/airflow/issues/28120
   https://github.com/apache/airflow/pull/23690

