You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/03/06 06:20:38 UTC
[airflow] branch main updated: Blocklist to disable specific metric tags or metric names (#29881)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 86cd79ffa7 Blocklist to disable specific metric tags or metric names (#29881)
86cd79ffa7 is described below
commit 86cd79ffa76d4e4d4abe3fe829d7797852a713a5
Author: Sung Yun <10...@users.noreply.github.com>
AuthorDate: Mon Mar 6 01:20:05 2023 -0500
Blocklist to disable specific metric tags or metric names (#29881)
---
airflow/config_templates/config.yml | 19 ++++
airflow/config_templates/default_airflow.cfg | 12 +++
airflow/stats.py | 150 +++++++++++++++++----------
tests/core/test_stats.py | 93 +++++++++++++++--
4 files changed, 214 insertions(+), 60 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index c9d0bf92ac..4d0d234416 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -882,6 +882,16 @@ metrics:
type: string
example: ~
default: ""
+ statsd_block_list:
+ description: |
+ If you want to avoid sending all the available metrics to StatsD,
+ you can configure a block list of prefixes (comma separated) to filter out metrics that
+ start with the elements of the list (e.g: "scheduler,executor,dagrun").
+ If statsd_allow_list and statsd_block_list are both configured, statsd_block_list is ignored.
+ version_added: 2.6.0
+ type: string
+ example: ~
+ default: ""
stat_name_handler:
description: |
A function that validates the StatsD stat name, applies changes to the stat name if necessary and returns
@@ -923,6 +933,15 @@ metrics:
type: string
example: ~
default: ~
+ statsd_disabled_tags:
+ description: |
+ If you want to avoid sending all the available metric tags to StatsD,
+ you can configure a block list of prefixes (comma separated) to filter out metric tags
+ that start with the elements of the list (e.g: "job_id,run_id")
+ version_added: 2.6.0
+ type: string
+ example: job_id,run_id,dag_id,task_id
+ default: job_id,run_id
statsd_influxdb_enabled:
description: |
To enable sending Airflow metrics with StatsD-Influxdb tagging convention.
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index a8e8266a85..8454fb5300 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -486,6 +486,12 @@ statsd_prefix = airflow
# start with the elements of the list (e.g: "scheduler,executor,dagrun")
statsd_allow_list =
+# If you want to avoid sending all the available metrics to StatsD,
+# you can configure a block list of prefixes (comma separated) to filter out metrics that
+# start with the elements of the list (e.g: "scheduler,executor,dagrun").
+# If statsd_allow_list and statsd_block_list are both configured, statsd_block_list is ignored.
+statsd_block_list =
+
# A function that validates the StatsD stat name, applies changes to the stat name if necessary and returns
# the transformed stat name.
#
@@ -507,6 +513,12 @@ statsd_datadog_metrics_tags = True
# Note: The module path must exist on your PYTHONPATH for Airflow to pick it up
# statsd_custom_client_path =
+# If you want to avoid sending all the available metric tags to StatsD,
+# you can configure a block list of prefixes (comma separated) to filter out metric tags
+# that start with the elements of the list (e.g: "job_id,run_id")
+# Example: statsd_disabled_tags = job_id,run_id,dag_id,task_id
+statsd_disabled_tags = job_id,run_id
+
# To enable sending Airflow metrics with StatsD-Influxdb tagging convention.
statsd_influxdb_enabled = False
diff --git a/airflow/stats.py b/airflow/stats.py
index ae528a4e66..d20ec2ad77 100644
--- a/airflow/stats.py
+++ b/airflow/stats.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+import abc
import datetime
import logging
import socket
@@ -265,22 +266,45 @@ def validate_stat(fn: T) -> T:
return cast(T, wrapper)
-class AllowListValidator:
- """Class to filter unwanted stats."""
+class ListValidator(metaclass=abc.ABCMeta):
+ """
+ Abstract base class for validators, implemented by AllowListValidator
+ and BlockListValidator. Subclasses must override the test method.
+ """
- def __init__(self, allow_list=None):
- if allow_list:
+ def __init__(self, validate_list: str | None = None) -> None:
+ self.validate_list: tuple[str, ...] | None = (
+ tuple(item.strip().lower() for item in validate_list.split(",")) if validate_list else None
+ )
+
+ @classmethod
+ def __subclasshook__(cls, subclass):
+ return hasattr(subclass, "test") and callable(subclass.test) or NotImplemented
- self.allow_list = tuple(item.strip().lower() for item in allow_list.split(","))
+ @abc.abstractmethod
+ def test(self, name):
+ """Test if name is allowed"""
+ raise NotImplementedError
+
+
+class AllowListValidator(ListValidator):
+ """AllowListValidator only allows names that match the allowed prefixes."""
+
+ def test(self, name):
+ if self.validate_list is not None:
+ return name.strip().lower().startswith(self.validate_list)
else:
- self.allow_list = None
+ return True # default is all metrics are allowed
+
- def test(self, stat):
- """Test if stat is in the Allow List."""
- if self.allow_list is not None:
- return stat.strip().lower().startswith(self.allow_list)
+class BlockListValidator(ListValidator):
+ """BlockListValidator only allows names that do not match the blocked prefixes."""
+
+ def test(self, name):
+ if self.validate_list is not None:
+ return not name.strip().lower().startswith(self.validate_list)
else:
- return True # default is all metrics allowed
+ return True # default is all metrics are allowed
def prepare_stat_with_tags(fn: T) -> T:
@@ -291,10 +315,11 @@ def prepare_stat_with_tags(fn: T) -> T:
if self.influxdb_tags_enabled:
if stat is not None and tags is not None:
for k, v in tags.items():
- if all((c not in [",", "="] for c in v + k)):
- stat += f",{k}={v}"
- else:
- log.error("Dropping invalid tag: %s=%s.", k, v)
+ if self.metric_tags_validator.test(k):
+ if all((c not in [",", "="] for c in v + k)):
+ stat += f",{k}={v}"
+ else:
+ log.error("Dropping invalid tag: %s=%s.", k, v)
return fn(self, stat, *args, tags=tags, **kwargs)
return cast(T, wrapper)
@@ -306,14 +331,14 @@ class SafeStatsdLogger:
def __init__(
self,
statsd_client,
- allow_list_validator=AllowListValidator(),
- aggregation_optimizer_enabled=False,
+ metrics_validator: ListValidator = AllowListValidator(),
influxdb_tags_enabled=False,
+ metric_tags_validator: ListValidator = AllowListValidator(),
):
self.statsd = statsd_client
- self.allow_list_validator = allow_list_validator
- self.aggregation_optimizer_enabled = aggregation_optimizer_enabled
+ self.metrics_validator = metrics_validator
self.influxdb_tags_enabled = influxdb_tags_enabled
+ self.metric_tags_validator = metric_tags_validator
@prepare_stat_with_tags
@validate_stat
@@ -326,7 +351,7 @@ class SafeStatsdLogger:
tags: dict[str, str] | None = None,
):
"""Increment stat."""
- if self.allow_list_validator.test(stat):
+ if self.metrics_validator.test(stat):
return self.statsd.incr(stat, count, rate)
return None
@@ -341,7 +366,7 @@ class SafeStatsdLogger:
tags: dict[str, str] | None = None,
):
"""Decrement stat."""
- if self.allow_list_validator.test(stat):
+ if self.metrics_validator.test(stat):
return self.statsd.decr(stat, count, rate)
return None
@@ -357,7 +382,7 @@ class SafeStatsdLogger:
tags: dict[str, str] | None = None,
):
"""Gauge stat."""
- if self.allow_list_validator.test(stat):
+ if self.metrics_validator.test(stat):
return self.statsd.gauge(stat, value, rate, delta)
return None
@@ -371,7 +396,7 @@ class SafeStatsdLogger:
tags: dict[str, str] | None = None,
):
"""Stats timing."""
- if self.allow_list_validator.test(stat):
+ if self.metrics_validator.test(stat):
return self.statsd.timing(stat, dt)
return None
@@ -385,7 +410,7 @@ class SafeStatsdLogger:
**kwargs,
):
"""Timer metric that can be cancelled."""
- if stat and self.allow_list_validator.test(stat):
+ if stat and self.metrics_validator.test(stat):
return Timer(self.statsd.timer(stat, *args, **kwargs))
return Timer()
@@ -396,14 +421,14 @@ class SafeDogStatsdLogger:
def __init__(
self,
dogstatsd_client,
- allow_list_validator=AllowListValidator(),
+ metrics_validator: ListValidator = AllowListValidator(),
metrics_tags=False,
- aggregation_optimizer_enabled=False,
+ metric_tags_validator: ListValidator = AllowListValidator(),
):
self.dogstatsd = dogstatsd_client
- self.allow_list_validator = allow_list_validator
+ self.metrics_validator = metrics_validator
self.metrics_tags = metrics_tags
- self.aggregation_optimizer_enabled = aggregation_optimizer_enabled
+ self.metric_tags_validator = metric_tags_validator
@validate_stat
def incr(
@@ -416,10 +441,12 @@ class SafeDogStatsdLogger:
):
"""Increment stat."""
if self.metrics_tags and isinstance(tags, dict):
- tags_list = [f"{key}:{value}" for key, value in tags.items()]
+ tags_list = [
+ f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
+ ]
else:
tags_list = []
- if self.allow_list_validator.test(stat):
+ if self.metrics_validator.test(stat):
return self.dogstatsd.increment(metric=stat, value=count, tags=tags_list, sample_rate=rate)
return None
@@ -434,10 +461,12 @@ class SafeDogStatsdLogger:
):
"""Decrement stat."""
if self.metrics_tags and isinstance(tags, dict):
- tags_list = [f"{key}:{value}" for key, value in tags.items()]
+ tags_list = [
+ f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
+ ]
else:
tags_list = []
- if self.allow_list_validator.test(stat):
+ if self.metrics_validator.test(stat):
return self.dogstatsd.decrement(metric=stat, value=count, tags=tags_list, sample_rate=rate)
return None
@@ -453,10 +482,12 @@ class SafeDogStatsdLogger:
):
"""Gauge stat."""
if self.metrics_tags and isinstance(tags, dict):
- tags_list = [f"{key}:{value}" for key, value in tags.items()]
+ tags_list = [
+ f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
+ ]
else:
tags_list = []
- if self.allow_list_validator.test(stat):
+ if self.metrics_validator.test(stat):
return self.dogstatsd.gauge(metric=stat, value=value, tags=tags_list, sample_rate=rate)
return None
@@ -470,10 +501,12 @@ class SafeDogStatsdLogger:
):
"""Stats timing."""
if self.metrics_tags and isinstance(tags, dict):
- tags_list = [f"{key}:{value}" for key, value in tags.items()]
+ tags_list = [
+ f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
+ ]
else:
tags_list = []
- if self.allow_list_validator.test(stat):
+ if self.metrics_validator.test(stat):
if isinstance(dt, datetime.timedelta):
dt = dt.total_seconds()
return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list)
@@ -489,10 +522,12 @@ class SafeDogStatsdLogger:
):
"""Timer metric that can be cancelled."""
if self.metrics_tags and isinstance(tags, dict):
- tags_list = [f"{key}:{value}" for key, value in tags.items()]
+ tags_list = [
+ f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
+ ]
else:
tags_list = []
- if stat and self.allow_list_validator.test(stat):
+ if stat and self.metrics_validator.test(stat):
return Timer(self.dogstatsd.timed(stat, *args, tags=tags_list, **kwargs))
return Timer()
@@ -547,14 +582,19 @@ class _Stats(type):
port=conf.getint("metrics", "statsd_port"),
prefix=conf.get("metrics", "statsd_prefix"),
)
- allow_list_validator = AllowListValidator(conf.get("metrics", "statsd_allow_list", fallback=None))
- aggregation_optimizer_enabled = conf.get(
- "metrics", "statsd_aggregation_optimized_naming_enabled", fallback=False
- )
+ if conf.get("metrics", "statsd_allow_list", fallback=None):
+ metrics_validator = AllowListValidator(conf.get("metrics", "statsd_allow_list"))
+ if conf.get("metrics", "statsd_block_list", fallback=None):
+ log.warning(
+ "Ignoring statsd_block_list as both statsd_allow_list and statsd_block_list have been set"
+ )
+ elif conf.get("metrics", "statsd_block_list", fallback=None):
+ metrics_validator = BlockListValidator(conf.get("metrics", "statsd_block_list"))
+ else:
+ metrics_validator = AllowListValidator()
influxdb_tags_enabled = conf.get("metrics", "statsd_influxdb_enabled", fallback=False)
- return SafeStatsdLogger(
- statsd, allow_list_validator, aggregation_optimizer_enabled, influxdb_tags_enabled
- )
+ metric_tags_validator = BlockListValidator(conf.get("metrics", "statsd_disabled_tags", fallback=None))
+ return SafeStatsdLogger(statsd, metrics_validator, influxdb_tags_enabled, metric_tags_validator)
@classmethod
def get_dogstatsd_logger(cls):
@@ -567,15 +607,19 @@ class _Stats(type):
namespace=conf.get("metrics", "statsd_prefix"),
constant_tags=cls.get_constant_tags(),
)
- dogstatsd_allow_list = conf.get("metrics", "statsd_allow_list", fallback=None)
- allow_list_validator = AllowListValidator(dogstatsd_allow_list)
+ if conf.get("metrics", "statsd_allow_list", fallback=None):
+ metrics_validator = AllowListValidator(conf.get("metrics", "statsd_allow_list"))
+ if conf.get("metrics", "statsd_block_list", fallback=None):
+ log.warning(
+ "Ignoring statsd_block_list as both statsd_allow_list and statsd_block_list have been set"
+ )
+ elif conf.get("metrics", "statsd_block_list", fallback=None):
+ metrics_validator = BlockListValidator(conf.get("metrics", "statsd_block_list"))
+ else:
+ metrics_validator = AllowListValidator()
datadog_metrics_tags = conf.get("metrics", "statsd_datadog_metrics_tags", fallback=True)
- aggregation_optimizer_enabled = conf.get(
- "metrics", "statsd_aggregation_optimized_naming_enabled", fallback=False
- )
- return SafeDogStatsdLogger(
- dogstatsd, allow_list_validator, datadog_metrics_tags, aggregation_optimizer_enabled
- )
+ metric_tags_validator = BlockListValidator(conf.get("metrics", "statsd_disabled_tags", fallback=None))
+ return SafeDogStatsdLogger(dogstatsd, metrics_validator, datadog_metrics_tags, metric_tags_validator)
@classmethod
def get_constant_tags(cls):
diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py
index c6a6109066..005133967f 100644
--- a/tests/core/test_stats.py
+++ b/tests/core/test_stats.py
@@ -27,7 +27,7 @@ import statsd
import airflow
from airflow.exceptions import AirflowConfigException, InvalidStatsNameException
-from airflow.stats import AllowListValidator, SafeDogStatsdLogger, SafeStatsdLogger
+from airflow.stats import AllowListValidator, BlockListValidator, SafeDogStatsdLogger, SafeStatsdLogger
from tests.test_utils.config import conf_vars
@@ -126,6 +126,46 @@ class TestStats:
airflow.stats.Stats.incr("empty_key")
importlib.reload(airflow.stats)
+ def test_load_allow_list_validator(self):
+ with conf_vars(
+ {
+ ("metrics", "statsd_on"): "True",
+ ("metrics", "statsd_allow_list"): "name1,name2",
+ }
+ ):
+ importlib.reload(airflow.stats)
+ assert isinstance(airflow.stats.Stats.metrics_validator, AllowListValidator)
+ assert airflow.stats.Stats.metrics_validator.validate_list == ("name1", "name2")
+ # Avoid side-effects
+ importlib.reload(airflow.stats)
+
+ def test_load_block_list_validator(self):
+ with conf_vars(
+ {
+ ("metrics", "statsd_on"): "True",
+ ("metrics", "statsd_block_list"): "name1,name2",
+ }
+ ):
+ importlib.reload(airflow.stats)
+ assert isinstance(airflow.stats.Stats.metrics_validator, BlockListValidator)
+ assert airflow.stats.Stats.metrics_validator.validate_list == ("name1", "name2")
+ # Avoid side-effects
+ importlib.reload(airflow.stats)
+
+ def test_load_allow_and_block_list_validator_loads_only_allow_list_validator(self):
+ with conf_vars(
+ {
+ ("metrics", "statsd_on"): "True",
+ ("metrics", "statsd_allow_list"): "name1,name2",
+ ("metrics", "statsd_block_list"): "name1,name2",
+ }
+ ):
+ importlib.reload(airflow.stats)
+ assert isinstance(airflow.stats.Stats.metrics_validator, AllowListValidator)
+ assert airflow.stats.Stats.metrics_validator.validate_list == ("name1", "name2")
+ # Avoid side-effects
+ importlib.reload(airflow.stats)
+
class TestDogStats:
def setup_method(self):
@@ -240,6 +280,24 @@ class TestStatsWithAllowList:
self.statsd_client.assert_not_called()
+class TestStatsWithBlockList:
+ def setup_method(self):
+ self.statsd_client = Mock(spec=statsd.StatsClient)
+ self.stats = SafeStatsdLogger(self.statsd_client, BlockListValidator("stats_one, stats_two"))
+
+ def test_increment_counter_with_allowed_key(self):
+ self.stats.incr("stats_one")
+ self.statsd_client.assert_not_called()
+
+ def test_increment_counter_with_allowed_prefix(self):
+ self.stats.incr("stats_two.bla")
+ self.statsd_client.assert_not_called()
+
+ def test_not_increment_counter_if_not_allowed(self):
+ self.stats.incr("stats_three")
+ self.statsd_client.incr.assert_called_once_with("stats_three", 1, 1)
+
+
class TestDogStatsWithAllowList:
def setup_method(self):
pytest.importorskip("datadog")
@@ -280,6 +338,25 @@ class TestDogStatsWithMetricsTags:
)
+class TestDogStatsWithDisabledMetricsTags:
+ def setup_method(self):
+ pytest.importorskip("datadog")
+ from datadog import DogStatsd
+
+ self.dogstatsd_client = Mock(speck=DogStatsd)
+ self.dogstatsd = SafeDogStatsdLogger(
+ self.dogstatsd_client,
+ metrics_tags=True,
+ metric_tags_validator=BlockListValidator("key1"),
+ )
+
+ def test_does_send_stats_using_dogstatsd_with_tags(self):
+ self.dogstatsd.incr("empty_key", 1, 1, tags={"key1": "value1", "key2": "value2"})
+ self.dogstatsd_client.increment.assert_called_once_with(
+ metric="empty_key", sample_rate=1, tags=["key2:value2"], value=1
+ )
+
+
class TestStatsWithInfluxDBEnabled:
def setup_method(self):
with conf_vars(
@@ -289,7 +366,11 @@ class TestStatsWithInfluxDBEnabled:
}
):
self.statsd_client = Mock(spec=statsd.StatsClient)
- self.stats = SafeStatsdLogger(self.statsd_client, influxdb_tags_enabled=True)
+ self.stats = SafeStatsdLogger(
+ self.statsd_client,
+ influxdb_tags_enabled=True,
+ metric_tags_validator=BlockListValidator("key2,key3"),
+ )
def test_increment_counter(self):
self.stats.incr(
@@ -302,16 +383,14 @@ class TestStatsWithInfluxDBEnabled:
"test_stats_run.delay",
tags={"key0": "val0", "key1": "val1", "key2": "val2"},
)
- self.statsd_client.incr.assert_called_once_with(
- "test_stats_run.delay,key0=val0,key1=val1,key2=val2", 1, 1
- )
+ self.statsd_client.incr.assert_called_once_with("test_stats_run.delay,key0=val0,key1=val1", 1, 1)
def test_does_not_increment_counter_drops_invalid_tags(self):
self.stats.incr(
"test_stats_run.delay",
- tags={"key0,": "val0", "key1": "val1", "key2": "val2"},
+ tags={"key0,": "val0", "key1": "val1", "key2": "val2", "key3": "val3"},
)
- self.statsd_client.incr.assert_called_once_with("test_stats_run.delay,key1=val1,key2=val2", 1, 1)
+ self.statsd_client.incr.assert_called_once_with("test_stats_run.delay,key1=val1", 1, 1)
def always_invalid(stat_name):