Posted to commits@airflow.apache.org by as...@apache.org on 2022/01/27 17:24:19 UTC

[airflow] branch main updated: Ensure `on_task_instance_running` listener can get at task (#21157)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 82adce5  Ensure `on_task_instance_running` listener can get at task (#21157)
82adce5 is described below

commit 82adce535eb0c427c230035d648bf3c829824b21
Author: Maciej Obuchowski <ob...@gmail.com>
AuthorDate: Thu Jan 27 18:23:36 2022 +0100

    Ensure `on_task_instance_running` listener can get at task (#21157)
    
    The TaskListener API's contract promises to pass a TaskInstance object to
    the listener plugin. However, that is not quite what happens: the object
    being passed is the one that belongs to the current SQLAlchemy session.
    
    `check_and_change_state_before_execution` operates on a detached
    TaskInstance object, then merges it into the current session. Since there is
    no attached object in the SQLAlchemy identity map, SQLAlchemy creates one,
    and it is this object that is passed to the SQLAlchemy event listeners.
    
    The problem is that when creating the new object, SQLAlchemy sets only the
    database-mapped fields. Fields that exist purely on the Python side, like
    `task`, are not set on the new object.
    
    This change manually sets `task` on the new SQLAlchemy object, so that
    `on_task_instance_running` receives a TaskInstance with the `task` field set.
    
    Signed-off-by: Maciej Obuchowski <ob...@gmail.com>
---
 airflow/models/taskinstance.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6472e74..ed3e9b0 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1251,7 +1251,7 @@ class TaskInstance(Base, LoggingMixin):
         self.external_executor_id = external_executor_id
         self.end_date = None
         if not test_mode:
-            session.merge(self)
+            session.merge(self).task = task
         session.commit()
 
         # Closing all pooled connections to prevent
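
The merge behaviour described in the commit message can be reproduced outside
Airflow. The following is a minimal sketch, assuming SQLAlchemy 1.4 or newer and
an in-memory SQLite database. `ToyTaskInstance` and its columns are hypothetical
stand-ins, not Airflow's real model; only `session.merge()` and the pattern of
assigning `task` on its return value correspond to the change above.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class ToyTaskInstance(Base):
    """Hypothetical stand-in for TaskInstance: mapped columns plus one plain attribute."""

    __tablename__ = "toy_task_instance"

    id = Column(Integer, primary_key=True)
    state = Column(String)

    def __init__(self, id, state, task):
        self.id = id
        self.state = state
        self.task = task  # plain Python attribute, not mapped to any column


engine = create_engine("sqlite://")  # in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    # This object was never added to the session, so it plays the role of
    # the detached TaskInstance.
    detached = ToyTaskInstance(id=1, state="running", task="my_operator")

    # merge() finds nothing with this primary key in the identity map or the
    # database, so it builds a new instance and copies the mapped columns only.
    merged = session.merge(detached)
    merged_state = merged.state
    merged_task_before = getattr(merged, "task", None)

    # Same idea as the fix: set the Python-only attribute on the merged object.
    merged.task = detached.task
    merged_task_after = merged.task
```

In this sketch `merged` is a different object from `detached`: it carries the
mapped `state` value, but `task` is missing until it is assigned by hand.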