You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/03/06 06:20:38 UTC

[airflow] branch main updated: Blocklist to disable specific metric tags or metric names (#29881)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 86cd79ffa7 Blocklist to disable specific metric tags or metric names (#29881)
86cd79ffa7 is described below

commit 86cd79ffa76d4e4d4abe3fe829d7797852a713a5
Author: Sung Yun <10...@users.noreply.github.com>
AuthorDate: Mon Mar 6 01:20:05 2023 -0500

    Blocklist to disable specific metric tags or metric names (#29881)
---
 airflow/config_templates/config.yml          |  19 ++++
 airflow/config_templates/default_airflow.cfg |  12 +++
 airflow/stats.py                             | 150 +++++++++++++++++----------
 tests/core/test_stats.py                     |  93 +++++++++++++++--
 4 files changed, 214 insertions(+), 60 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index c9d0bf92ac..4d0d234416 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -882,6 +882,16 @@ metrics:
       type: string
       example: ~
       default: ""
+    statsd_block_list:
+      description: |
+        If you want to avoid sending all the available metrics to StatsD,
+        you can configure a block list of prefixes (comma separated) to filter out metrics that
+        start with the elements of the list (e.g: "scheduler,executor,dagrun").
+        If statsd_allow_list and statsd_block_list are both configured, statsd_block_list is ignored
+      version_added: 2.6.0
+      type: string
+      example: ~
+      default: ""
     stat_name_handler:
       description: |
         A function that validate the StatsD stat name, apply changes to the stat name if necessary and return
@@ -923,6 +933,15 @@ metrics:
       type: string
       example: ~
       default: ~
+    statsd_disabled_tags:
+      description: |
+        If you want to avoid sending all the available metrics tags to StatsD,
+        you can configure a block list of prefixes (comma separated) to filter out metric tags
+        that start with the elements of the list (e.g: "job_id,run_id")
+      version_added: 2.6.0
+      type: string
+      example: job_id,run_id,dag_id,task_id
+      default: job_id,run_id
     statsd_influxdb_enabled:
       description: |
         To enable sending Airflow metrics with StatsD-Influxdb tagging convention.
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index a8e8266a85..8454fb5300 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -486,6 +486,12 @@ statsd_prefix = airflow
 # start with the elements of the list (e.g: "scheduler,executor,dagrun")
 statsd_allow_list =
 
+# If you want to avoid sending all the available metrics to StatsD,
+# you can configure a block list of prefixes (comma separated) to filter out metrics that
+# start with the elements of the list (e.g: "scheduler,executor,dagrun").
+# If statsd_allow_list and statsd_block_list are both configured, statsd_block_list is ignored
+statsd_block_list =
+
 # A function that validate the StatsD stat name, apply changes to the stat name if necessary and return
 # the transformed stat name.
 #
@@ -507,6 +513,12 @@ statsd_datadog_metrics_tags = True
 # Note: The module path must exist on your PYTHONPATH for Airflow to pick it up
 # statsd_custom_client_path =
 
+# If you want to avoid sending all the available metrics tags to StatsD,
+# you can configure a block list of prefixes (comma separated) to filter out metric tags
+# that start with the elements of the list (e.g: "job_id,run_id")
+# Example: statsd_disabled_tags = job_id,run_id,dag_id,task_id
+statsd_disabled_tags = job_id,run_id
+
 # To enable sending Airflow metrics with StatsD-Influxdb tagging convention.
 statsd_influxdb_enabled = False
 
diff --git a/airflow/stats.py b/airflow/stats.py
index ae528a4e66..d20ec2ad77 100644
--- a/airflow/stats.py
+++ b/airflow/stats.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import abc
 import datetime
 import logging
 import socket
@@ -265,22 +266,45 @@ def validate_stat(fn: T) -> T:
     return cast(T, wrapper)
 
 
-class AllowListValidator:
-    """Class to filter unwanted stats."""
+class ListValidator(metaclass=abc.ABCMeta):
+    """
+    ListValidator abstract base class that can be implemented as an AllowListValidator
+    or BlockListValidator. The test method must be overridden by its subclass.
+    """
 
-    def __init__(self, allow_list=None):
-        if allow_list:
+    def __init__(self, validate_list: str | None = None) -> None:
+        self.validate_list: tuple[str, ...] | None = (
+            tuple(item.strip().lower() for item in validate_list.split(",")) if validate_list else None
+        )
+
+    @classmethod
+    def __subclasshook__(cls, subclass):
+        return hasattr(subclass, "test") and callable(subclass.test) or NotImplemented
 
-            self.allow_list = tuple(item.strip().lower() for item in allow_list.split(","))
+    @abc.abstractmethod
+    def test(self, name):
+        """Test if name is allowed"""
+        raise NotImplementedError
+
+
+class AllowListValidator(ListValidator):
+    """AllowListValidator only allows names that match the allowed prefixes."""
+
+    def test(self, name):
+        if self.validate_list is not None:
+            return name.strip().lower().startswith(self.validate_list)
         else:
-            self.allow_list = None
+            return True  # default is all metrics are allowed
+
 
-    def test(self, stat):
-        """Test if stat is in the Allow List."""
-        if self.allow_list is not None:
-            return stat.strip().lower().startswith(self.allow_list)
+class BlockListValidator(ListValidator):
+    """BlockListValidator only allows names that do not match the blocked prefixes."""
+
+    def test(self, name):
+        if self.validate_list is not None:
+            return not name.strip().lower().startswith(self.validate_list)
         else:
-            return True  # default is all metrics allowed
+            return True  # default is all metrics are allowed
 
 
 def prepare_stat_with_tags(fn: T) -> T:
@@ -291,10 +315,11 @@ def prepare_stat_with_tags(fn: T) -> T:
         if self.influxdb_tags_enabled:
             if stat is not None and tags is not None:
                 for k, v in tags.items():
-                    if all((c not in [",", "="] for c in v + k)):
-                        stat += f",{k}={v}"
-                    else:
-                        log.error("Dropping invalid tag: %s=%s.", k, v)
+                    if self.metric_tags_validator.test(k):
+                        if all((c not in [",", "="] for c in v + k)):
+                            stat += f",{k}={v}"
+                        else:
+                            log.error("Dropping invalid tag: %s=%s.", k, v)
         return fn(self, stat, *args, tags=tags, **kwargs)
 
     return cast(T, wrapper)
@@ -306,14 +331,14 @@ class SafeStatsdLogger:
     def __init__(
         self,
         statsd_client,
-        allow_list_validator=AllowListValidator(),
-        aggregation_optimizer_enabled=False,
+        metrics_validator: ListValidator = AllowListValidator(),
         influxdb_tags_enabled=False,
+        metric_tags_validator: ListValidator = AllowListValidator(),
     ):
         self.statsd = statsd_client
-        self.allow_list_validator = allow_list_validator
-        self.aggregation_optimizer_enabled = aggregation_optimizer_enabled
+        self.metrics_validator = metrics_validator
         self.influxdb_tags_enabled = influxdb_tags_enabled
+        self.metric_tags_validator = metric_tags_validator
 
     @prepare_stat_with_tags
     @validate_stat
@@ -326,7 +351,7 @@ class SafeStatsdLogger:
         tags: dict[str, str] | None = None,
     ):
         """Increment stat."""
-        if self.allow_list_validator.test(stat):
+        if self.metrics_validator.test(stat):
             return self.statsd.incr(stat, count, rate)
         return None
 
@@ -341,7 +366,7 @@ class SafeStatsdLogger:
         tags: dict[str, str] | None = None,
     ):
         """Decrement stat."""
-        if self.allow_list_validator.test(stat):
+        if self.metrics_validator.test(stat):
             return self.statsd.decr(stat, count, rate)
         return None
 
@@ -357,7 +382,7 @@ class SafeStatsdLogger:
         tags: dict[str, str] | None = None,
     ):
         """Gauge stat."""
-        if self.allow_list_validator.test(stat):
+        if self.metrics_validator.test(stat):
             return self.statsd.gauge(stat, value, rate, delta)
         return None
 
@@ -371,7 +396,7 @@ class SafeStatsdLogger:
         tags: dict[str, str] | None = None,
     ):
         """Stats timing."""
-        if self.allow_list_validator.test(stat):
+        if self.metrics_validator.test(stat):
             return self.statsd.timing(stat, dt)
         return None
 
@@ -385,7 +410,7 @@ class SafeStatsdLogger:
         **kwargs,
     ):
         """Timer metric that can be cancelled."""
-        if stat and self.allow_list_validator.test(stat):
+        if stat and self.metrics_validator.test(stat):
             return Timer(self.statsd.timer(stat, *args, **kwargs))
         return Timer()
 
@@ -396,14 +421,14 @@ class SafeDogStatsdLogger:
     def __init__(
         self,
         dogstatsd_client,
-        allow_list_validator=AllowListValidator(),
+        metrics_validator: ListValidator = AllowListValidator(),
         metrics_tags=False,
-        aggregation_optimizer_enabled=False,
+        metric_tags_validator: ListValidator = AllowListValidator(),
     ):
         self.dogstatsd = dogstatsd_client
-        self.allow_list_validator = allow_list_validator
+        self.metrics_validator = metrics_validator
         self.metrics_tags = metrics_tags
-        self.aggregation_optimizer_enabled = aggregation_optimizer_enabled
+        self.metric_tags_validator = metric_tags_validator
 
     @validate_stat
     def incr(
@@ -416,10 +441,12 @@ class SafeDogStatsdLogger:
     ):
         """Increment stat."""
         if self.metrics_tags and isinstance(tags, dict):
-            tags_list = [f"{key}:{value}" for key, value in tags.items()]
+            tags_list = [
+                f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
+            ]
         else:
             tags_list = []
-        if self.allow_list_validator.test(stat):
+        if self.metrics_validator.test(stat):
             return self.dogstatsd.increment(metric=stat, value=count, tags=tags_list, sample_rate=rate)
         return None
 
@@ -434,10 +461,12 @@ class SafeDogStatsdLogger:
     ):
         """Decrement stat."""
         if self.metrics_tags and isinstance(tags, dict):
-            tags_list = [f"{key}:{value}" for key, value in tags.items()]
+            tags_list = [
+                f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
+            ]
         else:
             tags_list = []
-        if self.allow_list_validator.test(stat):
+        if self.metrics_validator.test(stat):
             return self.dogstatsd.decrement(metric=stat, value=count, tags=tags_list, sample_rate=rate)
         return None
 
@@ -453,10 +482,12 @@ class SafeDogStatsdLogger:
     ):
         """Gauge stat."""
         if self.metrics_tags and isinstance(tags, dict):
-            tags_list = [f"{key}:{value}" for key, value in tags.items()]
+            tags_list = [
+                f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
+            ]
         else:
             tags_list = []
-        if self.allow_list_validator.test(stat):
+        if self.metrics_validator.test(stat):
             return self.dogstatsd.gauge(metric=stat, value=value, tags=tags_list, sample_rate=rate)
         return None
 
@@ -470,10 +501,12 @@ class SafeDogStatsdLogger:
     ):
         """Stats timing."""
         if self.metrics_tags and isinstance(tags, dict):
-            tags_list = [f"{key}:{value}" for key, value in tags.items()]
+            tags_list = [
+                f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
+            ]
         else:
             tags_list = []
-        if self.allow_list_validator.test(stat):
+        if self.metrics_validator.test(stat):
             if isinstance(dt, datetime.timedelta):
                 dt = dt.total_seconds()
             return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list)
@@ -489,10 +522,12 @@ class SafeDogStatsdLogger:
     ):
         """Timer metric that can be cancelled."""
         if self.metrics_tags and isinstance(tags, dict):
-            tags_list = [f"{key}:{value}" for key, value in tags.items()]
+            tags_list = [
+                f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
+            ]
         else:
             tags_list = []
-        if stat and self.allow_list_validator.test(stat):
+        if stat and self.metrics_validator.test(stat):
             return Timer(self.dogstatsd.timed(stat, *args, tags=tags_list, **kwargs))
         return Timer()
 
@@ -547,14 +582,19 @@ class _Stats(type):
             port=conf.getint("metrics", "statsd_port"),
             prefix=conf.get("metrics", "statsd_prefix"),
         )
-        allow_list_validator = AllowListValidator(conf.get("metrics", "statsd_allow_list", fallback=None))
-        aggregation_optimizer_enabled = conf.get(
-            "metrics", "statsd_aggregation_optimized_naming_enabled", fallback=False
-        )
+        if conf.get("metrics", "statsd_allow_list", fallback=None):
+            metrics_validator = AllowListValidator(conf.get("metrics", "statsd_allow_list"))
+            if conf.get("metrics", "statsd_block_list", fallback=None):
+                log.warning(
+                    "Ignoring statsd_block_list as both statsd_allow_list and statsd_block_list have been set"
+                )
+        elif conf.get("metrics", "statsd_block_list", fallback=None):
+            metrics_validator = BlockListValidator(conf.get("metrics", "statsd_block_list"))
+        else:
+            metrics_validator = AllowListValidator()
         influxdb_tags_enabled = conf.get("metrics", "statsd_influxdb_enabled", fallback=False)
-        return SafeStatsdLogger(
-            statsd, allow_list_validator, aggregation_optimizer_enabled, influxdb_tags_enabled
-        )
+        metric_tags_validator = BlockListValidator(conf.get("metrics", "statsd_disabled_tags", fallback=None))
+        return SafeStatsdLogger(statsd, metrics_validator, influxdb_tags_enabled, metric_tags_validator)
 
     @classmethod
     def get_dogstatsd_logger(cls):
@@ -567,15 +607,19 @@ class _Stats(type):
             namespace=conf.get("metrics", "statsd_prefix"),
             constant_tags=cls.get_constant_tags(),
         )
-        dogstatsd_allow_list = conf.get("metrics", "statsd_allow_list", fallback=None)
-        allow_list_validator = AllowListValidator(dogstatsd_allow_list)
+        if conf.get("metrics", "statsd_allow_list", fallback=None):
+            metrics_validator = AllowListValidator(conf.get("metrics", "statsd_allow_list"))
+            if conf.get("metrics", "statsd_block_list", fallback=None):
+                log.warning(
+                    "Ignoring statsd_block_list as both statsd_allow_list and statsd_block_list have been set"
+                )
+        elif conf.get("metrics", "statsd_block_list", fallback=None):
+            metrics_validator = BlockListValidator(conf.get("metrics", "statsd_block_list"))
+        else:
+            metrics_validator = AllowListValidator()
         datadog_metrics_tags = conf.get("metrics", "statsd_datadog_metrics_tags", fallback=True)
-        aggregation_optimizer_enabled = conf.get(
-            "metrics", "statsd_aggregation_optimized_naming_enabled", fallback=False
-        )
-        return SafeDogStatsdLogger(
-            dogstatsd, allow_list_validator, datadog_metrics_tags, aggregation_optimizer_enabled
-        )
+        metric_tags_validator = BlockListValidator(conf.get("metrics", "statsd_disabled_tags", fallback=None))
+        return SafeDogStatsdLogger(dogstatsd, metrics_validator, datadog_metrics_tags, metric_tags_validator)
 
     @classmethod
     def get_constant_tags(cls):
diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py
index c6a6109066..005133967f 100644
--- a/tests/core/test_stats.py
+++ b/tests/core/test_stats.py
@@ -27,7 +27,7 @@ import statsd
 
 import airflow
 from airflow.exceptions import AirflowConfigException, InvalidStatsNameException
-from airflow.stats import AllowListValidator, SafeDogStatsdLogger, SafeStatsdLogger
+from airflow.stats import AllowListValidator, BlockListValidator, SafeDogStatsdLogger, SafeStatsdLogger
 from tests.test_utils.config import conf_vars
 
 
@@ -126,6 +126,46 @@ class TestStats:
             airflow.stats.Stats.incr("empty_key")
         importlib.reload(airflow.stats)
 
+    def test_load_allow_list_validator(self):
+        with conf_vars(
+            {
+                ("metrics", "statsd_on"): "True",
+                ("metrics", "statsd_allow_list"): "name1,name2",
+            }
+        ):
+            importlib.reload(airflow.stats)
+            assert isinstance(airflow.stats.Stats.metrics_validator, AllowListValidator)
+            assert airflow.stats.Stats.metrics_validator.validate_list == ("name1", "name2")
+        # Avoid side-effects
+        importlib.reload(airflow.stats)
+
+    def test_load_block_list_validator(self):
+        with conf_vars(
+            {
+                ("metrics", "statsd_on"): "True",
+                ("metrics", "statsd_block_list"): "name1,name2",
+            }
+        ):
+            importlib.reload(airflow.stats)
+            assert isinstance(airflow.stats.Stats.metrics_validator, BlockListValidator)
+            assert airflow.stats.Stats.metrics_validator.validate_list == ("name1", "name2")
+        # Avoid side-effects
+        importlib.reload(airflow.stats)
+
+    def test_load_allow_and_block_list_validator_loads_only_allow_list_validator(self):
+        with conf_vars(
+            {
+                ("metrics", "statsd_on"): "True",
+                ("metrics", "statsd_allow_list"): "name1,name2",
+                ("metrics", "statsd_block_list"): "name1,name2",
+            }
+        ):
+            importlib.reload(airflow.stats)
+            assert isinstance(airflow.stats.Stats.metrics_validator, AllowListValidator)
+            assert airflow.stats.Stats.metrics_validator.validate_list == ("name1", "name2")
+        # Avoid side-effects
+        importlib.reload(airflow.stats)
+
 
 class TestDogStats:
     def setup_method(self):
@@ -240,6 +280,24 @@ class TestStatsWithAllowList:
         self.statsd_client.assert_not_called()
 
 
+class TestStatsWithBlockList:
+    def setup_method(self):
+        self.statsd_client = Mock(spec=statsd.StatsClient)
+        self.stats = SafeStatsdLogger(self.statsd_client, BlockListValidator("stats_one, stats_two"))
+
+    def test_increment_counter_with_allowed_key(self):
+        self.stats.incr("stats_one")
+        self.statsd_client.assert_not_called()
+
+    def test_increment_counter_with_allowed_prefix(self):
+        self.stats.incr("stats_two.bla")
+        self.statsd_client.assert_not_called()
+
+    def test_not_increment_counter_if_not_allowed(self):
+        self.stats.incr("stats_three")
+        self.statsd_client.incr.assert_called_once_with("stats_three", 1, 1)
+
+
 class TestDogStatsWithAllowList:
     def setup_method(self):
         pytest.importorskip("datadog")
@@ -280,6 +338,25 @@ class TestDogStatsWithMetricsTags:
         )
 
 
+class TestDogStatsWithDisabledMetricsTags:
+    def setup_method(self):
+        pytest.importorskip("datadog")
+        from datadog import DogStatsd
+
+        self.dogstatsd_client = Mock(speck=DogStatsd)
+        self.dogstatsd = SafeDogStatsdLogger(
+            self.dogstatsd_client,
+            metrics_tags=True,
+            metric_tags_validator=BlockListValidator("key1"),
+        )
+
+    def test_does_send_stats_using_dogstatsd_with_tags(self):
+        self.dogstatsd.incr("empty_key", 1, 1, tags={"key1": "value1", "key2": "value2"})
+        self.dogstatsd_client.increment.assert_called_once_with(
+            metric="empty_key", sample_rate=1, tags=["key2:value2"], value=1
+        )
+
+
 class TestStatsWithInfluxDBEnabled:
     def setup_method(self):
         with conf_vars(
@@ -289,7 +366,11 @@ class TestStatsWithInfluxDBEnabled:
             }
         ):
             self.statsd_client = Mock(spec=statsd.StatsClient)
-            self.stats = SafeStatsdLogger(self.statsd_client, influxdb_tags_enabled=True)
+            self.stats = SafeStatsdLogger(
+                self.statsd_client,
+                influxdb_tags_enabled=True,
+                metric_tags_validator=BlockListValidator("key2,key3"),
+            )
 
     def test_increment_counter(self):
         self.stats.incr(
@@ -302,16 +383,14 @@ class TestStatsWithInfluxDBEnabled:
             "test_stats_run.delay",
             tags={"key0": "val0", "key1": "val1", "key2": "val2"},
         )
-        self.statsd_client.incr.assert_called_once_with(
-            "test_stats_run.delay,key0=val0,key1=val1,key2=val2", 1, 1
-        )
+        self.statsd_client.incr.assert_called_once_with("test_stats_run.delay,key0=val0,key1=val1", 1, 1)
 
     def test_does_not_increment_counter_drops_invalid_tags(self):
         self.stats.incr(
             "test_stats_run.delay",
-            tags={"key0,": "val0", "key1": "val1", "key2": "val2"},
+            tags={"key0,": "val0", "key1": "val1", "key2": "val2", "key3": "val3"},
         )
-        self.statsd_client.incr.assert_called_once_with("test_stats_run.delay,key1=val1,key2=val2", 1, 1)
+        self.statsd_client.incr.assert_called_once_with("test_stats_run.delay,key1=val1", 1, 1)
 
 
 def always_invalid(stat_name):