You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/08/15 18:45:05 UTC

[airflow] 29/45: Fix "This Session's transaction has been rolled back" (#25532)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 18f13384f1f4b551ddbd9c20c0fe25b21be37b17
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Fri Aug 5 17:15:31 2022 +0100

    Fix "This Session's transaction has been rolled back" (#25532)
    
    Accessing the run_id (self.run_id) after an exception leads to an error because the session is invalidated when the exception occurs. Here we extract the run_id before handling the exception.
    
    (cherry picked from commit 5668888a7e1074a620b3d38f407ecf1aa055b623)
---
 airflow/models/dagrun.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 8589f6878e..968f360de7 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -1044,6 +1044,10 @@ class DagRun(Base, LoggingMixin):
         :param session: the session to use
 
         """
+        # Fetch the information we need before handling the exception to avoid
+        # PendingRollbackError due to the session being invalidated on exception
+        # see https://github.com/apache/superset/pull/530
+        run_id = self.run_id
         try:
             if hook_is_noop:
                 session.bulk_insert_mappings(TI, tasks)
@@ -1057,7 +1061,7 @@ class DagRun(Base, LoggingMixin):
             self.log.info(
                 'Hit IntegrityError while creating the TIs for %s- %s',
                 dag_id,
-                self.run_id,
+                run_id,
                 exc_info=True,
             )
             self.log.info('Doing session rollback.')