You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/01/11 22:38:25 UTC
[airflow] branch v2-5-test updated: Fix `DetachedInstanceError` when finding zombies in Dag Parsing process (#28198)
This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-5-test by this push:
new d59a479a11 Fix `DetachedInstanceError` when finding zombies in Dag Parsing process (#28198)
d59a479a11 is described below
commit d59a479a1154f9d803177af43fe4875d6d6c7459
Author: Bob Du <i...@bobdu.cc>
AuthorDate: Thu Dec 22 20:36:39 2022 +0800
Fix `DetachedInstanceError` when finding zombies in Dag Parsing process (#28198)
```
[2022-12-06T14:20:21.622+0000] {base_job.py:229} DEBUG - [heartbeat]
[2022-12-06T14:20:21.623+0000] {scheduler_job.py:1495} DEBUG - Finding 'running' jobs without a recent heartbeat
[2022-12-06T14:20:21.637+0000] {scheduler_job.py:1515} WARNING - Failing (2) jobs without heartbeat after 2022-12-06 14:15:21.623199+00:00
[2022-12-06T14:20:21.641+0000] {scheduler_job.py:1526} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/xxx_dag.py', 'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'xxx', 'Task Id': 'xxx', 'Run Id': 'scheduled__2022-12-05T00:15:00+00:00', 'Hostname': 'airflow-worker-0.airflow-worker.airflow2.svc.cluster.local', 'External Executor Id': '9520cb9f-3245-497a-8e17-e9dec29d4549'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object a [...]
[2022-12-06T14:20:21.645+0000] {scheduler_job.py:763} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
self._run_scheduler_loop()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 878, in _run_scheduler_loop
next_event = timers.run(blocking=False)
File "/usr/local/lib/python3.10/sched.py", line 151, in run
action(*argument, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", line 37, in repeat
action(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1522, in _find_zombies
processor_subdir=ti.dag_model.processor_subdir,
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
return self.impl.get(state, dict_)
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
value = self._fire_loader_callables(state, key, passive)
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 962, in _fire_loader_callables
return self.callable_(state, passive)
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 861, in _load_for_state
raise orm_exc.DetachedInstanceError(
sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7f1ccc3e8520> is not bound to a Session; lazy load operation of attribute 'dag_model' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
[2022-12-06T14:20:21.647+0000] {celery_executor.py:443} DEBUG - Inquiring about 5 celery task(s)
[2022-12-06T14:20:21.669+0000] {celery_executor.py:602} DEBUG - Fetched 5 state(s) for 5 task(s)
[2022-12-06T14:20:21.669+0000] {celery_executor.py:446} DEBUG - Inquiries completed.
[2022-12-06T14:20:21.669+0000] {scheduler_job.py:775} INFO - Exited execute loop
[2022-12-06T14:20:21.674+0000] {cli_action_loggers.py:83} DEBUG - Calling callbacks: []
Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", line 39, in main
args.func(args)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 52, in command
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line 103, in wrapper
return f(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 85, in scheduler
_run_scheduler_job(args=args)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 50, in _run_scheduler_job
job.run()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 247, in run
self._execute()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
self._run_scheduler_loop()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 878, in _run_scheduler_loop
next_event = timers.run(blocking=False)
File "/usr/local/lib/python3.10/sched.py", line 151, in run
action(*argument, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", line 37, in repeat
action(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1522, in _find_zombies
processor_subdir=ti.dag_model.processor_subdir,
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
return self.impl.get(state, dict_)
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
value = self._fire_loader_callables(state, key, passive)
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 962, in _fire_loader_callables
return self.callable_(state, passive)
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 861, in _load_for_state
raise orm_exc.DetachedInstanceError(
sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7f1ccc3e8520> is not bound to a Session; lazy load operation of attribute 'dag_model' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
```
When running in standalone dag processor mode, the `DatabaseCallbackSink` is used:
the `_find_zombies` function calls `self.executor.send_callback(request)`.
However, the ORM `session` is not propagated to it, so `provide_session` opens a new session again inside the `send` function, leaving the previously loaded `TaskInstance` detached.
```
class DatabaseCallbackSink(BaseCallbackSink):
"""Sends callbacks to database."""
@provide_session
def send(self, callback: CallbackRequest, session: Session = NEW_SESSION) -> None:
"""Sends callback for execution."""
db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
session.add(db_callback)
```
Signed-off-by: BobDu <i...@bobdu.cc>
(cherry picked from commit 4b340b7561e9db0055bf69ad0fc8b3a508ea7667)
---
airflow/jobs/scheduler_job.py | 33 ++++++++++++++++----------------
tests/jobs/test_scheduler_job.py | 41 ++++++++++++++++++++--------------------
2 files changed, 36 insertions(+), 38 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 6fe8d3d710..baeabdc2ec 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1510,8 +1510,7 @@ class SchedulerJob(BaseJob):
if num_timed_out_tasks:
self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)
- @provide_session
- def _find_zombies(self, session: Session) -> None:
+ def _find_zombies(self) -> None:
"""
Find zombie task instances, which are tasks haven't heartbeated for too long
or have a no-longer-running LocalTaskJob, and create a TaskCallbackRequest
@@ -1522,30 +1521,30 @@ class SchedulerJob(BaseJob):
self.log.debug("Finding 'running' jobs without a recent heartbeat")
limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
- zombies = (
- session.query(TaskInstance, DagModel.fileloc)
- .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
- .join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id)
- .join(DagModel, TaskInstance.dag_id == DagModel.dag_id)
- .filter(TaskInstance.state == TaskInstanceState.RUNNING)
- .filter(
- or_(
- LocalTaskJob.state != State.RUNNING,
- LocalTaskJob.latest_heartbeat < limit_dttm,
+ with create_session() as session:
+ zombies: list[tuple[TI, str, str]] = (
+ session.query(TI, DM.fileloc, DM.processor_subdir)
+ .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
+ .join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id)
+ .filter(TI.state == TaskInstanceState.RUNNING)
+ .filter(
+ or_(
+ LocalTaskJob.state != State.RUNNING,
+ LocalTaskJob.latest_heartbeat < limit_dttm,
+ )
)
+ .filter(TI.queued_by_job_id == self.id)
+ .all()
)
- .filter(TaskInstance.queued_by_job_id == self.id)
- .all()
- )
if zombies:
self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)
- for ti, file_loc in zombies:
+ for ti, file_loc, processor_subdir in zombies:
zombie_message_details = self._generate_zombie_message_details(ti)
request = TaskCallbackRequest(
full_filepath=file_loc,
- processor_subdir=ti.dag_model.processor_subdir,
+ processor_subdir=processor_subdir,
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=str(zombie_message_details),
)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index b0c8e016c8..f04a76f57e 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -4129,14 +4129,13 @@ class TestSchedulerJob:
assert ti2.state == State.DEFERRED
def test_find_zombies_nothing(self):
- with create_session() as session:
- executor = MockExecutor(do_update=False)
- self.scheduler_job = SchedulerJob(executor=executor)
- self.scheduler_job.processor_agent = mock.MagicMock()
+ executor = MockExecutor(do_update=False)
+ self.scheduler_job = SchedulerJob(executor=executor)
+ self.scheduler_job.processor_agent = mock.MagicMock()
- self.scheduler_job._find_zombies(session=session)
+ self.scheduler_job._find_zombies()
- self.scheduler_job.executor.callback_sink.send.assert_not_called()
+ self.scheduler_job.executor.callback_sink.send.assert_not_called()
def test_find_zombies(self, load_examples):
dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
@@ -4179,20 +4178,21 @@ class TestSchedulerJob:
ti.queued_by_job_id = self.scheduler_job.id
session.flush()
- self.scheduler_job._find_zombies(session=session)
+ self.scheduler_job._find_zombies()
- self.scheduler_job.executor.callback_sink.send.assert_called_once()
- requests = self.scheduler_job.executor.callback_sink.send.call_args[0]
- assert 1 == len(requests)
- assert requests[0].full_filepath == dag.fileloc
- assert requests[0].msg == str(self.scheduler_job._generate_zombie_message_details(ti))
- assert requests[0].is_failure_callback is True
- assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance)
- assert ti.dag_id == requests[0].simple_task_instance.dag_id
- assert ti.task_id == requests[0].simple_task_instance.task_id
- assert ti.run_id == requests[0].simple_task_instance.run_id
- assert ti.map_index == requests[0].simple_task_instance.map_index
+ self.scheduler_job.executor.callback_sink.send.assert_called_once()
+ requests = self.scheduler_job.executor.callback_sink.send.call_args[0]
+ assert 1 == len(requests)
+ assert requests[0].full_filepath == dag.fileloc
+ assert requests[0].msg == str(self.scheduler_job._generate_zombie_message_details(ti))
+ assert requests[0].is_failure_callback is True
+ assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance)
+ assert ti.dag_id == requests[0].simple_task_instance.dag_id
+ assert ti.task_id == requests[0].simple_task_instance.task_id
+ assert ti.run_id == requests[0].simple_task_instance.run_id
+ assert ti.map_index == requests[0].simple_task_instance.map_index
+ with create_session() as session:
session.query(TaskInstance).delete()
session.query(LocalTaskJob).delete()
@@ -4267,12 +4267,11 @@ class TestSchedulerJob:
Check that the same set of failure callback with zombies are passed to the dag
file processors until the next zombie detection logic is invoked.
"""
- with conf_vars({("core", "load_examples"): "False"}):
+ with conf_vars({("core", "load_examples"): "False"}), create_session() as session:
dagbag = DagBag(
dag_folder=os.path.join(settings.DAGS_FOLDER, "test_example_bash_operator.py"),
read_dags_from_db=False,
)
- session = settings.Session()
session.query(LocalTaskJob).delete()
dag = dagbag.get_dag("test_example_bash_operator")
dag.sync_to_db(processor_subdir=TEST_DAG_FOLDER)
@@ -4301,7 +4300,7 @@ class TestSchedulerJob:
self.scheduler_job.executor = MockExecutor()
self.scheduler_job.processor_agent = mock.MagicMock()
- self.scheduler_job._find_zombies(session=session)
+ self.scheduler_job._find_zombies()
self.scheduler_job.executor.callback_sink.send.assert_called_once()