Posted to commits@airflow.apache.org by as...@apache.org on 2022/01/27 17:24:19 UTC
[airflow] branch main updated: Ensure `on_task_instance_running` listener can get at task (#21157)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 82adce5 Ensure `on_task_instance_running` listener can get at task (#21157)
82adce5 is described below
commit 82adce535eb0c427c230035d648bf3c829824b21
Author: Maciej Obuchowski <ob...@gmail.com>
AuthorDate: Thu Jan 27 18:23:36 2022 +0100
Ensure `on_task_instance_running` listener can get at task (#21157)
When we added the TaskListener API, its contract promised to pass a TaskInstance
object to the listener plugin. However, that is not entirely what happens:
the object actually passed is the one mapped to the current SQLAlchemy session.
`check_and_change_state_before_execution` operates on a detached TaskInstance
object, then merges it into the current session. Since there is no attached object in
the SQLAlchemy identity map, SQLAlchemy creates one, and it is this new object that
is passed to the SQLAlchemy event listeners.
The problem is that when SQLAlchemy creates the new object, it sets only the
database-mapped fields. Attributes that exist purely on the Python side, like
`task`, are not set on the new object.
This change manually sets `task` on the object returned by `session.merge()`, so that
`on_task_instance_running` receives a TaskInstance with its `task` field set.
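The merge behaviour described above can be illustrated with a toy model. This is a minimal sketch, not SQLAlchemy's implementation: `ToyTaskInstance`, `ToySession` and `mapped_columns` are hypothetical names, and the model assumes only that `merge()` returns the identity-mapped instance (creating it when absent, without calling `__init__`) and copies mapped column values onto it.

```python
# Toy model (hypothetical, NOT SQLAlchemy) of why a merged TaskInstance
# loses its Python-only `task` attribute, and how the one-line fix helps.


class ToyTaskInstance:
    # Attributes this toy "ORM" treats as database-mapped columns.
    mapped_columns = ("dag_id", "task_id", "run_id", "state")

    def __init__(self, dag_id, task_id, run_id, state=None, task=None):
        self.dag_id = dag_id
        self.task_id = task_id
        self.run_id = run_id
        self.state = state
        self.task = task  # Python-only attribute, never persisted

    @property
    def identity_key(self):
        return (self.dag_id, self.task_id, self.run_id)


class ToySession:
    def __init__(self):
        self.identity_map = {}

    def merge(self, obj):
        key = obj.identity_key
        target = self.identity_map.get(key)
        if target is None:
            # Assumption: like an ORM building an instance from a row,
            # __init__ is bypassed, so Python-only attributes are never set.
            target = object.__new__(type(obj))
            self.identity_map[key] = target
        # Only mapped columns are copied from the detached object.
        for name in type(obj).mapped_columns:
            setattr(target, name, getattr(obj, name))
        return target


detached = ToyTaskInstance("dag", "t1", "run1", state="queued", task="<operator>")
session = ToySession()

merged = session.merge(detached)
print(merged is detached)            # prints False
print(getattr(merged, "task", None))  # prints None: a listener would see no task

# The fix from the diff, expressed against the toy model:
session.merge(detached).task = detached.task
print(session.identity_map[detached.identity_key].task)  # prints <operator>
```

Assigning to the return value of `merge()`, as the one-line fix in the diff does, is what makes the attribute visible on the session-bound object; setting it only on the detached argument would leave the merged instance without it.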
Signed-off-by: Maciej Obuchowski <ob...@gmail.com>
---
airflow/models/taskinstance.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6472e74..ed3e9b0 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1251,7 +1251,7 @@ class TaskInstance(Base, LoggingMixin):
self.external_executor_id = external_executor_id
self.end_date = None
if not test_mode:
- session.merge(self)
+ session.merge(self).task = task
session.commit()
# Closing all pooled connections to prevent