You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/09/10 12:50:25 UTC
[airflow] 05/05: Fix Sentry handler from LocalTaskJob causing error
(#18119)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f9c337fb5a0290a38069a2adff5a8fcea806bf2b
Author: Robin Edwards <ro...@gmail.com>
AuthorDate: Fri Sep 10 00:14:36 2021 +0100
Fix Sentry handler from LocalTaskJob causing error (#18119)
The `enrich_errors` method assumes the first argument to the function
its patch is a TaskInstance when infact it can also be a LocalTaskJob.
This is now handled by extracting the task_instance from the
LocalTaskJob
Closes #18118
(cherry picked from commit f97ddf10e148bd18b6a09ec96f1901068c8684f0)
---
airflow/sentry.py | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
diff --git a/airflow/sentry.py b/airflow/sentry.py
index 51fe26f..9e271b4 100644
--- a/airflow/sentry.py
+++ b/airflow/sentry.py
@@ -148,11 +148,14 @@ if conf.getboolean("sentry", 'sentry_on', fallback=False):
sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info")
def enrich_errors(self, func):
- """Wrap TaskInstance._run_raw_task to support task specific tags and breadcrumbs."""
+ """
+ Wrap TaskInstance._run_raw_task and LocalTaskJob._run_mini_scheduler_on_child_tasks
+ to support task specific tags and breadcrumbs.
+ """
session_args_idx = find_session_idx(func)
@wraps(func)
- def wrapper(task_instance, *args, **kwargs):
+ def wrapper(_self, *args, **kwargs):
# Wrapping the _run_raw_task function with push_scope to contain
# tags and breadcrumbs to a specific Task Instance
@@ -163,8 +166,14 @@ if conf.getboolean("sentry", 'sentry_on', fallback=False):
with sentry_sdk.push_scope():
try:
- return func(task_instance, *args, **kwargs)
+ return func(_self, *args, **kwargs)
except Exception as e:
+ # Is a LocalTaskJob get the task instance
+ if hasattr(_self, 'task_instance'):
+ task_instance = _self.task_instance
+ else:
+ task_instance = _self
+
self.add_tagging(task_instance)
self.add_breadcrumbs(task_instance, session=session)
sentry_sdk.capture_exception(e)