Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2023/01/09 19:00:29 UTC

[GitHub] [airflow] syun64 commented on issue #11463: Tags rather than names in variable parts of the metrics

syun64 commented on issue #11463:
URL: https://github.com/apache/airflow/issues/11463#issuecomment-1376134805

   Hi folks, we are actively interested in working on this item as users of Airflow with Grafana dashboards.
   
   The current `airflow.stats` module has no tagging capability, which is very limiting for us. Here are some example use cases where tagging would be a significant improvement:
   1. Setting up Grafana dashboards to track and aggregate similar metrics (e.g. "dag.<dag_id>.<task_id>.duration") is difficult, because each new task or DAG produces a new metric name. If we instead publish "dag.duration" with tags such as dag_id="dag_1" and task_id="task_x", metric aggregation in Grafana becomes much simpler.
   2. Setting up alerting rules against similar StatsD metrics (e.g. "operator_failures_<operator_name>") is difficult, because each new operator produces a new metric name. If we publish this metric as "operator_failure" with the tag operator_name="operator_y", we can set up a single alerting rule against "operator_failure".
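To illustrate why a single tagged metric name helps, here is a toy aggregation over tagged points. The `(name, tags, value)` point model and the `sum_by_tag` helper are hypothetical stand-ins for what a tag-aware backend query does; they are not part of Airflow or Grafana.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical data model: (metric name, tags, value).
Point = Tuple[str, Dict[str, str], float]


def sum_by_tag(points: List[Point], metric: str, tag: str) -> Dict[str, float]:
    """Sum the values of one metric name, grouped by the value of a single tag key."""
    totals: Dict[str, float] = defaultdict(float)
    for name, tags, value in points:
        # One name filter covers every DAG and task; no per-task name patterns needed.
        if name == metric and tag in tags:
            totals[tags[tag]] += value
    return dict(totals)


points: List[Point] = [
    ("dag.duration", {"dag_id": "dag_1", "task_id": "task_x"}, 2.0),
    ("dag.duration", {"dag_id": "dag_1", "task_id": "task_y"}, 3.0),
    ("dag.duration", {"dag_id": "dag_2", "task_id": "task_x"}, 5.0),
]

print(sum_by_tag(points, "dag.duration", "dag_id"))  # {'dag_1': 5.0, 'dag_2': 5.0}
print(sum_by_tag(points, "dag.duration", "task_id"))  # {'task_x': 7.0, 'task_y': 3.0}
```

With concatenated names ("dag.dag_1.task_x.duration" and so on), the same grouping would require parsing the variable parts back out of each name.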
   
   As users of [influxdb-statsd](https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/), we are interested in introducing an optional, config-driven change that would allow us to publish the variable parts of these metrics as tags rather than concatenating them into the metric names.
   
   Since different StatsD implementations tag metrics in different ways, I think we would need a separate implementation of the tagging interface for each supported StatsD plugin. Just as we instantiate the [SafeDogStatsdLogger](https://github.com/apache/airflow/blob/main/airflow/stats.py#L294) when the option "statsd_datadog_enabled" is set to True, we could instantiate a SafeInfluxDBStatsdLogger that implements InfluxDB-style tagging when "statsd_influxdb_enabled" is set to True.
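One way to picture "one tagging implementation per plugin" is a small formatter interface selected by a config flag. This is a minimal sketch under assumptions: `TagFormatter`, `ClassicFormatter`, `InfluxDBFormatter` and `pick_formatter` are hypothetical names, and the `influxdb_enabled` argument stands in for the proposed "statsd_influxdb_enabled" option. DogStatsD is left out because it receives tags as a separate argument rather than inside the name.

```python
from typing import Dict, Protocol


class TagFormatter(Protocol):
    """Hypothetical interface: turn a base stat name plus tags into a wire-format name."""

    def format(self, stat: str, tags: Dict[str, str]) -> str: ...


class ClassicFormatter:
    """Classic StatsD has no tags, so tag values are appended to the name in order."""

    def format(self, stat: str, tags: Dict[str, str]) -> str:
        return ".".join([stat, *tags.values()])


class InfluxDBFormatter:
    """InfluxDB/Telegraf-style StatsD carries tags as ',key=value' suffixes on the name."""

    def format(self, stat: str, tags: Dict[str, str]) -> str:
        return stat + "".join(f",{k}={v}" for k, v in tags.items())


def pick_formatter(influxdb_enabled: bool) -> TagFormatter:
    # Stand-in for reading a (proposed) statsd_influxdb_enabled config option.
    return InfluxDBFormatter() if influxdb_enabled else ClassicFormatter()


tags = {"dag_id": "dag_1", "task_id": "task_x"}
print(pick_formatter(False).format("dag.duration", tags))  # dag.duration.dag_1.task_x
print(pick_formatter(True).format("dag.duration", tags))  # dag.duration,dag_id=dag_1,task_id=task_x
```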
   
   As part of this exercise, I'm wondering whether it would be a good idea to revise the current [StatsLogger](https://github.com/apache/airflow/blob/main/airflow/stats.py#L53) Protocol so that its method signatures accept a "tags" parameter as an OrderedDict. As already proposed in https://github.com/apache/airflow/pull/12158, we could then use these **input tags** either to publish them directly to plugins that support tags, or to append them, in the given order, to the end of the stat name for classic StatsD.
   
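   ```
   from __future__ import annotations

   from collections import OrderedDict
   from typing import Optional, Protocol, Union


   class StatsLogger(Protocol):
       @classmethod
       def incr(
           cls,
           stat: str,
           count: int = 1,
           rate: int = 1,
           # Must have a default: a parameter without one cannot follow defaulted parameters.
           tags: Optional[OrderedDict[str, Union[str, int]]] = None,
       ) -> None:
           """Increment stat."""
   ...

   ```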
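A revised protocol is easy to satisfy with a recording implementation, which could also serve in unit tests. The sketch below is hypothetical (`TaggedStatsLogger` and `InMemoryStatsLogger` are illustrative names) and uses a plain `dict` for tags, since dicts preserve insertion order from Python 3.7 onward; whether the real signature should require `OrderedDict` is left open, as in the proposal.

```python
from typing import Dict, List, Optional, Protocol, Tuple, Union, runtime_checkable

TagValue = Union[str, int]


@runtime_checkable
class TaggedStatsLogger(Protocol):
    """Hypothetical protocol mirroring the proposed incr() signature with tags."""

    def incr(
        self, stat: str, count: int = 1, rate: float = 1, tags: Optional[Dict[str, TagValue]] = None
    ) -> None: ...


class InMemoryStatsLogger:
    """Toy implementation that records every call instead of sending it anywhere."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, int, Dict[str, TagValue]]] = []

    def incr(
        self, stat: str, count: int = 1, rate: float = 1, tags: Optional[Dict[str, TagValue]] = None
    ) -> None:
        # Copy the tags so later mutation by the caller does not change the record.
        self.calls.append((stat, count, dict(tags or {})))


logger = InMemoryStatsLogger()
logger.incr("operator_failure", tags={"operator_name": "operator_y"})
# runtime_checkable only checks that an incr attribute exists, not its signature.
print(isinstance(logger, TaggedStatsLogger))  # True
```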
   
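   ```
   from collections import OrderedDict
   from functools import wraps
   from typing import Optional, cast

   # T and validate_stat are the existing helpers in airflow/stats.py.


   def concatenate_tags_to_classic_stat(fn: T) -> T:
       """Append tag values, in order, to the stat name for classic StatsD (which has no tags)."""

       @wraps(fn)
       def wrapper(_self, stat=None, *args, tags: Optional[OrderedDict] = None, **kwargs):
           # tags is keyword-only so positional count/rate arguments are not swallowed by it,
           # and it defaults to None rather than a shared mutable OrderedDict().
           if stat is not None and tags:
               stat = ".".join([stat, *(str(v) for v in tags.values())])
           return fn(_self, stat, *args, **kwargs)

       return cast(T, wrapper)


   class SafeStatsdLogger:
       @concatenate_tags_to_classic_stat
       @validate_stat
       def incr(self, stat, count=1, rate=1):
           """Increment stat."""
           if self.allow_list_validator.test(stat):
               return self.statsd.incr(stat, count, rate)
           return None
   ...

   ```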
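The decorator's effect can be checked in isolation with a stand-in logger. In this sketch `RecordingClassicLogger` is hypothetical and records "name:count|c" counter strings instead of sending UDP packets; it shows that a positional `count` still works alongside keyword `tags`, and that untagged calls keep their original name.

```python
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, TypeVar, cast

T = TypeVar("T", bound=Callable[..., Any])


def concatenate_tags_to_classic_stat(fn: T) -> T:
    @wraps(fn)
    def wrapper(_self, stat=None, *args, tags: Optional[Dict[str, Any]] = None, **kwargs):
        # Append tag values in insertion order, e.g. "operator_failure" -> "operator_failure.operator_y".
        if stat is not None and tags:
            stat = ".".join([stat, *(str(v) for v in tags.values())])
        return fn(_self, stat, *args, **kwargs)

    return cast(T, wrapper)


class RecordingClassicLogger:
    """Hypothetical stand-in for SafeStatsdLogger that records counter datagrams."""

    def __init__(self) -> None:
        self.sent: List[str] = []

    @concatenate_tags_to_classic_stat
    def incr(self, stat, count=1, rate=1):
        self.sent.append(f"{stat}:{count}|c")


log = RecordingClassicLogger()
log.incr("operator_failure", 2, tags={"operator_name": "operator_y"})
log.incr("scheduler_heartbeat")
print(log.sent)  # ['operator_failure.operator_y:2|c', 'scheduler_heartbeat:1|c']
```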
   
   ```
   class SafeDogStatsdLogger:
       @validate_stat
       def incr(self, stat, count=1, rate=1, tags=None):
           """Increment stat."""
           if self.allow_list_validator.test(stat):
               tags = [f"{k}:{v}" for k, v in tags.items()] if tags else None
               return self.dogstatsd.increment(metric=stat, value=count, tags=tags, sample_rate=rate)
           return None
   ...
   
   ```
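For reference, the DogStatsD datagram format carries tags after a `|#` separator as comma-separated `key:value` pairs, which is why the dict above is converted to `"k:v"` strings. The helper below is a hypothetical illustration of the resulting counter datagram; in practice the `datadog` client builds it for you.

```python
from typing import Dict, Optional


def dogstatsd_counter(stat: str, count: int = 1, rate: float = 1, tags: Optional[Dict[str, str]] = None) -> str:
    """Build a DogStatsD counter datagram: <name>:<value>|c[|@<rate>][|#k:v,...]."""
    datagram = f"{stat}:{count}|c"
    if rate != 1:
        # Sample rate is only sent when it differs from 1.
        datagram += f"|@{rate}"
    if tags:
        # Same dict-to-"key:value" conversion as in the SafeDogStatsdLogger sketch.
        datagram += "|#" + ",".join(f"{k}:{v}" for k, v in tags.items())
    return datagram


print(dogstatsd_counter("operator_failure", tags={"operator_name": "operator_y"}))
# operator_failure:1|c|#operator_name:operator_y
```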
   
   ```
   from collections import OrderedDict
   from functools import wraps
   from typing import Optional, cast


   def concatenate_tags_to_influxdb_statname(fn: T) -> T:
       """Append tags to the stat name in the InfluxDB-style ",key=value" form that Telegraf parses."""

       @wraps(fn)
       def wrapper(_self, stat=None, *args, tags: Optional[OrderedDict] = None, **kwargs):
           if stat is not None and tags:
               stat += "".join(f",{k}={v}" for k, v in tags.items())
           return fn(_self, stat, *args, **kwargs)

       return cast(T, wrapper)


   class SafeInfluxDBStatsdLogger:
       # validate_stat is applied outermost so it checks the bare stat name: "," and "="
       # are not among the characters the default stat-name handler accepts.
       @validate_stat
       @concatenate_tags_to_influxdb_statname
       def incr(self, stat, count=1, rate=1):
           """Increment stat."""
           if self.allow_list_validator.test(stat):
               return self.statsd.incr(stat, count, rate)
           return None
   ...

   ```
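With this naming, what reaches Telegraf is an ordinary StatsD counter whose name carries the tags, e.g. `operator_failure,operator_name=operator_y:1|c`, which matches the `name,tag=value:value|type` syntax described for Telegraf's statsd input. The helper below is a hypothetical illustration of that datagram, not an Airflow or Telegraf API.

```python
from typing import Dict, Optional


def telegraf_counter(stat: str, count: int = 1, rate: float = 1, tags: Optional[Dict[str, str]] = None) -> str:
    """Build an InfluxDB-style StatsD counter datagram: <name>[,k=v...]:<value>|c[|@<rate>]."""
    # Tags are part of the name, so a plain StatsD client can send them unchanged.
    name = stat + "".join(f",{k}={v}" for k, v in (tags or {}).items())
    datagram = f"{name}:{count}|c"
    if rate != 1:
        datagram += f"|@{rate}"
    return datagram


print(telegraf_counter("operator_failure", tags={"operator_name": "operator_y"}))
# operator_failure,operator_name=operator_y:1|c
```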
   
   There are some open questions about how this proposal would break some existing metrics such as "dag.<dag_id>.<task_id>.duration", and about whether we will allow adding new tags to existing metrics, since for classic StatsD that would change the metric names and therefore be a breaking change. However, I think supporting tags in StatsD plugins and normalizing the tag-publishing interface is an improvement worth revisiting this conversation for. Please let me know what you think of this proposal, and whether you'd be open to me opening a PR and organizing follow-up discussions to formalize it.
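One concrete reason the classic StatsD path is breaking: suffix concatenation moves the variable parts to the end of the name, so the tagged "dag.duration" metric does not reproduce the existing name. The two helper functions below are hypothetical and only compare the strings.

```python
from typing import Dict


def legacy_task_duration_name(dag_id: str, task_id: str) -> str:
    # Existing naming scheme: dag.<dag_id>.<task_id>.duration
    return f"dag.{dag_id}.{task_id}.duration"


def classic_name_from_tags(stat: str, tags: Dict[str, str]) -> str:
    # Suffix concatenation, as done by concatenate_tags_to_classic_stat.
    return ".".join([stat, *tags.values()])


tags = {"dag_id": "dag_1", "task_id": "task_x"}
print(legacy_task_duration_name("dag_1", "task_x"))  # dag.dag_1.task_x.duration
print(classic_name_from_tags("dag.duration", tags))  # dag.duration.dag_1.task_x
```

Keeping such names stable would require either per-metric templates that place tag values in the middle of the name, or accepting the rename as part of the change.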
   @potiuk @ashb 
   
   
   


-- 
This is an automated message from the Apache Git Service.