Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/26 18:02:29 UTC

[GitHub] [airflow] ejstembler opened a new issue, #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

ejstembler opened a new issue, #27300:
URL: https://github.com/apache/airflow/issues/27300

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Airflow version: `v2.3.3+astro.2`.
   
   We've encountered this issue twice this year. Something causes the Scheduler to get stuck in an endless loop, yet it shows as healthy even though nothing is being processed.
   
   The most recent occurrence was this week, when the Scheduler encountered a database update error:
   
   ```
   sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'dag' expected to update 1 row(s); 0 were matched.
   ```
   
   As a result, the Scheduler logs show that it is stuck in an endless loop: the same messages repeat over and over.
   
   ![Screen_Shot_2022-10-24_at_10_25_21_AM](https://user-images.githubusercontent.com/45985338/198082454-7afce5e9-81c6-4f0a-9509-f99d591ede3e.png)
   
   Because of this, nothing runs, and the entire Airflow instance is considered down.
   
   In this particular case, the issue was resolved by manually deleting the duplicate row in the `dag` table.
   
   When we encountered a similar case earlier in the year, the root cause was different and required a different solution (upsizing the workers).
   
   ### What you think should happen instead
   
   The Scheduler should not crash or get stuck in an endless loop.  It should handle exceptional cases gracefully. It should not be reported as healthy if it is crashing continuously or stuck in an endless loop.
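
One way to read the last point is that health should reflect progress rather than mere liveness. A minimal sketch of that idea follows. `ProgressWatchdog`, its methods, and the stall threshold are hypothetical names chosen for illustration; they are not part of Airflow's health check.

```python
import time
from typing import Callable


class ProgressWatchdog:
    """Report unhealthy when no scheduling progress was recorded recently."""

    def __init__(self, stall_after_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._stall_after = stall_after_seconds
        self._clock = clock
        self._last_progress = clock()

    def record_progress(self) -> None:
        # Call this only when real work happened (for example, a DAG run was
        # created), not on every iteration of the loop.
        self._last_progress = self._clock()

    def is_healthy(self) -> bool:
        # A process that is alive but looping on the same error makes no
        # progress, so it eventually crosses the stall threshold.
        return self._clock() - self._last_progress <= self._stall_after
```

The injectable `clock` is only there so the behaviour can be checked without waiting; a real deployment would presumably use the default monotonic clock.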
   
   Some strategies for handling this, off the top of my head:
   
   * The Scheduler should have stricter error handling: when an error is encountered, it should log the error and continue on to the next scheduled DAG.
   * The Scheduler itself should not be allowed to get into an endless loop.
     * Check the logs for repeating message patterns?
     * Keep a count to make sure DAGs are being run?
     * Use logarithmic or exponential backoff when retrying?
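
The first and last strategies above can be sketched together. This is only an illustration under stated assumptions: `schedule_all`, `schedule_one`, and `backoff_delays` are hypothetical helpers, not Airflow functions, and the delay values are arbitrary.

```python
import logging
import time
from typing import Callable, Iterable, List

log = logging.getLogger("scheduler_sketch")


def backoff_delays(base: float, factor: float, max_delay: float, retries: int) -> List[float]:
    """Exponential backoff schedule: base, base*factor, ... capped at max_delay."""
    return [min(base * factor**attempt, max_delay) for attempt in range(retries)]


def schedule_all(
    dag_ids: Iterable[str],
    schedule_one: Callable[[str], None],
    sleep: Callable[[float], None] = time.sleep,
    retries: int = 3,
) -> List[str]:
    """Try each DAG; log failures and move on instead of aborting the loop."""
    failed = []
    for dag_id in dag_ids:
        for delay in backoff_delays(1.0, 2.0, 30.0, retries):
            try:
                schedule_one(dag_id)
                break  # success: go on to the next DAG
            except Exception:
                log.exception("Scheduling %s failed; waiting %.1fs", dag_id, delay)
                sleep(delay)
        else:
            # Every attempt failed: record the DAG and keep going.
            failed.append(dag_id)
    return failed
```

Returning the list of failed DAG ids lets the caller decide whether to alert or to mark the scheduler unhealthy, rather than retrying the same failing statement forever.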
   
   
   ### How to reproduce
   
   Insert a duplicate row into the `dag` table. There are probably other ways. Earlier in the year we encountered this same issue when Workers were not properly upsized.
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   [apache-airflow-providers-http](https://pypi.python.org/pypi/apache-airflow-providers-http)==2.0.1
   [apache-airflow-providers-jdbc](https://pypi.python.org/pypi/apache-airflow-providers-jdbc)==2.0.1
   [simple-salesforce](https://pypi.python.org/pypi/simple-salesforce)==1.1.0
   [csvvalidator](https://pypi.python.org/pypi/csvvalidator)==1.2
   [pandas](https://pypi.python.org/pypi/pandas)==1.3.5
   [pre-commit](https://pypi.python.org/pypi/pre-commit)
   [pylint](https://pypi.python.org/pypi/pylint)==2.15
   [pytest](https://pypi.python.org/pypi/pytest)==6.2.5
   [pyspark](https://pypi.python.org/pypi/pyspark)==3.3.0
   [apache-airflow-providers-google](https://pypi.python.org/pypi/apache-airflow-providers-google)==6.4.0
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   Astronomer
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] lihan commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by GitBox <gi...@apache.org>.
lihan commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1292776762

   Same error message was observed in https://github.com/apache/airflow/issues/27259




[GitHub] [airflow] ephraimbuddy commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by "ephraimbuddy (via GitHub)" <gi...@apache.org>.
ephraimbuddy commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1430950096

   Do you still experience the StaleDataError in 2.5.1? It should have been fixed by #28689.




[GitHub] [airflow] ejstembler commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by GitBox <gi...@apache.org>.
ejstembler commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1293542042

   ```
   [2022-10-24 22:11:55,940] {scheduler_job.py:768} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 751, in _execute
       self._run_scheduler_loop()
     File "/usr/local/lib/python3.9/site-packages/astronomer/airflow/version_check/plugin.py", line 29, in run_before
       fn(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 839, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 911, in _do_scheduling
       self._create_dagruns_for_dags(guard, session)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/retries.py", line 76, in wrapped_function
       for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
     File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 382, in __iter__
       do = self.iter(retry_state=retry_state)
     File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 349, in iter
       return fut.result()
     File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 439, in result
       return self.__get_result()
     File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
       raise self._exception
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/retries.py", line 85, in wrapped_function
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 979, in _create_dagruns_for_dags
       self._create_dag_runs(query.all(), session)
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1029, in _create_dag_runs
       dag.create_dagrun(
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/dag.py", line 2384, in create_dagrun
       session.flush()
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3255, in flush
       self._flush(objects)
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3395, in _flush
       transaction.rollback(_capture_exception=True)
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       compat.raise_(
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3355, in _flush
       flush_context.execute()
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 453, in execute
       rec.execute(self)
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 627, in execute
       util.preloaded.orm_persistence.save_obj(
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 234, in save_obj
       _emit_update_statements(
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 1032, in _emit_update_statements
       raise orm_exc.StaleDataError(
   sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'dag' expected to update 1 row(s); 0 were matched.
   ```




[GitHub] [airflow] radiant0619 commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by "radiant0619 (via GitHub)" <gi...@apache.org>.
radiant0619 commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1401369438

   I am also facing a similar issue in Airflow 2.5.0. It happens intermittently when a DAG uses dynamic task creation. I am using MySQL 5.7 as the backend.




[GitHub] [airflow] ejstembler commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by GitBox <gi...@apache.org>.
ejstembler commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1293802439

   Incidentally, two Astronomer engineers are familiar with the issue: @alex-astronomer and @wolfier




[GitHub] [airflow] Taragolis commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1435491941

   @tongtie Could you provide a bit more detail about your DB backend?
   And about your Airflow deployment setup: executor, number of schedulers, and so on. The more details, the better.




[GitHub] [airflow] potiuk commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1292705734

   It looks like some weird race condition @ashb @ephraimbuddy @uranusjr - we might want to take a close look at the potential culprit (I saw similar issues recently).
   
   @ejstembler - can you please provide a more detailed stack trace from the issue you saw? Just one line is not nearly enough to diagnose it.




[GitHub] [airflow] potiuk commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1362586409

   @ejstembler @lihan - which database are you using?
   




[GitHub] [airflow] mrn-aglic commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by "mrn-aglic (via GitHub)" <gi...@apache.org>.
mrn-aglic commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1701442870

   I think I'm experiencing this same issue on Airflow 2.6.1 when using dynamic task mapping over task groups and using backfill to update a month of data.
   I'm using Airflow 2.6.1 (with astro), postgres:
   `psql            | psql (PostgreSQL) 13.11 (Debian 13.11-0+deb11u1)`
   and python 3.10.
   
   I have an operator that generates a list of dates (dicts), and passes those to the task group (insert_new_daily_data). 
   Each dict has 3 keys: start_date, end_date and current_date.
   
   I use:
   `insert_new_daily_data.expand_kwargs(kwargs=get_data_ranges.output)`
   
   The definition of the task group:
   ```
   @task_group(group_id="insert_new_daily_data")
   def insert_new_daily_data(start_date: str, end_date: str, current_date: str):
       ...  # task group body not shown in the original comment
   ```
   
   
   And inside the task group I have 2 BigQueryJobOperators:
   `cleanup_data >> insert_data`




[GitHub] [airflow] tongtie commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by "tongtie (via GitHub)" <gi...@apache.org>.
tongtie commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1435454065

   > Do you still experience the StaleDataError in 2.5.1? It should have been fixed by #28689.
   
   After upgrading to 2.5.1, I am still experiencing the StaleDataError.
   
   ![image](https://user-images.githubusercontent.com/2959046/219826622-1590924b-d39b-43de-b8c8-6660955d3bcb.png)
   




[GitHub] [airflow] potiuk commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1401960184

   > I am also facing a similar issue in Airflow 2.5.0. It happens intermittently when a DAG uses dynamic task creation. I am using MySQL 5.7 as the backend.
   
   Can you please provide all the information, logs, and a description of your circumstances? Everything you can find that could help to diagnose it? I am afraid announcing "I have the same problem" without adding the details does not bring us any closer to diagnosing the problem. On the other hand, if you can spend a little time providing some evidence, it might actually help those who might be able to solve your problem @radiant0619




[GitHub] [airflow] liuning89757 commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by "liuning89757 (via GitHub)" <gi...@apache.org>.
liuning89757 commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1413361554

   Hi @potiuk @ashb @ephraimbuddy, I hope my findings offer some clues about this.
   I have hit a similar issue almost every time a dynamic task expands to 30+ indexes (Airflow 2.4.3 and MariaDB 10.3.27).
   The traceback:
   ```
   Traceback (most recent call last):
     File "/usr/local/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 126, in _execute_work_in_fork
       args.func(args)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 52, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 103, in wrapper
       return f(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 382, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 189, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 247, in _run_task_by_local_task_job
       run_job.run()
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 247, in run
       self._execute()
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 132, in _execute
       self.handle_task_exit(return_code)
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 163, in handle_task_exit
       self.task_instance.schedule_downstream_tasks()
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2611, in schedule_downstream_tasks
       info = dag_run.task_instance_scheduling_decisions(session)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 696, in task_instance_scheduling_decisions
       schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
     File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 755, in _get_ready_tis
       expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 719, in expand_mapped_task
       session.flush()
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/orm/session.py", line 3444, in flush
       self._flush(objects)
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/orm/session.py", line 3584, in _flush
       transaction.rollback(_capture_exception=True)
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       compat.raise_(
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/util/compat.py", line 210, in raise_
       raise exception
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/orm/session.py", line 3544, in _flush
       flush_context.execute()
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
       rec.execute(self)
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
       util.preloaded.orm_persistence.save_obj(
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
       _emit_update_statements(
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 1035, in _emit_update_statements
       raise orm_exc.StaleDataError(
   sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'task_instance' expected to update 1 row(s); 0 were matched.
   ```
   I turned on the general log to capture the SQL when the exception happens, and found that two sessions (537138 and 535083) execute the same statement `UPDATE task_instance SET map_index=0 ... where ... AND task_instance.map_index = -1`. Session 537138 commits, but 535083 rolls back and then raises the StaleDataError.
   ```
   ...
   537138 Query	UPDATE task_instance SET state='removed' WHERE task_instance.dag_id = 'new_monitor_server' AND task_instance.task_id = 'update_server' AND task_instance.run_id = 'scheduled__2023-02-01T08:35:00+00:00' AND task_instance.map_index >= 37
   		537138 Query	UPDATE task_instance SET map_index=0 WHERE task_instance.dag_id = 'new_monitor_server' AND task_instance.task_id = 'update_server' AND task_instance.run_id = 'scheduled__2023-02-01T08:35:00+00:00' AND task_instance.map_index = -1
   		537138 Query	INSERT INTO task_instance (task_id, dag_id, run_id, map_index, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, pool_slots, queue, priority_weight, operator, queued_dttm, queued_by_job_id, pid, executor_config, external_executor_id, trigger_id, trigger_timeout, next_method, next_kwargs) VALUES ('update_server', 'new_monitor_server', 'scheduled__2023-02-01T08:35:00+00:00', 1, NULL, NULL, NULL, NULL, 0, 0, '', 'root', NULL, 'default_pool', 1, 'default', 1, '_PythonDecoratedOperator', NULL, NULL, NULL, '€}”.', NULL, NULL, NULL, NULL, 'null'),('update_server', 'new_monitor_server', 'scheduled__2023-02-01T08:35:00+00:00', 2, NULL, NULL, NULL, NULL, 0, 0, '', 'root', NULL, 'default_pool', 1, 'default', 1, '_PythonDecoratedOperator', NULL, NULL, NULL, '€}”.', NULL, NULL, NULL, NULL, 'null'),('update_server', 'new_monitor_server', 'scheduled__2023-02-01T08:35:00+00:00', 3, NULL, NULL, NULL, NULL, 0, 0, '', 'root', NULL, 'default_pool', 1, 'default', 1,
   ......
   535083 Query	UPDATE task_instance SET state='removed' WHERE task_instance.dag_id = 'new_monitor_server' AND task_instance.task_id = 'update_server' AND task_instance.run_id = 'scheduled__2023-02-01T08:35:00+00:00' AND task_instance.map_index >= 37
   		535083 Query	UPDATE dag_run SET last_scheduling_decision='2023-02-01 08:40:01.318630' WHERE dag_run.id = 4320
   		535083 Query	UPDATE task_instance SET map_index=0 WHERE task_instance.dag_id = 'new_monitor_server' AND task_instance.task_id = 'update_server' AND task_instance.run_id = 'scheduled__2023-02-01T08:35:00+00:00' AND task_instance.map_index = -1
   		535083 Query	ROLLBACK
   		535083 Query	ROLLBACK
   ```
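
The pattern in this log can be reproduced in miniature. The sketch below is an illustration only: it uses SQLite from the Python standard library and a simplified, hypothetical `task_instance` table rather than Airflow's real schema or MariaDB. Both sessions read the unmapped row, then both try to move it from `map_index = -1` to `0`.

```python
import os
import sqlite3
import tempfile

# Hypothetical, simplified stand-in for Airflow's task_instance table.
SCHEMA = "CREATE TABLE task_instance (task_id TEXT, run_id TEXT, map_index INTEGER)"
FIND_UNMAPPED = (
    "SELECT task_id FROM task_instance "
    "WHERE task_id = ? AND run_id = ? AND map_index = -1"
)
EXPAND = (
    "UPDATE task_instance SET map_index = 0 "
    "WHERE task_id = ? AND run_id = ? AND map_index = -1"
)


def simulate_race():
    """Return the row counts matched by two sessions expanding the same task."""
    key = ("update_server", "scheduled__2023-02-01")
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "race_demo.db")
        # isolation_level=None puts each connection in autocommit mode,
        # so every statement is committed as soon as it runs.
        session_a = sqlite3.connect(path, isolation_level=None)
        session_b = sqlite3.connect(path, isolation_level=None)
        try:
            session_a.execute(SCHEMA)
            session_a.execute("INSERT INTO task_instance VALUES (?, ?, -1)", key)

            # Both sessions read the unmapped row before either one updates it.
            seen_by_a = session_a.execute(FIND_UNMAPPED, key).fetchall()
            seen_by_b = session_b.execute(FIND_UNMAPPED, key).fetchall()
            assert seen_by_a and seen_by_b

            # Session A wins the race; session B's UPDATE no longer matches.
            matched_a = session_a.execute(EXPAND, key).rowcount
            matched_b = session_b.execute(EXPAND, key).rowcount
        finally:
            session_a.close()
            session_b.close()
    return matched_a, matched_b
```

The second session's `UPDATE` matches zero rows. That is the situation the message in the traceback describes: "expected to update 1 row(s); 0 were matched".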
   
   After digging into the code of `expand_mapped_task` in `airflow/models/mappedoperator.py`, I think there is a race condition in this section (maybe between the mini scheduler and the main scheduler?). The `unmapped_ti` query does not take a lock with `with_for_update()`, so when two threads concurrently execute `unmapped_ti.map_index = 0`, the first thread commits and the second thread's `flush()` may raise this exception.
   ```
           unmapped_ti: TaskInstance | None = (
               session.query(TaskInstance)
               .filter(
                   TaskInstance.dag_id == self.dag_id,
                   TaskInstance.task_id == self.task_id,
                   TaskInstance.run_id == run_id,
                   TaskInstance.map_index == -1,
                   or_(TaskInstance.state.in_(State.unfinished), TaskInstance.state.is_(None)),
               )
               .one_or_none()
           )
   
           all_expanded_tis: list[TaskInstance] = []
   
           if unmapped_ti:
               # The unmapped task instance still exists and is unfinished, i.e. we
               # haven't tried to run it before.
               if total_length is None:
                   if self.dag and self.dag.partial:
                       # If the DAG is partial, it's likely that the upstream tasks
                       # are not done yet, so we do nothing
                       indexes_to_map: Iterable[int] = ()
                   else:
                       # If the map length cannot be calculated (due to unavailable
                       # upstream sources), fail the unmapped task.
                       unmapped_ti.state = TaskInstanceState.UPSTREAM_FAILED
                       indexes_to_map = ()
               elif total_length < 1:
                   # If the upstream maps this to a zero-length value, simply mark
                   # the unmapped task instance as SKIPPED (if needed).
                   self.log.info(
                       "Marking %s as SKIPPED since the map has %d values to expand",
                       unmapped_ti,
                       total_length,
                   )
                   unmapped_ti.state = TaskInstanceState.SKIPPED
                   indexes_to_map = ()
               else:
                   # Otherwise convert this into the first mapped index, and create
                   # TaskInstance for other indexes.
                   unmapped_ti.map_index = 0
                   self.log.debug("Updated in place to become %s", unmapped_ti)
                   all_expanded_tis.append(unmapped_ti)
                   indexes_to_map = range(1, total_length)
   ```
   
   After changing the `unmapped_ti` query to use `with_for_update().one_or_none()`, the exception has not reappeared so far.
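
The idea behind the change is to take the lock before reading, so that a second session cannot act on a row it read before the first session committed. The sketch below is an analogue only, under stated assumptions: SQLite has no `SELECT ... FOR UPDATE`, so it uses `BEGIN IMMEDIATE` (a database-wide write lock) in its place, and `expand_under_lock` and the simplified table are hypothetical, not Airflow code.

```python
import os
import sqlite3
import tempfile


def expand_under_lock(conn: sqlite3.Connection) -> int:
    """Lock first, then read and update; returns how many rows were expanded."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    try:
        rowids = conn.execute(
            "SELECT rowid FROM task_instance WHERE map_index = -1"
        ).fetchall()
        for (rowid,) in rowids:
            conn.execute(
                "UPDATE task_instance SET map_index = 0 WHERE rowid = ?", (rowid,)
            )
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return len(rowids)


def run_demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "lock_demo.db")
        # timeout=0 makes a blocked session fail immediately instead of waiting.
        session_a = sqlite3.connect(path, timeout=0, isolation_level=None)
        session_b = sqlite3.connect(path, timeout=0, isolation_level=None)
        try:
            session_a.execute(
                "CREATE TABLE task_instance (task_id TEXT, map_index INTEGER)"
            )
            session_a.execute("INSERT INTO task_instance VALUES ('update_server', -1)")

            # While A holds the lock, B cannot even start its read-modify-write.
            session_a.execute("BEGIN IMMEDIATE")
            try:
                session_b.execute("BEGIN IMMEDIATE")
                b_was_blocked = False
                session_b.execute("ROLLBACK")
            except sqlite3.OperationalError:
                b_was_blocked = True
            session_a.execute("ROLLBACK")

            # Run one after the other, B reads only after A has committed,
            # finds no unmapped row, and therefore issues no stale UPDATE.
            expanded_by_a = expand_under_lock(session_a)
            expanded_by_b = expand_under_lock(session_b)
        finally:
            session_a.close()
            session_b.close()
    return b_was_blocked, expanded_by_a, expanded_by_b
```

A row-level lock such as `with_for_update()` is narrower than this database-wide lock, but the ordering it enforces (lock, read, update, commit) is the same.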
   




[GitHub] [airflow] lihan commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by GitBox <gi...@apache.org>.
lihan commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1297052080

   Does this query return multiple rows for the same dag_id?
   
   ```
   select dag_id, count(*)
   from dag
   group by 1
   having count(*) > 1;
   ```
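
For anyone who wants to try the check without touching a real metadata database, here is a small self-contained sketch. It assumes an in-memory SQLite database and a simplified, hypothetical `dag` table that deliberately has no primary key, so that duplicates can exist in the demo; `build_demo_db` and `find_duplicate_dag_ids` are illustrative names only.

```python
import sqlite3


def build_demo_db() -> sqlite3.Connection:
    """Create an in-memory table containing one duplicated dag_id."""
    conn = sqlite3.connect(":memory:")
    # No primary key on purpose: the demo needs to allow duplicate rows.
    conn.execute("CREATE TABLE dag (dag_id TEXT, is_paused INTEGER)")
    conn.executemany(
        "INSERT INTO dag VALUES (?, ?)",
        [("etl_daily", 0), ("etl_daily", 0), ("reports", 1)],
    )
    return conn


def find_duplicate_dag_ids(conn: sqlite3.Connection):
    """Return (dag_id, count) for every dag_id that appears more than once."""
    return conn.execute(
        "SELECT dag_id, COUNT(*) FROM dag "
        "GROUP BY dag_id HAVING COUNT(*) > 1 ORDER BY dag_id"
    ).fetchall()
```

Calling `find_duplicate_dag_ids(build_demo_db())` reports only `etl_daily`, with a count of 2.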




[GitHub] [airflow] ashb commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1362681933

   > @ejstembler @lihan - which database are you using?
   
   The OP at least is on postgres - I can't say for certain what version, but the logs in the screenshot are from Astro and we have only ever supported postgres.




[GitHub] [airflow] tongtie commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by "tongtie (via GitHub)" <gi...@apache.org>.
tongtie commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1435689792

   > @tongtie Could you provide a bit more detail about your DB backend? And your Airflow setup: executor, number of schedulers, etc. The more details, the better.
   
   My db is mysql5.7, 
   1 scheduler running,
   using celery executor,
   here is my `airflow info` output:
   ![image](https://user-images.githubusercontent.com/2959046/219871388-2318823f-1083-4f60-8bfc-6d3b2702829a.png)
   




[GitHub] [airflow] mrn-aglic commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by "mrn-aglic (via GitHub)" <gi...@apache.org>.
mrn-aglic commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1701464073

   I've re-run it and it seems to be OK, but I'm now getting this error, even though the task is marked as success:
   
   ```
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING - Traceback (most recent call last):
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -   File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -     self.run()
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -   File "/usr/local/lib/python3.10/threading.py", line 953, in run
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -     self._target(*self._args, **self._kwargs)
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -   File "/usr/local/lib/python3.10/site-packages/openlineage/airflow/listener.py", line 90, in on_running
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -     task_instance_copy.render_templates()
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -   File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2172, in render_templates
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -     context = self.get_template_context()
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -   File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2055, in get_template_context
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -     expanded_ti_count: int | None = task.get_mapped_ti_count(self.run_id, session=session)
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -   File "/usr/local/lib/python3.10/site-packages/airflow/models/abstractoperator.py", line 387, in get_mapped_ti_count
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -     return group.get_mapped_ti_count(run_id, session=session)
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -   File "/usr/local/lib/python3.10/site-packages/airflow/utils/task_group.py", line 576, in get_mapped_ti_count
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -     return functools.reduce(
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -   File "/usr/local/lib/python3.10/site-packages/airflow/utils/task_group.py", line 578, in <genexpr>
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING -     (g._expand_input.get_total_map_length(run_id, session=session) for g in groups),
   [2023-08-31, 17:25:46 UTC] {logging_mixin.py:149} WARNING - AttributeError: 'MappedTaskGroup' object has no attribute '_expand_input'
   ```




[GitHub] [airflow] Taragolis commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1435700100

   > My db is mysql5.7,
   
   That is enough information: the fix in https://github.com/apache/airflow/pull/28689 applies only to DB backends that support SELECT FOR UPDATE, and unfortunately MySQL 5.7 does not.
   
   Someone might find a solution for MySQL 5.7 before its EOL, but to avoid waiting days or months for that, I would recommend upgrading to MySQL 8.0 now. Or, if you can afford to lose all history and create everything from scratch, you could choose Postgres as the backend.
   
   ---
   
   And just in case, I would like to remind anyone who finds this issue that **MariaDB is not supported** as a database backend for Airflow.
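
The advice in this comment (MySQL 8.0 or newer, and not MariaDB) can be checked mechanically. The following is a minimal sketch, assuming the input is the banner printed by `mysql --version`, such as the `Distrib 5.7.39` string quoted elsewhere in this thread. The names `parse_banner` and `is_recommended_mysql` are hypothetical and not part of Airflow, and the `Ver X.Y.Z` fallback for newer clients is an assumption about their banner format. The banner describes the client binary, so the server version should still be confirmed separately, for example with `SELECT VERSION()`.

```python
import re
from typing import NamedTuple, Optional, Tuple


class ClientBanner(NamedTuple):
    """Parsed `mysql --version` banner (hypothetical helper type)."""

    version: Tuple[int, ...]
    is_mariadb: bool


# Older clients print "Distrib X.Y.Z"; the "Ver X.Y.Z" fallback is an
# assumption about the banner format of newer clients.
_DISTRIB_RE = re.compile(r"Distrib (\d+)\.(\d+)\.(\d+)")
_VER_RE = re.compile(r"Ver (\d+)\.(\d+)\.(\d+)")


def parse_banner(banner: str) -> Optional[ClientBanner]:
    """Extract the version triple and a MariaDB flag, or None if unparseable."""
    match = _DISTRIB_RE.search(banner) or _VER_RE.search(banner)
    if match is None:
        return None
    version = tuple(int(part) for part in match.groups())
    return ClientBanner(version=version, is_mariadb="mariadb" in banner.lower())


def is_recommended_mysql(banner: str) -> bool:
    """True only for a non-MariaDB banner reporting version 8.0 or newer."""
    parsed = parse_banner(banner)
    if parsed is None:
        raise ValueError(f"unrecognised version banner: {banner!r}")
    return not parsed.is_mariadb and parsed.version >= (8, 0, 0)


if __name__ == "__main__":
    reported = "mysql  Ver 14.14 Distrib 5.7.39, for el7 (x86_64) using  EditLine wrapper"
    print(parse_banner(reported))
    print(is_recommended_mysql(reported))
```

Comparing version tuples rather than strings avoids ordering mistakes such as "10.5" sorting before "8.0".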
   




[GitHub] [airflow] potiuk commented on issue #27300: Scheduler encounters database update error, then gets stuck in endless loop, yet still shows as healthy

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1362607151

   Hey @ephraimbuddy @ashb @uranusjr @jedcunningham @Taragolis @alex-astronomer and @wolfier (just raising awareness among those who might have some clues, could do a more thorough investigation, or were mentioned above as familiar with this issue)
   
   Maybe some of us already have some ideas. We might want to take a very close look at this one before 2.5.1 and investigate it more thoroughly, rather than moving it to the next release (as has happened a few times before).
   
   It seems to keep happening, and other users report the same problem. An example is here: https://github.com/apache/airflow/discussions/28531
   
   We already had very similar issues reported by other users:
   
   * https://github.com/apache/airflow/discussions/24727 and https://github.com/apache/airflow/discussions/25333 - these suggested that this might happen only on MariaDB, or after migrating a database originally created for MariaDB; after migration to MySQL the problem was gone. No one was sure whether this was the case, but it looked likely.
   
   * https://github.com/apache/airflow/discussions/25130 - suggests that the problem might happen when you remove a task while it is being scheduled (this is likely a very different root cause, but the same query fails). I guess this is expected behaviour and users should not be doing it anyway (though we might want to add some extra protection and detection for such a case).
   
   However, https://github.com/apache/airflow/discussions/28531 is the same issue happening on a fully supported version of MySQL: `mysql  Ver 14.14 Distrib 5.7.39, for el7 (x86_64) using  EditLine wrapper`, so this is unlikely to be a MariaDB-only issue.
   
   I just want to make sure this one gets mentioned, because it affects the perception of the Airflow scheduler as "stable" and "solid", and I think that should be one of the most important properties of Airflow for us to focus on.

