You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/09/10 12:50:25 UTC

[airflow] 05/05: Fix Sentry handler from LocalTaskJob causing error (#18119)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f9c337fb5a0290a38069a2adff5a8fcea806bf2b
Author: Robin Edwards <ro...@gmail.com>
AuthorDate: Fri Sep 10 00:14:36 2021 +0100

    Fix Sentry handler from LocalTaskJob causing error (#18119)
    
    The `enrich_errors` method assumes the first argument to the function
    it patches is a TaskInstance when in fact it can also be a LocalTaskJob.
    
    This is now handled by extracting the task_instance from the
    LocalTaskJob when one is passed.
    
    Closes #18118
    
    (cherry picked from commit f97ddf10e148bd18b6a09ec96f1901068c8684f0)
---
 airflow/sentry.py | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/airflow/sentry.py b/airflow/sentry.py
index 51fe26f..9e271b4 100644
--- a/airflow/sentry.py
+++ b/airflow/sentry.py
@@ -148,11 +148,14 @@ if conf.getboolean("sentry", 'sentry_on', fallback=False):
                 sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info")
 
         def enrich_errors(self, func):
-            """Wrap TaskInstance._run_raw_task to support task specific tags and breadcrumbs."""
+            """
+            Wrap TaskInstance._run_raw_task and LocalTaskJob._run_mini_scheduler_on_child_tasks
+             to support task specific tags and breadcrumbs.
+            """
             session_args_idx = find_session_idx(func)
 
             @wraps(func)
-            def wrapper(task_instance, *args, **kwargs):
+            def wrapper(_self, *args, **kwargs):
                 # Wrapping the _run_raw_task function with push_scope to contain
                 # tags and breadcrumbs to a specific Task Instance
 
@@ -163,8 +166,14 @@ if conf.getboolean("sentry", 'sentry_on', fallback=False):
 
                 with sentry_sdk.push_scope():
                     try:
-                        return func(task_instance, *args, **kwargs)
+                        return func(_self, *args, **kwargs)
                     except Exception as e:
+                        # If _self is a LocalTaskJob, get its task instance
+                        if hasattr(_self, 'task_instance'):
+                            task_instance = _self.task_instance
+                        else:
+                            task_instance = _self
+
                         self.add_tagging(task_instance)
                         self.add_breadcrumbs(task_instance, session=session)
                         sentry_sdk.capture_exception(e)