You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/08/09 11:22:10 UTC
[airflow] branch master updated: Disable sentry integration by
default (#10212)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new db8d06a Disable sentry integration by default (#10212)
db8d06a is described below
commit db8d06a6962cf6c154accd27806b0520570c66c1
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Sun Aug 9 13:21:41 2020 +0200
Disable sentry integration by default (#10212)
* Disable sentry integration by default
---
UPDATING.md | 5 +
airflow/config_templates/config.yml | 6 +
airflow/config_templates/default_airflow.cfg | 2 +
airflow/sentry.py | 241 +++++++++++++--------------
docs/errors.rst | 4 +-
tests/test_sentry.py | 9 +-
6 files changed, 140 insertions(+), 127 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index af365a4..db7081d 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -556,6 +556,11 @@ Change DAG file loading duration metric from
`dag.loading-duration.<dag_id>` to `dag.loading-duration.<dag_file>`. This is to
better handle the case when a DAG file has multiple DAGs.
+#### Sentry is disabled by default
+
+Sentry is disabled by default. To enable the integration, you need to set the ``sentry_on`` option
+in the ``[sentry]`` section to ``"True"``.
+
### Changes to the core operators/hooks
We strive to ensure that there are no changes that may affect the end user and your files, but this
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 776e61f..972ad27 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1163,6 +1163,12 @@
Unsupported options: ``integrations``, ``in_app_include``, ``in_app_exclude``,
``ignore_errors``, ``before_breadcrumb``, ``before_send``, ``transport``.
options:
+ - name: sentry_on
+ description: Enable error reporting to Sentry
+ version_added: ~
+ type: string
+ example: ~
+ default: "false"
- name: sentry_dsn
description: ~
version_added: 1.10.6
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index be8b90d..bddb418 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -573,6 +573,8 @@ smtp_mail_from = airflow@example.com
# https://docs.sentry.io/error-reporting/configuration/?platform=python.
# Unsupported options: ``integrations``, ``in_app_include``, ``in_app_exclude``,
# ``ignore_errors``, ``before_breadcrumb``, ``before_send``, ``transport``.
+# Enable error reporting to Sentry
+sentry_on = false
sentry_dsn =
[celery]
diff --git a/airflow/sentry.py b/airflow/sentry.py
index 51b38d2..23a7703 100644
--- a/airflow/sentry.py
+++ b/airflow/sentry.py
@@ -57,135 +57,130 @@ class DummySentry:
"""
-class ConfiguredSentry(DummySentry):
- """
- Configure Sentry SDK.
- """
-
- SCOPE_TAGS = frozenset(
- ("task_id", "dag_id", "execution_date", "operator", "try_number")
- )
- SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration"))
-
- UNSUPPORTED_SENTRY_OPTIONS = frozenset(
- ("integrations", "in_app_include", "in_app_exclude", "ignore_errors",
- "before_breadcrumb", "before_send", "transport")
- )
+Sentry: DummySentry = DummySentry()
+if conf.getboolean("sentry", 'sentry_on', fallback=False):
+ import sentry_sdk
+ # Verify blinker installation
+ from blinker import signal # noqa: F401 pylint: disable=unused-import
+ from sentry_sdk.integrations.flask import FlaskIntegration
+ from sentry_sdk.integrations.logging import ignore_logger
- def __init__(self):
- """
- Initialize the Sentry SDK.
+ class ConfiguredSentry(DummySentry):
"""
- ignore_logger("airflow.task")
- ignore_logger("airflow.jobs.backfill_job.BackfillJob")
- executor_name = conf.get("core", "EXECUTOR")
-
- sentry_flask = FlaskIntegration()
-
- # LoggingIntegration is set by default.
- integrations = [sentry_flask]
-
- if executor_name == "CeleryExecutor":
- from sentry_sdk.integrations.celery import CeleryIntegration
-
- sentry_celery = CeleryIntegration()
- integrations.append(sentry_celery)
-
- dsn = None
- sentry_config_opts = conf.getsection("sentry") or {}
- if sentry_config_opts:
- old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
- new_way_dsn = sentry_config_opts.pop("dsn", None)
- # supported backward compability with old way dsn option
- dsn = old_way_dsn or new_way_dsn
-
- unsupported_options = self.UNSUPPORTED_SENTRY_OPTIONS.intersection(
- sentry_config_opts.keys())
- if unsupported_options:
- log.warning(
- "There are unsupported options in [sentry] section: %s",
- ", ".join(unsupported_options)
- )
-
- if dsn:
- init(dsn=dsn, integrations=integrations, **sentry_config_opts)
- else:
- # Setting up Sentry using environment variables.
- log.debug("Defaulting to SENTRY_DSN in environment.")
- init(integrations=integrations, **sentry_config_opts)
-
- def add_tagging(self, task_instance):
+ Configure Sentry SDK.
"""
- Function to add tagging for a task_instance.
- """
- task = task_instance.task
-
- with configure_scope() as scope:
- for tag_name in self.SCOPE_TAGS:
- attribute = getattr(task_instance, tag_name)
- if tag_name == "operator":
- attribute = task.__class__.__name__
- scope.set_tag(tag_name, attribute)
- @provide_session
- def add_breadcrumbs(self, task_instance, session=None):
- """
- Function to add breadcrumbs inside of a task_instance.
- """
- if session is None:
- return
- execution_date = task_instance.execution_date
- task = task_instance.task
- dag = task.dag
- task_instances = dag.get_task_instances(
- state={State.SUCCESS, State.FAILED},
- end_date=execution_date,
- start_date=execution_date,
- session=session,
+ SCOPE_TAGS = frozenset(
+ ("task_id", "dag_id", "execution_date", "operator", "try_number")
)
+ SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration"))
- for ti in task_instances:
- data = {}
- for crumb_tag in self.SCOPE_CRUMBS:
- data[crumb_tag] = getattr(ti, crumb_tag)
-
- add_breadcrumb(category="completed_tasks", data=data, level="info")
-
- def enrich_errors(self, func):
- """
- Wrap TaskInstance._run_raw_task to support task specific tags and breadcrumbs.
- """
-
- @wraps(func)
- def wrapper(task_instance, *args, session=None, **kwargs):
- # Wrapping the _run_raw_task function with push_scope to contain
- # tags and breadcrumbs to a specific Task Instance
- with push_scope():
- try:
- return func(task_instance, *args, session=session, **kwargs)
- except Exception as e:
- self.add_tagging(task_instance)
- self.add_breadcrumbs(task_instance, session=session)
- capture_exception(e)
- raise
-
- return wrapper
-
- def flush(self):
- import sentry_sdk
- sentry_sdk.flush()
-
+ UNSUPPORTED_SENTRY_OPTIONS = frozenset(
+ ("integrations", "in_app_include", "in_app_exclude", "ignore_errors",
+ "before_breadcrumb", "before_send", "transport")
+ )
-Sentry = DummySentry() # type: DummySentry
-
-try:
- # Verify blinker installation
- from blinker import signal # noqa: F401 pylint: disable=unused-import
- from sentry_sdk import add_breadcrumb, capture_exception, configure_scope, init, push_scope
- from sentry_sdk.integrations.flask import FlaskIntegration
- from sentry_sdk.integrations.logging import ignore_logger
+ def __init__(self):
+ """
+ Initialize the Sentry SDK.
+ """
+ ignore_logger("airflow.task")
+ ignore_logger("airflow.jobs.backfill_job.BackfillJob")
+ executor_name = conf.get("core", "EXECUTOR")
+
+ sentry_flask = FlaskIntegration()
+
+ # LoggingIntegration is set by default.
+ integrations = [sentry_flask]
+
+ if executor_name == "CeleryExecutor":
+ from sentry_sdk.integrations.celery import CeleryIntegration
+
+ sentry_celery = CeleryIntegration()
+ integrations.append(sentry_celery)
+
+ dsn = None
+ sentry_config_opts = conf.getsection("sentry") or {}
+ if sentry_config_opts:
+ sentry_config_opts.pop("sentry_on")
+ old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
+ new_way_dsn = sentry_config_opts.pop("dsn", None)
+ # keep backward compatibility with the old sentry_dsn option
+ dsn = old_way_dsn or new_way_dsn
+
+ unsupported_options = self.UNSUPPORTED_SENTRY_OPTIONS.intersection(
+ sentry_config_opts.keys())
+ if unsupported_options:
+ log.warning(
+ "There are unsupported options in [sentry] section: %s",
+ ", ".join(unsupported_options)
+ )
+
+ if dsn:
+ sentry_sdk.init(dsn=dsn, integrations=integrations, **sentry_config_opts)
+ else:
+ # Setting up Sentry using environment variables.
+ log.debug("Defaulting to SENTRY_DSN in environment.")
+ sentry_sdk.init(integrations=integrations, **sentry_config_opts)
+
+ def add_tagging(self, task_instance):
+ """
+ Function to add tagging for a task_instance.
+ """
+ task = task_instance.task
+
+ with sentry_sdk.configure_scope() as scope:
+ for tag_name in self.SCOPE_TAGS:
+ attribute = getattr(task_instance, tag_name)
+ if tag_name == "operator":
+ attribute = task.__class__.__name__
+ scope.set_tag(tag_name, attribute)
+
+ @provide_session
+ def add_breadcrumbs(self, task_instance, session=None):
+ """
+ Function to add breadcrumbs inside of a task_instance.
+ """
+ if session is None:
+ return
+ execution_date = task_instance.execution_date
+ task = task_instance.task
+ dag = task.dag
+ task_instances = dag.get_task_instances(
+ state={State.SUCCESS, State.FAILED},
+ end_date=execution_date,
+ start_date=execution_date,
+ session=session,
+ )
+
+ for ti in task_instances:
+ data = {}
+ for crumb_tag in self.SCOPE_CRUMBS:
+ data[crumb_tag] = getattr(ti, crumb_tag)
+
+ sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info")
+
+ def enrich_errors(self, func):
+ """
+ Wrap TaskInstance._run_raw_task to support task specific tags and breadcrumbs.
+ """
+
+ @wraps(func)
+ def wrapper(task_instance, *args, session=None, **kwargs):
+ # Wrapping the _run_raw_task function with push_scope to contain
+ # tags and breadcrumbs to a specific Task Instance
+ with sentry_sdk.push_scope():
+ try:
+ return func(task_instance, *args, session=session, **kwargs)
+ except Exception as e:
+ self.add_tagging(task_instance)
+ self.add_breadcrumbs(task_instance, session=session)
+ sentry_sdk.capture_exception(e)
+ raise
+
+ return wrapper
+
+ def flush(self):
+ sentry_sdk.flush()
Sentry = ConfiguredSentry()
-
-except ImportError as e:
- log.debug("Could not configure Sentry: %s, using DummySentry instead.", e)
diff --git a/docs/errors.rst b/docs/errors.rst
index 716add8..0dbb9db 100644
--- a/docs/errors.rst
+++ b/docs/errors.rst
@@ -29,7 +29,9 @@ First you must install sentry requirement:
pip install 'apache-airflow[sentry]'
-Add your ``SENTRY_DSN`` to your configuration file e.g. ``airflow.cfg`` under ``[sentry]``. Its template resembles the following: ``'{PROTOCOL}://{PUBLIC_KEY}@{HOST}/{PROJECT_ID}'``
+After that, you need to enable the integration by setting the ``sentry_on`` option in the ``[sentry]`` section to ``"True"``.
+
+Add your ``SENTRY_DSN`` to your configuration file (e.g. ``airflow.cfg``) in the ``[sentry]`` section. Its template resembles the following: ``'{PROTOCOL}://{PUBLIC_KEY}@{HOST}/{PROJECT_ID}'``
.. code-block:: ini
diff --git a/tests/test_sentry.py b/tests/test_sentry.py
index 21b4001..3d5979a 100644
--- a/tests/test_sentry.py
+++ b/tests/test_sentry.py
@@ -17,6 +17,7 @@
# under the License.
import datetime
+import importlib
import unittest
from unittest.mock import MagicMock, Mock
@@ -24,10 +25,10 @@ from freezegun import freeze_time
from sentry_sdk import configure_scope
from airflow.models import TaskInstance
-from airflow.sentry import ConfiguredSentry
from airflow.settings import Session
from airflow.utils import timezone
from airflow.utils.state import State
+from tests.test_utils.config import conf_vars
EXECUTION_DATE = timezone.utcnow()
DAG_ID = "test_dag"
@@ -60,9 +61,11 @@ CRUMB = {
class TestSentryHook(unittest.TestCase):
+ @conf_vars({('sentry', 'sentry_on'): 'True'})
def setUp(self):
-
- self.sentry = ConfiguredSentry()
+ from airflow import sentry
+ importlib.reload(sentry)
+ self.sentry = sentry.ConfiguredSentry()
# Mock the Dag
self.dag = Mock(dag_id=DAG_ID, params=[])