Posted to commits@airflow.apache.org by "uranusjr (via GitHub)" <gi...@apache.org> on 2023/02/02 06:39:04 UTC

[GitHub] [airflow] uranusjr commented on a diff in pull request #29093: Enable tagged metric names for existing Statsd metric publishing events | influxdb-statsd support

uranusjr commented on code in PR #29093:
URL: https://github.com/apache/airflow/pull/29093#discussion_r1094097614


##########
airflow/config_templates/config.yml:
##########
@@ -867,6 +867,13 @@ metrics:
       type: string
       example: ~
       default: ~
+    statsd_influxdb_enabled:
+      description: |
+        To enable sending airflow metrics with StatsD-Influxdb tagging convention.

Review Comment:
   ```suggestion
           To enable sending Airflow metrics with StatsD-Influxdb tagging convention.
   ```



##########
airflow/jobs/scheduler_job.py:
##########
@@ -1240,7 +1240,13 @@ def _update_state(dag: DAG, dag_run: DagRun):
                 # always happening immediately after the data interval.
                 expected_start_date = dag.get_run_data_interval(dag_run).end
                 schedule_delay = dag_run.start_date - expected_start_date
+                # Publish metrics twice with backward compatible name, and then with tags
                 Stats.timing(f"dagrun.schedule_delay.{dag.dag_id}", schedule_delay)
+                Stats.timing(
+                    "dagrun.schedule_delay",
+                    schedule_delay,
+                    tags={"dag_id": f"{dag.dag_id}"},

Review Comment:
   ```suggestion
                       tags={"dag_id": dag.dag_id},
   ```
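
The suggestion above assumes `dag.dag_id` is already a `str`, in which case wrapping it in an f-string changes nothing. A minimal sketch of that equivalence, using a hypothetical `FakeDag` stand-in rather than Airflow's own `DAG` class:

```python
from dataclasses import dataclass


@dataclass
class FakeDag:
    """Hypothetical stand-in for a DAG; only the dag_id attribute matters here."""

    dag_id: str


dag = FakeDag(dag_id="example_dag")

# Formatting a str through an f-string yields an equal str,
# so the wrapper adds noise without changing the value.
tags_with_fstring = {"dag_id": f"{dag.dag_id}"}
tags_plain = {"dag_id": dag.dag_id}
```

The f-string would only matter if the attribute could be a non-string value, which is not the case assumed here.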



##########
airflow/stats.py:
##########
@@ -196,7 +222,23 @@ def stat_name_default_handler(stat_name, max_length=250) -> str:
         raise InvalidStatsNameException(
             f"The stat_name ({stat_name}) has to be less than {max_length} characters."
         )
-    if not all((c in ALLOWED_CHARACTERS) for c in stat_name):
+    if not all((c in allowed_chars) for c in stat_name):
+        raise InvalidStatsNameException(
+            f"The stat name ({stat_name}) has to be composed of ASCII "
+            f"alphabets, numbers, or the underscore, dot, or dash characters."
+        )
+    return stat_name
+
+
+def stat_name_influxdb_handler(stat_name, max_length=250) -> str:
+    """InfluxDB-Statsd default name validator."""
+    if not isinstance(stat_name, str):
+        raise InvalidStatsNameException("The stat_name has to be a string")
+    if len(stat_name) > max_length:
+        raise InvalidStatsNameException(
+            f"The stat_name ({stat_name}) has to be less than {max_length} characters."
+        )
+    if not all((c in ALLOWED_CHARACTERS | set([",", "="])) for c in stat_name):

Review Comment:
   ```suggestion
       if not all((c in {*ALLOWED_CHARACTERS, ",", "="}) for c in stat_name):
   ```
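
For readers comparing the two spellings, the sketch below shows that set unpacking builds the same set as the union with `set([",", "="])`. The definition of `ALLOWED_CHARACTERS` and the `is_valid` helper are assumptions made for illustration; the real constant lives in `airflow/stats.py`.

```python
from __future__ import annotations

import string

# Assumed definition for illustration only.
ALLOWED_CHARACTERS = set(string.ascii_letters + string.digits + "_.-")

# Original spelling: union with a set built from a list.
union_form = ALLOWED_CHARACTERS | set([",", "="])

# Suggested spelling: one set display with unpacking.
unpack_form = {*ALLOWED_CHARACTERS, ",", "="}


def is_valid(stat_name: str, allowed: set[str]) -> bool:
    """Return True when every character of stat_name is in the allowed set."""
    return all(c in allowed for c in stat_name)
```

Both forms allocate a new set on every call when written inline, so hoisting the result into a module-level constant may be worth considering as well.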



##########
airflow/stats.py:
##########
@@ -206,7 +248,13 @@ def stat_name_default_handler(stat_name, max_length=250) -> str:
 
 def get_current_handler_stat_name_func() -> Callable[[str], str]:
     """Get Stat Name Handler from airflow.cfg."""
-    return conf.getimport("metrics", "stat_name_handler") or stat_name_default_handler
+    handler = conf.getimport("metrics", "stat_name_handler")
+    if handler is None:
+        if conf.get("metrics", "statsd_influxdb_enabled", fallback=False):
+            handler = partial(stat_name_default_handler, allowed_chars=ALLOWED_CHARACTERS | set([",", "="]))

Review Comment:
   ```suggestion
               handler = partial(stat_name_default_handler, allowed_chars={*ALLOWED_CHARACTERS, ",", "="})
   ```
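
As a rough illustration of how `partial` pre-binds the wider character set, here is a self-contained sketch. `validate_name` is a hypothetical simplification of the handler, `ValueError` stands in for `InvalidStatsNameException`, and the `ALLOWED_CHARACTERS` definition is assumed.

```python
from __future__ import annotations

import string
from functools import partial

# Assumed definition for illustration only.
ALLOWED_CHARACTERS = set(string.ascii_letters + string.digits + "_.-")


def validate_name(
    stat_name: str,
    max_length: int = 250,
    allowed_chars: set[str] = ALLOWED_CHARACTERS,
) -> str:
    """Hypothetical, simplified validator; raises ValueError on bad names."""
    if len(stat_name) > max_length:
        raise ValueError(f"stat name is longer than {max_length} characters")
    if not all(c in allowed_chars for c in stat_name):
        raise ValueError(f"stat name ({stat_name}) contains invalid characters")
    return stat_name


# partial() fixes allowed_chars, so callers keep the one-argument handler shape.
influx_handler = partial(validate_name, allowed_chars={*ALLOWED_CHARACTERS, ",", "="})
```

With this shape, `influx_handler("dagrun.schedule_delay,dag_id=example")` passes, while the default `validate_name` rejects the same name because of the `,` and `=` characters.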



##########
airflow/models/dagrun.py:
##########
@@ -877,6 +877,11 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis: lis
                 true_delay = first_start_date - data_interval_end
                 if true_delay.total_seconds() > 0:
                     Stats.timing(f"dagrun.{dag.dag_id}.first_task_scheduling_delay", true_delay)
+                    Stats.timing(
+                        "dagrun.first_task_scheduling_delay",
+                        true_delay,
+                        tags={"dag_id": f"{dag.dag_id}"},

Review Comment:
   ```suggestion
                           tags={"dag_id": dag.dag_id},
   ```



##########
airflow/stats.py:
##########
@@ -250,43 +298,107 @@ def test(self, stat):
             return True  # default is all metrics allowed
 
 
+def prepare_stat_with_tags(fn: T) -> T:
+    """Add tags to stat with influxdb standard format if influxdb_tags_enabled is True."""
+
+    @wraps(fn)
+    def wrapper(self, stat=None, *args, tags=None, **kwargs):
+        if self.influxdb_tags_enabled:
+            if stat is not None and tags is not None:
+                for k, v in tags.items():
+                    if not set(",=").intersection(set(v + k)):
+                        stat += f",{k}={v}"
+                    else:
+                        log.error("Dropping invalid tag: %s=%s.", k, v)
+        return fn(self, stat, *args, tags=tags, **kwargs)
+
+    return cast(T, wrapper)
+
+
 class SafeStatsdLogger:
     """StatsD Logger."""
 
-    def __init__(self, statsd_client, allow_list_validator=AllowListValidator()):
+    def __init__(
+        self,
+        statsd_client,
+        allow_list_validator=AllowListValidator(),
+        aggregation_optimizer_enabled=False,
+        influxdb_tags_enabled=False,
+    ):
         self.statsd = statsd_client
         self.allow_list_validator = allow_list_validator
+        self.aggregation_optimizer_enabled = aggregation_optimizer_enabled
+        self.influxdb_tags_enabled = influxdb_tags_enabled
 
+    @prepare_stat_with_tags
     @validate_stat
-    def incr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None):
+    def incr(
+        self,
+        stat,
+        count=1,
+        rate=1,
+        *,
+        tags: dict[str, str] | None = None,
+    ):

Review Comment:
   Could we add type annotations to the other arguments while we’re changing these signatures? (Same for the other functions.)
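
A sketch of what a fully annotated signature might look like. The class below is hypothetical and only records calls; the `int` and `float` annotations for `count` and `rate` are assumptions, not something the PR confirms.

```python
from __future__ import annotations


class TypedLoggerSketch:
    """Hypothetical illustration only; not the real SafeStatsdLogger."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, int, float, dict[str, str] | None]] = []

    def incr(
        self,
        stat: str,
        count: int = 1,
        rate: float = 1,
        *,
        tags: dict[str, str] | None = None,
    ) -> None:
        # Record the call instead of sending anything to a StatsD client.
        self.calls.append((stat, count, rate, tags))
```

Because `tags` follows the bare `*`, it must be passed by keyword, for example `logger.incr("ti_failures", tags={"dag_id": "example"})`; passing it positionally raises `TypeError`.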



##########
airflow/stats.py:
##########
@@ -250,43 +298,107 @@ def test(self, stat):
             return True  # default is all metrics allowed
 
 
+def prepare_stat_with_tags(fn: T) -> T:
+    """Add tags to stat with influxdb standard format if influxdb_tags_enabled is True."""
+
+    @wraps(fn)
+    def wrapper(self, stat=None, *args, tags=None, **kwargs):
+        if self.influxdb_tags_enabled:
+            if stat is not None and tags is not None:
+                for k, v in tags.items():
+                    if not set(",=").intersection(set(v + k)):

Review Comment:
   It would probably be better to use the same `all((x not in y) for x in v + k)` approach as the stat name handlers; it’s much more readable than using `set` here.
   
   (Alternatively, we could introduce a utility function that wraps the set operation so it is easier for readers to understand.)
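
One possible shape for such a helper, as a sketch only. The names `INVALID_TAG_CHARS`, `is_valid_tag`, and `append_tags` are hypothetical, and unlike the decorator in the diff this version drops invalid tags without logging an error.

```python
from __future__ import annotations

# Characters that would break the `name,key=value` encoding if they
# appeared inside a tag key or value.
INVALID_TAG_CHARS = ",="


def is_valid_tag(key: str, value: str) -> bool:
    """Return True when neither the key nor the value contains ',' or '='."""
    return all((c not in INVALID_TAG_CHARS) for c in key + value)


def append_tags(stat: str, tags: dict[str, str] | None) -> str:
    """Append each valid tag to stat as ',key=value'; skip invalid ones."""
    if not tags:
        return stat
    for key, value in tags.items():
        if is_valid_tag(key, value):
            stat += f",{key}={value}"
    return stat
```

Naming the check keeps the loop body in the decorator down to a single readable condition, whichever of the two implementations ends up behind it.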



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org