Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/03/10 19:00:40 UTC

[GitHub] [airflow] SamWheating opened a new issue #22160: Directly submitting task to executor doesn't set `queued_dttm` which can cause Scheduler crashes

SamWheating opened a new issue #22160:
URL: https://github.com/apache/airflow/issues/22160


   ### Apache Airflow version
   
   2.2.2
   
   ### What happened
   
   We had an entire Airflow environment crash recently due to a very rare series of events.
   
   1) A user manually submitted a task to the executor using the `run` button with `ignore_all_deps`
   2) While this task was running, the scheduler crashed
   3) A new scheduler attempted to adopt this task and entered a crash loop with the following stack trace:
   
   ```
   [2022-03-10 17:10:20,386] {scheduler_job.py:655} INFO - Exited execute loop
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/usr/local/lib/python3.9/site-packages/airflow/__main__.py", line 48, in main
       args.func(args)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
       _run_scheduler_job(args=args)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
       job.run()
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 245, in run
       self._execute()
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 628, in _execute
       self._run_scheduler_loop()
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 681, in _run_scheduler_loop
       self.adopt_or_reset_orphaned_tasks()
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1117, in adopt_or_reset_orphaned_tasks
       for attempt in run_with_db_retries(logger=self.log):
     File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 382, in __iter__
       do = self.iter(retry_state=retry_state)
     File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 349, in iter
       return fut.result()
     File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 439, in result
       return self.__get_result()
     File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
       raise self._exception
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1162, in adopt_or_reset_orphaned_tasks
       to_reset = self.executor.try_adopt_task_instances(tis_to_reset_or_adopt)
     File "/usr/local/lib/python3.9/site-packages/airflow/executors/celery_executor.py", line 485, in try_adopt_task_instances
       self.adopted_task_timeouts[ti.key] = ti.queued_dttm + self.task_adoption_timeout
   TypeError: unsupported operand type(s) for +: 'NoneType' and 'datetime.timedelta'
   ```
   
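   It turns out that submitting a task directly to the executor can create TaskInstances in the `running` state with `queued_dttm=None`. When a scheduler later tries to adopt such a task, the unguarded `ti.queued_dttm + self.task_adoption_timeout` raises a `TypeError` and crashes the scheduler.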
   
   Here's the place where the unhandled exception originates:
   https://github.com/apache/airflow/blob/9e6769206e124b65d31028a3b7b9047d51fd0be5/airflow/executors/celery_executor.py#L546
   
   And here's where the task is submitted to the executor without updating the `queued_dttm`:
   https://github.com/apache/airflow/blob/3b9ae4211b379f0ddee4ba3034c9b8e8c2f10707/airflow/www/views.py#L1738-L1743
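
The failure can be illustrated without Airflow at all. The following is a minimal sketch, assuming a hypothetical `FakeTaskInstance` stand-in and an illustrative 10-minute timeout; neither is Airflow's real model or default. It only shows that the arithmetic from the traceback works for a task with a queue time and raises for one without.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; not Airflow's real model."""
    key: str
    state: str
    queued_dttm: Optional[datetime] = None


def adoption_deadline(ti: FakeTaskInstance, timeout: timedelta) -> datetime:
    # Same arithmetic as the failing line in the traceback.
    return ti.queued_dttm + timeout


timeout = timedelta(minutes=10)  # illustrative value only

# A task queued by the scheduler has a queue time recorded.
scheduled_ti = FakeTaskInstance(
    "task_0", "running", datetime(2022, 3, 10, 17, 0, tzinfo=timezone.utc)
)
# A task submitted directly to the executor never had queued_dttm set.
manual_ti = FakeTaskInstance("task_1", "running")

deadline = adoption_deadline(scheduled_ti, timeout)

try:
    adoption_deadline(manual_ti, timeout)
    error = None
except TypeError as exc:
    error = str(exc)

print(deadline.isoformat())
print(error)
```

Because the exception is raised inside the adoption step of the scheduler loop, every restarted scheduler hits it again for as long as the task is still running.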
   
   ### What you expected to happen
   
   A new executor should be able to adopt running task instances, regardless of how they are started. 
   
   ### How to reproduce
   
   1) Run the following DAG while using the CeleryExecutor:
   
   ```python
   from datetime import timedelta
   from airflow.models import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow import utils
   
   dag = DAG(
       'parallel-dag-example',
       start_date=utils.dates.days_ago(1),
       max_active_runs=1,
       dagrun_timeout=timedelta(minutes=60),
       schedule_interval=None,
       max_active_tasks=2,
       concurrency=1
   )
   
   for i in range(5):
       leaf = BashOperator(
           task_id=f'task_{i}',
           bash_command='sleep 300',
           retries=0,
           dag=dag,
       )
   ```
   
   2) Submit one of the `scheduled` tasks to the executor via the UI (you may need to `ignore_all_deps`)
   3) Restart the scheduler(s)
   
   This will cause a crashloop of the schedulers until the task completes.
   
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   We're using the Celery Executor
   
   ### Anything else
   
   I am currently evaluating a few possible solutions, among them:
   
   1) Updating the line in the celery executor to be:
   ```python
   self.adopted_task_timeouts[ti.key] = (ti.queued_dttm or ti.start_date) + self.task_adoption_timeout 
   ```
   
   2) Only setting the `adopted_task_timeout` if the task is in the pending state (since adoption timeouts are irrelevant for running tasks anyway):
   ```python
   if state == celery_states.PENDING:
       self.adopted_task_timeouts[ti.key] = (ti.queued_dttm or ti.start_date) + self.task_adoption_timeout 
   ```
   
   3) Updating the taskinstance `queued_dttm` from the webserver when the task is submitted to the queue (which feels like the most appropriate fix as it actually fixes the root cause of the issue).
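
Option 3 can be sketched in isolation as follows. This is only an illustration under stated assumptions: `FakeTaskInstance`, `submit_directly`, and `executor_queue` are hypothetical names, not Airflow's API and not the code of any Airflow pull request. The idea is simply that the manual submission path records the queue time before handing the task over, so the later adoption arithmetic has a value to work with.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; not Airflow's real model."""
    key: str
    state: str = "scheduled"
    queued_dttm: Optional[datetime] = None


def submit_directly(
    ti: FakeTaskInstance,
    executor_queue: List[str],
    now: Optional[datetime] = None,
) -> None:
    """Hypothetical manual-run path: record the queue time, then hand off."""
    ti.state = "queued"
    ti.queued_dttm = now if now is not None else datetime.now(timezone.utc)
    executor_queue.append(ti.key)


def adoption_deadline(ti: FakeTaskInstance, timeout: timedelta) -> datetime:
    # Same arithmetic as the failing line in the traceback.
    return ti.queued_dttm + timeout


executor_queue: List[str] = []
ti = FakeTaskInstance(key="task_0")
submitted_at = datetime(2022, 3, 10, 17, 0, tzinfo=timezone.utc)
submit_directly(ti, executor_queue, now=submitted_at)

# With queued_dttm populated, the adoption deadline can be computed.
deadline = adoption_deadline(ti, timedelta(minutes=10))
print(deadline.isoformat())
```

Compared with options 1 and 2, this keeps the adoption code unchanged and instead restores the invariant that a task handed to the executor always has a queue time.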
   
   Let me know if you have any strong opinions on any of these proposed fixes. 
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] SamWheating commented on issue #22160: Directly submitting task to executor doesn't set `queued_dttm` which can cause scheduler crashloop

Posted by GitBox <gi...@apache.org>.
SamWheating commented on issue #22160:
URL: https://github.com/apache/airflow/issues/22160#issuecomment-1067029312


   Appreciate the feedback!
   
   I have opened a PR for the second proposed solution, but I'll close that up and open one to actually fix the task submission. 
   
   Feel free to assign this one to me, I'll get a PR open today.





[GitHub] [airflow] SamWheating closed issue #22160: Directly submitting task to executor doesn't set `queued_dttm` which can cause scheduler crashloop

Posted by GitBox <gi...@apache.org>.
SamWheating closed issue #22160:
URL: https://github.com/apache/airflow/issues/22160


   





[GitHub] [airflow] SamWheating commented on issue #22160: Directly submitting task to executor doesn't set `queued_dttm` which can cause scheduler crashloop

Posted by GitBox <gi...@apache.org>.
SamWheating commented on issue #22160:
URL: https://github.com/apache/airflow/issues/22160#issuecomment-1068533311


   Closed by https://github.com/apache/airflow/pull/22259





[GitHub] [airflow] uranusjr commented on issue #22160: Directly submitting task to executor doesn't set `queued_dttm` which can cause scheduler crashloop

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #22160:
URL: https://github.com/apache/airflow/issues/22160#issuecomment-1065495181


   At first glance, your solution 3 does seem to be the most appropriate to me.





[GitHub] [airflow] potiuk commented on issue #22160: Directly submitting task to executor doesn't set `queued_dttm` which can cause scheduler crashloop

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #22160:
URL: https://github.com/apache/airflow/issues/22160#issuecomment-1066234187


   Agreed. This is a valid case, and solution 3 seems to be the best.

