You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/06/10 07:00:11 UTC

[GitHub] [airflow] mik-laj commented on a change in pull request #5382: [AIRFLOW-4741] Include Sentry for Airflow

mik-laj commented on a change in pull request #5382: [AIRFLOW-4741] Include Sentry for Airflow
URL: https://github.com/apache/airflow/pull/5382#discussion_r291899596
 
 

 ##########
 File path: airflow/contrib/hooks/sentry_hook.py
 ##########
 @@ -0,0 +1,91 @@
+from airflow.hooks.base_hook import BaseHook
+from airflow.utils.db import provide_session
+from airflow.models import TaskInstance
+
+from sentry_sdk.integrations.celery import CeleryIntegration
+from sentry_sdk.integrations.logging import ignore_logger
+from sentry_sdk import configure_scope, add_breadcrumb, init
+
+original_task_init = TaskInstance.__init__
+original_clear_xcom = TaskInstance.clear_xcom_data
+SCOPE_TAGS = frozenset(("task_id", "dag_id", "execution_date", "ds", "operator"))
+
+
+@provide_session
+def get_task_instance_attr(self, task_id, attr, session=None):
+    """
+    Retrieve attribute from task.
+    """
+    TI = TaskInstance
+    ti = (
+        session.query(TI)
+        .filter(
+            TI.dag_id == self.dag_id,
+            TI.task_id == task_id,
+            TI.execution_date == self.execution_date,
+        )
+        .all()
+    )
+    if ti:
+        attr = getattr(ti[0], attr)
+    else:
+        attr = None
+    return attr
+
+
+@property
+def ds(self):
+    """
+    Date stamp for task object.
+    """
+    return self.execution_date.strftime("%Y-%m-%d")
+
+
+@provide_session
+def new_clear_xcom(self, session=None):
+    """
+    Add breadcrumbs just before task is executed.
+    """
+    for task in self.task.get_flat_relatives(upstream=True):
+        state = get_task_instance_attr(self, task.task_id, "state")
+        operation = get_task_instance_attr(self, task.task_id, "operator")
+        add_breadcrumb(
+            category="data",
+            message="Upstream Task: {}, State: {}, Operation: {}".format(
+                task.task_id, state, operation
+            ),
+            level="info",
+        )
+    original_clear_xcom(self, session)
+
+
+def add_sentry(self, *args, **kwargs):
+    """
+    Change the TaskInstance init function to add customized tagging.
+    """
+    original_task_init(self, *args, **kwargs)
+    self.operator = self.task.__class__.__name__
+    with configure_scope() as scope:
+        for tag_name in SCOPE_TAGS:
+            scope.set_tag(tag_name, getattr(self, tag_name))
+
+
+class SentryHook(BaseHook):
+    """
+    Wrap around the Sentry SDK.
+    """
+
+    def __init__(self, sentry_conn_id=None):
+        sentry_celery = CeleryIntegration()
+        integrations = [sentry_celery]
+        ignore_logger("airflow.task")
+
+        self.conn_id = self.get_connection(sentry_conn_id)
+        self.dsn = self.conn_id.host
+        init(dsn=self.dsn, integrations=integrations)
+
+        if not getattr(TaskInstance, "_sentry_integration_", False):
+            TaskInstance.__init__ = add_sentry
 
 Review comment:
   I am very glad to see this integration, but I have doubts about whether the current implementation is right. The initialization logic does not need to live in the hook; I think it would be fine to initialize this integration in the core.
   
   I also think that monkey-patching methods this way is not a good idea; it can be very confusing to other developers.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services