Posted to issues@flink.apache.org by mbode <gi...@git.apache.org> on 2017/05/06 10:52:07 UTC

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

GitHub user mbode opened a pull request:

    https://github.com/apache/flink/pull/3833

    [FLINK-6221] Add PrometheusReporter

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mbode/flink PrometheusReporter

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3833.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3833
    
----
commit 9c1889abcd5591d89dde3d5b032b6c54d4d518ba
Author: Maximilian Bode <ma...@tngtech.com>
Date:   2017-05-06T00:49:42Z

    [FLINK-6221] Add PrometheusReporter

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

Posted by mbode <gi...@git.apache.org>.
Github user mbode commented on the issue:

    https://github.com/apache/flink/pull/3833
  
    Oh, I broke the stricter checkstyle. To me it feels a bit weird to have to put Javadoc on tests; is that intended?



[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

Posted by mbode <gi...@git.apache.org>.
Github user mbode commented on the issue:

    https://github.com/apache/flink/pull/3833
  
    Okay, Guava is not used anymore. I am not sure about the shading part. Would you expect either the Prometheus client libraries or NanoHTTPD to be used in Flink user code? Or are there other advantages to shading? If so, could you point me to a module I could copy the "Flink way of shading" from?



[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

Posted by mbode <gi...@git.apache.org>.
Github user mbode commented on the issue:

    https://github.com/apache/flink/pull/3833
  
    @zentol Would you mind checking that I got the shading right?



[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r116368563
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import com.google.common.base.CharMatcher;
    +import com.google.common.collect.ImmutableList;
    +import fi.iki.elonen.NanoHTTPD;
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.common.TextFormat;
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.metrics.CharacterFilter;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.HistogramStatistics;
    +import org.apache.flink.metrics.Meter;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.regex.Pattern;
    +
    +import static com.google.common.collect.Iterables.toArray;
    +
    +@PublicEvolving
    +public class PrometheusReporter implements MetricReporter {
    +	private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
    +
    +	static final         String ARG_PORT     = "port";
    +	private static final int    DEFAULT_PORT = 9249;
    +
    +	private static final Pattern         UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
    +	private static final CharacterFilter CHARACTER_FILTER       = new CharacterFilter() {
    +		@Override
    +		public String filterCharacters(String input) {
    +			return replaceInvalidChars(input);
    +		}
    +	};
    +
    +	private static final char SCOPE_SEPARATOR = '_';
    +
    +	private PrometheusEndpoint prometheusEndpoint;
    +	private Map<String, Collector> collectorsByMetricName = new HashMap<>();
    +
    +	@VisibleForTesting
    +	static String replaceInvalidChars(final String input) {
    +		// https://prometheus.io/docs/instrumenting/writing_exporters/
    +		// Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
    +		return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
    +	}
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +		int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
    +		log.info("Using port {}.", port);
    +		prometheusEndpoint = new PrometheusEndpoint(port);
    +		try {
    +			prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
    +		} catch (IOException e) {
    +			log.error("Could not start PrometheusEndpoint on port " + port, e);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		prometheusEndpoint.stop();
    --- End diff --
    
    Can this throw an exception?
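    One way to handle this defensively, as a minimal sketch assuming a hypothetical `Endpoint` interface standing in for `PrometheusEndpoint`: `close()` skips stopping when `open()` never created an endpoint, and downgrades any `RuntimeException` from `stop()` to a warning. It uses `java.util.logging` only so it compiles without dependencies, and it leaves out the `CollectorRegistry.defaultRegistry.clear()` call from the diff.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Sketch of a close() that tolerates a missing or failing endpoint. */
final class DefensiveCloseSketch {
    private static final Logger LOG = Logger.getLogger(DefensiveCloseSketch.class.getName());

    /** Hypothetical stand-in for PrometheusEndpoint. */
    interface Endpoint {
        void stop();
    }

    // May be null if open() failed before an endpoint was created.
    private final Endpoint endpoint;

    DefensiveCloseSketch(Endpoint endpoint) {
        this.endpoint = endpoint;
    }

    /** Stops the endpoint; returns true on a clean stop and never throws. */
    boolean close() {
        if (endpoint == null) {
            return false; // nothing was started, so there is nothing to stop
        }
        try {
            endpoint.stop();
            return true;
        } catch (RuntimeException e) {
            // Report shutdown problems as a warning instead of propagating them.
            LOG.log(Level.WARNING, "Failed to stop the metrics endpoint.", e);
            return false;
        }
    }
}
```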



[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r116368588
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import com.google.common.base.CharMatcher;
    +import com.google.common.collect.ImmutableList;
    +import fi.iki.elonen.NanoHTTPD;
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.common.TextFormat;
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.metrics.CharacterFilter;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.HistogramStatistics;
    +import org.apache.flink.metrics.Meter;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.regex.Pattern;
    +
    +import static com.google.common.collect.Iterables.toArray;
    +
    +@PublicEvolving
    +public class PrometheusReporter implements MetricReporter {
    +	private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
    +
    +	static final         String ARG_PORT     = "port";
    +	private static final int    DEFAULT_PORT = 9249;
    +
    +	private static final Pattern         UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
    +	private static final CharacterFilter CHARACTER_FILTER       = new CharacterFilter() {
    +		@Override
    +		public String filterCharacters(String input) {
    +			return replaceInvalidChars(input);
    +		}
    +	};
    +
    +	private static final char SCOPE_SEPARATOR = '_';
    +
    +	private PrometheusEndpoint prometheusEndpoint;
    +	private Map<String, Collector> collectorsByMetricName = new HashMap<>();
    +
    +	@VisibleForTesting
    +	static String replaceInvalidChars(final String input) {
    +		// https://prometheus.io/docs/instrumenting/writing_exporters/
    +		// Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
    +		return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
    +	}
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +		int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
    +		log.info("Using port {}.", port);
    +		prometheusEndpoint = new PrometheusEndpoint(port);
    +		try {
    +			prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
    +		} catch (IOException e) {
    +			log.error("Could not start PrometheusEndpoint on port " + port, e);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		prometheusEndpoint.stop();
    +		CollectorRegistry.defaultRegistry.clear();
    +	}
    +
    +	@Override
    +	public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) {
    +		final String scope = ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
    +		List<String> dimensionKeys = new LinkedList<>();
    +		List<String> dimensionValues = new LinkedList<>();
    +		for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
    +			dimensionKeys.add(CHARACTER_FILTER.filterCharacters(CharMatcher.anyOf("<>").trimFrom(dimension.getKey())));
    +			dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
    +		}
    +
    +		final String validMetricName = scope + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName);
    +		final String metricIdentifier = group.getMetricIdentifier(metricName);
    +		final Collector collector;
    +		if (metric instanceof Gauge) {
    +			collector = createGauge((Gauge) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else if (metric instanceof Counter) {
    +			collector = createGauge((Counter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else if (metric instanceof Histogram) {
    +			collector = createSummary((Histogram) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else if (metric instanceof Meter) {
    +			collector = createCounter((Meter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else {
    +			log.error("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
    --- End diff --
    
    By convention, classes that are part of the metric system never log errors, only warnings, because errors are reserved for problems that interfere with the job being executed.
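    A minimal sketch of that convention, assuming simplified, hypothetical `Counter`/`Gauge` stand-ins for Flink's metric interfaces and covering only the two types the diff maps to Prometheus gauges: an unsupported metric is logged at WARNING and skipped instead of being reported as an error. `java.util.logging` keeps the sketch dependency-free; with SLF4J the equivalent call would be `log.warn(...)`.

```java
import java.util.logging.Logger;

/** Sketch: unsupported metric types are skipped with a warning, never an error. */
final class UnsupportedMetricSketch {
    private static final Logger LOG = Logger.getLogger(UnsupportedMetricSketch.class.getName());

    /** Hypothetical stand-in for org.apache.flink.metrics.Counter. */
    interface Counter {
        long getCount();
    }

    /** Hypothetical stand-in for org.apache.flink.metrics.Gauge. */
    interface Gauge<T> {
        T getValue();
    }

    /** Returns the kind of collector to create, or null if the type is unsupported. */
    static String collectorKind(Object metric, String metricName) {
        if (metric instanceof Gauge || metric instanceof Counter) {
            return "gauge"; // the diff exports both Gauges and Counters as Prometheus gauges
        }
        // Warn, don't error: an unsupported metric does not interfere with the running job.
        LOG.warning("Cannot add unknown metric type " + metric.getClass().getName()
            + " for metric " + metricName + ". This metric type is not supported by this reporter.");
        return null;
    }
}
```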



[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r116377143
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---
    @@ -133,61 +136,58 @@ public void notifyOfRemovedMetric(final Metric metric, final String metricName,
     		collectorsByMetricName.remove(metricName);
     	}
     
    +	@SuppressWarnings("unchecked")
    +	private static String getLogicalScope(MetricGroup group) {
    +		return ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
    +	}
    +
     	private Collector createGauge(final Gauge gauge, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
    -		return io.prometheus.client.Gauge
    -			.build()
    -			.name(name)
    -			.help(identifier)
    -			.labelNames(toArray(labelNames, String.class))
    -			.create()
    -			.setChild(new io.prometheus.client.Gauge.Child() {
    -				@Override
    -				public double get() {
    -					final Object value = gauge.getValue();
    -					if (value instanceof Double) {
    -						return (double) value;
    -					}
    -					if (value instanceof Number) {
    -						return ((Number) value).doubleValue();
    -					} else if (value instanceof Boolean) {
    -						return ((Boolean) value) ? 1 : 0;
    -					} else {
    -						log.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.",
    -							gauge, value.getClass().getName());
    -						return 0;
    -					}
    +		return newGauge(name, identifier, labelNames, labelValues, new io.prometheus.client.Gauge.Child() {
    +			@Override
    +			public double get() {
    +				final Object value = gauge.getValue();
    +				if (value instanceof Double) {
    +					return (double) value;
    +				}
    +				if (value instanceof Number) {
    +					return ((Number) value).doubleValue();
    +				} else if (value instanceof Boolean) {
    +					return ((Boolean) value) ? 1 : 0;
    +				} else {
    +					LOG.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.",
    +						gauge, value.getClass().getName());
    +					return 0;
     				}
    -			}, toArray(labelValues, String.class));
    +			}
    +		});
     	}
     
     	private static Collector createGauge(final Counter counter, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
    -		return io.prometheus.client.Gauge
    -			.build()
    -			.name(name)
    -			.help(identifier)
    -			.labelNames(toArray(labelNames, String.class))
    -			.create()
    -			.setChild(new io.prometheus.client.Gauge.Child() {
    -				@Override
    -				public double get() {
    -					return (double) counter.getCount();
    -				}
    -			}, toArray(labelValues, String.class));
    +		return newGauge(name, identifier, labelNames, labelValues, new io.prometheus.client.Gauge.Child() {
    +			@Override
    +			public double get() {
    +				return (double) counter.getCount();
    +			}
    +		});
    +	}
    +
    +	private Collector createGauge(final Meter meter, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
    +		return newGauge(name + "_rate", identifier, labelNames, labelValues, new io.prometheus.client.Gauge.Child() {
    --- End diff --
    
    Meter metrics should already contain "rate" in their name, so we don't need to append "_rate" here.
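    A minimal sketch of the naming without the suffix, assuming the same `[^a-zA-Z0-9:_]` sanitization and `_` scope separator as the diff; the `meterName` helper and the example metric names are hypothetical. The meter's own name is sanitized and appended to the scope as-is, so any "rate" it already carries is kept and not doubled.

```java
import java.util.regex.Pattern;

/** Sketch: build a Prometheus name for a meter without appending "_rate". */
final class MeterNameSketch {
    // Same pattern as UNALLOWED_CHAR_PATTERN in the diff.
    private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
    private static final char SCOPE_SEPARATOR = '_';

    static String replaceInvalidChars(String input) {
        return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
    }

    /** Mirrors validMetricName in notifyOfAddedMetric; no suffix is added for meters. */
    static String meterName(String scope, String metricName) {
        return scope + SCOPE_SEPARATOR + replaceInvalidChars(metricName);
    }
}
```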



[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

Posted by mbode <gi...@git.apache.org>.
Github user mbode commented on the issue:

    https://github.com/apache/flink/pull/3833
  
    @hadronzoo I tried to keep it as general as possible by exporting all variables available to the metric group as labels. I am not sure whether this might lead to [label overuse](https://prometheus.io/docs/practices/instrumentation/#do-not-overuse-labels) at some point; did you ever run into difficulties in that context? Unfortunately, I do not have much experience running Prometheus in a production environment yet.
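    For context, every distinct combination of label values becomes its own Prometheus time series, so high-cardinality variables are the usual source of label overuse. The sketch below is a Guava-free take on the label mapping in the diff (replacing `CharMatcher.anyOf("<>").trimFrom`), filling names and values in a single pass so they stay aligned; `LabelSketch`, `toLabels`, and the sample variables are hypothetical names used for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Sketch: turn metric group variables such as "<host>" into Prometheus labels. */
final class LabelSketch {
    // Same character filter the diff applies to label keys and values.
    private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");

    static String sanitize(String input) {
        return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
    }

    /** Guava-free equivalent of CharMatcher.anyOf("<>").trimFrom(key). */
    static String trimAngleBrackets(String key) {
        int start = 0;
        int end = key.length();
        while (start < end && isAngleBracket(key.charAt(start))) {
            start++;
        }
        while (end > start && isAngleBracket(key.charAt(end - 1))) {
            end--;
        }
        return key.substring(start, end);
    }

    private static boolean isAngleBracket(char c) {
        return c == '<' || c == '>';
    }

    /** Fills names and values in one pass so index i of each list describes the same variable. */
    static void toLabels(Map<String, String> variables, List<String> names, List<String> values) {
        for (Map.Entry<String, String> variable : variables.entrySet()) {
            names.add(sanitize(trimAngleBrackets(variable.getKey())));
            values.add(sanitize(variable.getValue()));
        }
    }
}
```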


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3833
  
    Finally found time to try this out, and it works great. Merging.



[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3833
  
    Shading looks correct: the dependencies are bundled and the correct jar ends up in flink-dist.



[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r116368532
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import com.google.common.base.CharMatcher;
    +import com.google.common.collect.ImmutableList;
    +import fi.iki.elonen.NanoHTTPD;
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.common.TextFormat;
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.metrics.CharacterFilter;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.HistogramStatistics;
    +import org.apache.flink.metrics.Meter;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.regex.Pattern;
    +
    +import static com.google.common.collect.Iterables.toArray;
    +
    +@PublicEvolving
    +public class PrometheusReporter implements MetricReporter {
    +	private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
    +
    +	static final         String ARG_PORT     = "port";
    +	private static final int    DEFAULT_PORT = 9249;
    +
    +	private static final Pattern         UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
    +	private static final CharacterFilter CHARACTER_FILTER       = new CharacterFilter() {
    +		@Override
    +		public String filterCharacters(String input) {
    +			return replaceInvalidChars(input);
    +		}
    +	};
    +
    +	private static final char SCOPE_SEPARATOR = '_';
    +
    +	private PrometheusEndpoint prometheusEndpoint;
    +	private Map<String, Collector> collectorsByMetricName = new HashMap<>();
    --- End diff --
    
    Could be final.



[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3833
  
    Another thing to consider is that this rule also covers the myriad testing utilities that so far have not had to be documented.



[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r116368561
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import com.google.common.base.CharMatcher;
    +import com.google.common.collect.ImmutableList;
    +import fi.iki.elonen.NanoHTTPD;
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.common.TextFormat;
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.metrics.CharacterFilter;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.HistogramStatistics;
    +import org.apache.flink.metrics.Meter;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.regex.Pattern;
    +
    +import static com.google.common.collect.Iterables.toArray;
    +
    +@PublicEvolving
    +public class PrometheusReporter implements MetricReporter {
    +	private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
    +
    +	static final         String ARG_PORT     = "port";
    +	private static final int    DEFAULT_PORT = 9249;
    +
    +	private static final Pattern         UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
    +	private static final CharacterFilter CHARACTER_FILTER       = new CharacterFilter() {
    +		@Override
    +		public String filterCharacters(String input) {
    +			return replaceInvalidChars(input);
    +		}
    +	};
    +
    +	private static final char SCOPE_SEPARATOR = '_';
    +
    +	private PrometheusEndpoint prometheusEndpoint;
    +	private Map<String, Collector> collectorsByMetricName = new HashMap<>();
    +
    +	@VisibleForTesting
    +	static String replaceInvalidChars(final String input) {
    +		// https://prometheus.io/docs/instrumenting/writing_exporters/
    +		// Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
    +		return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
    +	}
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +		int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
    +		log.info("Using port {}.", port);
    +		prometheusEndpoint = new PrometheusEndpoint(port);
    +		try {
    +			prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
    +		} catch (IOException e) {
    +			log.error("Could not start PrometheusEndpoint on port " + port, e);
    --- End diff --
    
    It would be good to throw an exception here even though the current interface does not declare one. Otherwise the reporter will still be notified of added/removed metrics, even though there is no benefit to it.
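    A minimal sketch of failing fast, assuming the JDK's built-in `com.sun.net.httpserver.HttpServer` in place of the NanoHTTPD-based `PrometheusEndpoint` (a substitution for illustration only): because `open(MetricConfig)` in the diff has to catch `IOException`, the sketch rethrows it as an `UncheckedIOException` instead of only logging it, so a busy port surfaces immediately.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;

/** Sketch of an open() that fails fast instead of only logging the IOException. */
final class FailFastEndpointSketch {
    private HttpServer server;

    void open(int port) {
        try {
            // HttpServer.create binds the socket immediately, so a busy port fails here.
            server = HttpServer.create(new InetSocketAddress(port), 0);
            server.start();
        } catch (IOException e) {
            // open(MetricConfig) in the diff cannot throw IOException, so rethrow unchecked.
            throw new UncheckedIOException("Could not start endpoint on port " + port, e);
        }
    }

    /** Actual bound port, useful when opened with port 0 (an ephemeral port). */
    int port() {
        return server.getAddress().getPort();
    }

    void close() {
        if (server != null) {
            server.stop(0); // 0 = do not wait for in-flight exchanges
            server = null;
        }
    }
}
```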



[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

Posted by mbode <gi...@git.apache.org>.
Github user mbode commented on the issue:

    https://github.com/apache/flink/pull/3833
  
    I see, that makes sense. I finally got around to fixing the dependency and getting a [green Travis build](https://travis-ci.org/mbode/flink/builds/240926382).



[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r116368522
  
    --- Diff: flink-metrics/flink-metrics-prometheus/pom.xml ---
    @@ -0,0 +1,128 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    +
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-metrics</artifactId>
    +		<version>1.3-SNAPSHOT</version>
    +		<relativePath>..</relativePath>
    +	</parent>
    +
    +	<artifactId>flink-metrics-prometheus</artifactId>
    +	<name>flink-metrics-prometheus</name>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-annotations</artifactId>
    +			<version>${project.version}</version>
    +			<scope>provided</scope>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-runtime_2.10</artifactId>
    +			<version>${project.version}</version>
    +			<scope>provided</scope>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-metrics-core</artifactId>
    +			<version>${project.version}</version>
    +			<scope>provided</scope>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>io.prometheus</groupId>
    +			<artifactId>simpleclient</artifactId>
    +			<version>${prometheus.version}</version>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>io.prometheus</groupId>
    +			<artifactId>simpleclient_servlet</artifactId>
    +			<version>${prometheus.version}</version>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>com.nanohttpd</groupId>
    +			<artifactId>nanohttpd</artifactId>
    +			<version>2.2.0</version>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>com.google.guava</groupId>
    --- End diff --
    
    We try to avoid Guava unless absolutely necessary, and virtually all modules that do depend on it shade it away.
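For illustration, the three Guava usages in this diff (`CharMatcher.anyOf("<>").trimFrom`, `ImmutableList`, `Iterables.toArray`) could likely be replaced with plain JDK code along these lines. `GuavaFreeHelpers` is a hypothetical name, not part of the PR:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical JDK-only replacements for the Guava calls in this diff. */
final class GuavaFreeHelpers {

	/** Instead of ImmutableList.of(.5, .75, .95, .98, .99, .999). */
	static final List<Double> QUANTILES =
		Collections.unmodifiableList(Arrays.asList(.5, .75, .95, .98, .99, .999));

	private GuavaFreeHelpers() {
	}

	/** Instead of CharMatcher.anyOf("<>").trimFrom(s): strips '<' and '>' from both ends. */
	static String trimAngleBrackets(String s) {
		int start = 0;
		int end = s.length();
		while (start < end && isAngleBracket(s.charAt(start))) {
			start++;
		}
		while (end > start && isAngleBracket(s.charAt(end - 1))) {
			end--;
		}
		return s.substring(start, end);
	}

	private static boolean isAngleBracket(char c) {
		return c == '<' || c == '>';
	}

	/** Instead of ImmutableList.<String>builder().addAll(list).add(extra).build(). */
	static List<String> appendUnmodifiable(List<String> list, String extra) {
		List<String> copy = new ArrayList<>(list);
		copy.add(extra);
		return Collections.unmodifiableList(copy);
	}

	/** Instead of Iterables.toArray(list, String.class). */
	static String[] toStringArray(List<String> list) {
		return list.toArray(new String[0]);
	}
}
```

This would remove the need for Guava in this module; the remaining third-party dependencies would still be candidates for shading.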



[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3833
  
    You can find an example of how to shade here: https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-datadog/pom.xml.
    
    Shading dependencies in reporters/connectors has become a safety precaution on our side.
    
    It is not unlikely that user code contains the same dependencies. For one, user code may also include other reporters, so by the very existence of this reporter there is a precedent :) Besides that, something that pops up from time to time on the mailing lists is users talking to REST endpoints in their functions/sources/sinks, which may also rely on HTTP-related dependencies.



[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3833
  
    I've only glanced over this for now, but it looks pretty good.
    
    If possible, though, I would like to avoid using the `DropwizardExports` class, for two reasons:
    
    1) It introduces the odd pattern of extending `ScheduledDropwizardReporter` without actually being scheduled or creating an actual Dropwizard reporter.
    
    2) I've looked into its source code, and it has the typical Dropwizard reporter problem: absolutely nothing gets cached, and effectively constant objects are re-created over and over again. We can make this much more efficient.
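A rough sketch of what "caching" could mean here, assuming a custom export path instead of `DropwizardExports`: everything constant for a metric (its name and label lists) is computed once when the metric is added, and each scrape only reads the current value. `CachedMetricExport` and its nested `Sample` are hypothetical types, not the Prometheus client API:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.DoubleSupplier;

/**
 * Hypothetical export path: the constant parts of a metric (name, label names,
 * label values) are built once when the metric is added; a scrape only reads
 * the current value. Not the Prometheus client API.
 */
final class CachedMetricExport {

	/** One exported sample; its name and label lists are shared across scrapes. */
	static final class Sample {
		final String name;
		final List<String> labelNames;
		final List<String> labelValues;
		final double value;

		Sample(String name, List<String> labelNames, List<String> labelValues, double value) {
			this.name = name;
			this.labelNames = labelNames;
			this.labelValues = labelValues;
			this.value = value;
		}
	}

	private final String name;
	private final List<String> labelNames;
	private final List<String> labelValues;
	private final DoubleSupplier valueSupplier;

	CachedMetricExport(String name, List<String> labelNames, List<String> labelValues, DoubleSupplier valueSupplier) {
		this.name = name;
		// Copied once at registration time and reused for every scrape.
		this.labelNames = Collections.unmodifiableList(new ArrayList<>(labelNames));
		this.labelValues = Collections.unmodifiableList(new ArrayList<>(labelValues));
		this.valueSupplier = valueSupplier;
	}

	/** Per scrape: only the value is read; name and label lists are not rebuilt. */
	Sample collect() {
		return new Sample(name, labelNames, labelValues, valueSupplier.getAsDouble());
	}
}
```

The design choice is simply to move all string and list construction to `notifyOfAddedMetric`, so the per-scrape cost is one value read and one small object per metric.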



[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r116368530
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import com.google.common.base.CharMatcher;
    +import com.google.common.collect.ImmutableList;
    +import fi.iki.elonen.NanoHTTPD;
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.common.TextFormat;
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.metrics.CharacterFilter;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.HistogramStatistics;
    +import org.apache.flink.metrics.Meter;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.regex.Pattern;
    +
    +import static com.google.common.collect.Iterables.toArray;
    +
    +@PublicEvolving
    +public class PrometheusReporter implements MetricReporter {
    +	private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
    +
    +	static final         String ARG_PORT     = "port";
    --- End diff --
    
    The indentation here is inconsistent with the rest of the code base.



[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3833



[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3833
  
    I think it is intended. The majority of tests can be (and are) documented with "Tests for the {@link <classToTest>}.", which is totally OK because it tells us that this is the general test battery for that class and not something else. These are also easy to add, so there's no real overhead in doing so.
    
    But this doesn't apply to all of them, and you only really notice a missing javadoc when you stumble upon a test and spend 10 minutes trying to figure out what it is doing because the method names aren't helping at all.
    
    The widely accepted notion that the names of a test class and its methods are enough documentation for a test is quite questionable IMO; by enforcing a javadoc on the class, we give the developer a little nudge to think about whether something needs to be documented or not.



[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by mbode <gi...@git.apache.org>.
Github user mbode commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r119661118
  
    --- Diff: flink-metrics/flink-metrics-prometheus/pom.xml ---
    @@ -0,0 +1,129 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    +
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-metrics</artifactId>
    +		<version>1.3-SNAPSHOT</version>
    --- End diff --
    
    Done 😃 



[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r116368696
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import com.google.common.base.CharMatcher;
    +import com.google.common.collect.ImmutableList;
    +import fi.iki.elonen.NanoHTTPD;
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.common.TextFormat;
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.metrics.CharacterFilter;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.HistogramStatistics;
    +import org.apache.flink.metrics.Meter;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.regex.Pattern;
    +
    +import static com.google.common.collect.Iterables.toArray;
    +
    +@PublicEvolving
    +public class PrometheusReporter implements MetricReporter {
    +	private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
    +
    +	static final         String ARG_PORT     = "port";
    +	private static final int    DEFAULT_PORT = 9249;
    +
    +	private static final Pattern         UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
    +	private static final CharacterFilter CHARACTER_FILTER       = new CharacterFilter() {
    +		@Override
    +		public String filterCharacters(String input) {
    +			return replaceInvalidChars(input);
    +		}
    +	};
    +
    +	private static final char SCOPE_SEPARATOR = '_';
    +
    +	private PrometheusEndpoint prometheusEndpoint;
    +	private Map<String, Collector> collectorsByMetricName = new HashMap<>();
    +
    +	@VisibleForTesting
    +	static String replaceInvalidChars(final String input) {
    +		// https://prometheus.io/docs/instrumenting/writing_exporters/
    +		// Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
    +		return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
    +	}
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +		int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
    +		log.info("Using port {}.", port);
    +		prometheusEndpoint = new PrometheusEndpoint(port);
    +		try {
    +			prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
    +		} catch (IOException e) {
    +			log.error("Could not start PrometheusEndpoint on port " + port, e);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		prometheusEndpoint.stop();
    +		CollectorRegistry.defaultRegistry.clear();
    +	}
    +
    +	@Override
    +	public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) {
    +		final String scope = ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
    +		List<String> dimensionKeys = new LinkedList<>();
    +		List<String> dimensionValues = new LinkedList<>();
    +		for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
    +			dimensionKeys.add(CHARACTER_FILTER.filterCharacters(CharMatcher.anyOf("<>").trimFrom(dimension.getKey())));
    +			dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
    +		}
    +
    +		final String validMetricName = scope + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName);
    +		final String metricIdentifier = group.getMetricIdentifier(metricName);
    +		final Collector collector;
    +		if (metric instanceof Gauge) {
    +			collector = createGauge((Gauge) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else if (metric instanceof Counter) {
    +			collector = createGauge((Counter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else if (metric instanceof Histogram) {
    +			collector = createSummary((Histogram) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else if (metric instanceof Meter) {
    +			collector = createCounter((Meter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else {
    +			log.error("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
    +				metric.getClass().getName());
    +			return;
    +		}
    +		collector.register();
    +		collectorsByMetricName.put(metricName, collector);
    +	}
    +
    +	@Override
    +	public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) {
    +		CollectorRegistry.defaultRegistry.unregister(collectorsByMetricName.get(metricName));
    +		collectorsByMetricName.remove(metricName);
    +	}
    +
    +	private Collector createGauge(final Gauge gauge, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
    +		return io.prometheus.client.Gauge
    +			.build()
    +			.name(name)
    +			.help(identifier)
    +			.labelNames(toArray(labelNames, String.class))
    +			.create()
    +			.setChild(new io.prometheus.client.Gauge.Child() {
    +				@Override
    +				public double get() {
    +					final Object value = gauge.getValue();
    +					if (value instanceof Double) {
    +						return (double) value;
    +					}
    +					if (value instanceof Number) {
    +						return ((Number) value).doubleValue();
    +					} else if (value instanceof Boolean) {
    +						return ((Boolean) value) ? 1 : 0;
    +					} else {
    +						log.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.",
    +							gauge, value.getClass().getName());
    +						return 0;
    +					}
    +				}
    +			}, toArray(labelValues, String.class));
    +	}
    +
    +	private static Collector createGauge(final Counter counter, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
    +		return io.prometheus.client.Gauge
    +			.build()
    +			.name(name)
    +			.help(identifier)
    +			.labelNames(toArray(labelNames, String.class))
    +			.create()
    +			.setChild(new io.prometheus.client.Gauge.Child() {
    +				@Override
    +				public double get() {
    +					return (double) counter.getCount();
    +				}
    +			}, toArray(labelValues, String.class));
    +	}
    +
    +	private Collector createCounter(final Meter meter, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
    +		return io.prometheus.client.Counter
    +			.build()
    +			.name(name + "_total")
    +			.help(identifier)
    +			.labelNames(toArray(labelNames, String.class))
    +			.create()
    +			.setChild(new io.prometheus.client.Counter.Child() {
    +				@Override
    +				public double get() {
    +					return (double) meter.getCount();
    +				}
    +			}, toArray(labelValues, String.class));
    +	}
    +
    +	private static HistogramSummaryProxy createSummary(final Histogram histogram, final String name, final String identifier, final List<String> dimensionKeys, final List<String> dimensionValues) {
    +		return new HistogramSummaryProxy(histogram, name, identifier, dimensionKeys, dimensionValues);
    +	}
    +
    +	static class PrometheusEndpoint extends NanoHTTPD {
    +		static final String MIME_TYPE = "plain/text";
    +
    +		PrometheusEndpoint(int port) {
    +			super(port);
    +		}
    +
    +		@Override
    +		public Response serve(IHTTPSession session) {
    +			if (session.getUri().equals("/metrics")) {
    +				StringWriter writer = new StringWriter();
    +				try {
    +					TextFormat.write004(writer, CollectorRegistry.defaultRegistry.metricFamilySamples());
    +				} catch (IOException e) {
    +					return newFixedLengthResponse(Response.Status.INTERNAL_ERROR, MIME_TYPE, "Unable to output metrics");
    +				}
    +				return newFixedLengthResponse(Response.Status.OK, TextFormat.CONTENT_TYPE_004, writer.toString());
    +			} else {
    +				return newFixedLengthResponse(Response.Status.NOT_FOUND, MIME_TYPE, "Not found");
    +			}
    +		}
    +	}
    +
    +	private static class HistogramSummaryProxy extends Collector {
    +		private static final ImmutableList<Double> QUANTILES = ImmutableList.of(.5, .75, .95, .98, .99, .999);
    +
    +		private final Histogram    histogram;
    +		private final String       metricName;
    +		private final String       metricIdentifier;
    +		private final List<String> labelNamesWithQuantile;
    +		private final List<String> labelValues;
    +
    +		HistogramSummaryProxy(final Histogram histogram, final String metricName, final String metricIdentifier, final List<String> labelNames, final List<String> labelValues) {
    +			this.histogram = histogram;
    +			this.metricName = metricName;
    +			this.metricIdentifier = metricIdentifier;
    +			labelNamesWithQuantile = ImmutableList.<String>builder().addAll(labelNames).add("quantile").build();
    --- End diff --
    
    Should be prefixed with `this` as well, for consistency.
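A trimmed-down sketch of the constructor with `this.` applied consistently and the label list built without Guava. `SummaryProxySketch` is a hypothetical stand-in for `HistogramSummaryProxy`; the `Histogram` field is omitted so the example is self-contained:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, trimmed-down HistogramSummaryProxy (Histogram field omitted). */
final class SummaryProxySketch {

	private final String metricName;
	private final String metricIdentifier;
	private final List<String> labelNamesWithQuantile;
	private final List<String> labelValues;

	SummaryProxySketch(String metricName, String metricIdentifier, List<String> labelNames, List<String> labelValues) {
		this.metricName = metricName;
		this.metricIdentifier = metricIdentifier;
		List<String> names = new ArrayList<>(labelNames);
		names.add("quantile"); // extra label distinguishing the summary's quantile samples
		this.labelNamesWithQuantile = Collections.unmodifiableList(names);
		this.labelValues = Collections.unmodifiableList(new ArrayList<>(labelValues));
	}

	List<String> labelNamesWithQuantile() {
		return labelNamesWithQuantile;
	}

	/** Label values for one quantile sample: the metric's values plus the quantile. */
	List<String> labelValuesFor(double quantile) {
		List<String> values = new ArrayList<>(labelValues);
		values.add(Double.toString(quantile));
		return values;
	}
}
```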



[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3833
  
    As it stands, a metric can have at most 10 labels, and there are no current plans to extend this set; according to the docs, that's still OK. If this does become a problem, we can add a switch to rely on the scope formats instead, giving the user control over how many labels there are.



[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3833
  
    I'll take a look later today.



[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

Posted by hadronzoo <gi...@git.apache.org>.
Github user hadronzoo commented on the issue:

    https://github.com/apache/flink/pull/3833
  
    @mbode thanks for working on this!
    
    One thing I've found useful when exporting Flink's statsd metrics to Prometheus is to turn several of the metric fields into tags, like `job_name`, `task_name`, `operator_name`, etc. This [statsd-exporter mapping](https://gist.github.com/hadronzoo/621b6a6dce7e2596d5643ce8d1e954ea) has tags that have worked well for me. I'm not tagging host names or IP addresses because Prometheus's Kubernetes support does that already, but that could be useful for people running standalone clusters.
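To illustrate the suggestion, here is a hypothetical helper that renders one sample in the Prometheus text exposition format, with scope variables such as `job_name` and `task_name` as labels rather than as parts of the metric name. It escapes label values (backslash, double quote, newline) as the text format requires; note that the PR instead sanitizes values with its `CHARACTER_FILTER`. `ExpositionLineSketch` is not part of the PR:

```java
import java.util.Map;

/** Hypothetical helper: one sample line in the Prometheus text exposition format. */
final class ExpositionLineSketch {

	private ExpositionLineSketch() {
	}

	/** Renders name{k1="v1",k2="v2"} value, keeping the map's iteration order. */
	static String render(String metricName, Map<String, String> labels, double value) {
		StringBuilder sb = new StringBuilder(metricName);
		if (!labels.isEmpty()) {
			sb.append('{');
			boolean first = true;
			for (Map.Entry<String, String> label : labels.entrySet()) {
				if (!first) {
					sb.append(',');
				}
				first = false;
				sb.append(label.getKey()).append("=\"").append(escape(label.getValue())).append('"');
			}
			sb.append('}');
		}
		return sb.append(' ').append(value).toString();
	}

	/** Label values may contain any characters; backslash, quote and newline are escaped. */
	private static String escape(String v) {
		return v.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
	}
}
```

For example, `render("flink_numRecordsIn", labels, 42)` with a single label `job_name=WordCount` yields `flink_numRecordsIn{job_name="WordCount"} 42.0`.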



[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r116368510
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import com.google.common.base.CharMatcher;
    +import com.google.common.collect.ImmutableList;
    +import fi.iki.elonen.NanoHTTPD;
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.common.TextFormat;
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.metrics.CharacterFilter;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.HistogramStatistics;
    +import org.apache.flink.metrics.Meter;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.regex.Pattern;
    +
    +import static com.google.common.collect.Iterables.toArray;
    +
    +@PublicEvolving
    +public class PrometheusReporter implements MetricReporter {
    +	private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
    --- End diff --
    
    By convention, this should be upper-case (i.e. `LOG`).



[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by mbode <gi...@git.apache.org>.
Github user mbode commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r116375209
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import com.google.common.base.CharMatcher;
    +import com.google.common.collect.ImmutableList;
    +import fi.iki.elonen.NanoHTTPD;
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.common.TextFormat;
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.metrics.CharacterFilter;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.HistogramStatistics;
    +import org.apache.flink.metrics.Meter;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.regex.Pattern;
    +
    +import static com.google.common.collect.Iterables.toArray;
    +
    +@PublicEvolving
    +public class PrometheusReporter implements MetricReporter {
    +	private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
    +
    +	static final         String ARG_PORT     = "port";
    +	private static final int    DEFAULT_PORT = 9249;
    +
    +	private static final Pattern         UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
    +	private static final CharacterFilter CHARACTER_FILTER       = new CharacterFilter() {
    +		@Override
    +		public String filterCharacters(String input) {
    +			return replaceInvalidChars(input);
    +		}
    +	};
    +
    +	private static final char SCOPE_SEPARATOR = '_';
    +
    +	private PrometheusEndpoint prometheusEndpoint;
    +	private Map<String, Collector> collectorsByMetricName = new HashMap<>();
    +
    +	@VisibleForTesting
    +	static String replaceInvalidChars(final String input) {
    +		// https://prometheus.io/docs/instrumenting/writing_exporters/
    +		// Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
    +		return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
    +	}
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +		int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
    +		log.info("Using port {}.", port);
    +		prometheusEndpoint = new PrometheusEndpoint(port);
    +		try {
    +			prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
    +		} catch (IOException e) {
    +			log.error("Could not start PrometheusEndpoint on port " + port, e);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		prometheusEndpoint.stop();
    --- End diff --
    
    I don't think so; `NanoHTTPD.stop()` seems to catch everything.
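    For comparison, a reporter that did not want to rely on `NanoHTTPD.stop()` swallowing its own exceptions could guard `close()` itself. The sketch below is hypothetical: `Endpoint` stands in for the PR's `PrometheusEndpoint`, a plain `List` stands in for `CollectorRegistry.defaultRegistry`, and `java.util.logging` replaces SLF4J to keep it dependency-free.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the PR's PrometheusEndpoint (a NanoHTTPD subclass).
interface Endpoint {
	void stop();
}

// Sketch of a close() that tolerates a missing or misbehaving endpoint.
class DefensiveReporter {
	private static final Logger LOG = Logger.getLogger(DefensiveReporter.class.getName());

	Endpoint endpoint;                                  // may be null if open() never ran
	final List<String> registered = new ArrayList<>(); // stand-in for the collector registry

	void close() {
		if (endpoint != null) {
			try {
				endpoint.stop();
			} catch (RuntimeException e) {
				// Shutting down should not fail just because the HTTP server did.
				LOG.log(Level.WARNING, "Could not stop endpoint.", e);
			}
		}
		// Always clear registrations, even if stopping the endpoint failed.
		registered.clear();
	}
}
```

    If `stop()` really never throws, the `try`/`catch` is redundant and only the null check matters.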


---

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r116368672
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java ---
    @@ -0,0 +1,192 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import com.mashape.unirest.http.HttpResponse;
    +import com.mashape.unirest.http.Unirest;
    +import com.mashape.unirest.http.exceptions.UnirestException;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.MetricOptions;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.Meter;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.SimpleCounter;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.metrics.util.TestMeter;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
    +import org.apache.flink.runtime.metrics.util.TestingHistogram;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.After;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.ExpectedException;
    +
    +import java.util.Arrays;
    +
    +import static org.apache.flink.metrics.prometheus.PrometheusReporter.ARG_PORT;
    +import static org.apache.flink.runtime.metrics.scope.ScopeFormat.SCOPE_SEPARATOR;
    +import static org.hamcrest.Matchers.both;
    +import static org.hamcrest.Matchers.containsString;
    +import static org.hamcrest.Matchers.equalTo;
    +import static org.junit.Assert.assertThat;
    +
    +public class PrometheusReporterTest extends TestLogger {
    +	private static final int NON_DEFAULT_PORT = 9429;
    +
    +	private static final String HOST_NAME    = "hostname";
    +	private static final String TASK_MANAGER = "tm";
    +
    +	private static final String HELP_PREFIX    = "# HELP ";
    +	private static final String TYPE_PREFIX    = "# TYPE ";
    +	private static final String DIMENSIONS     = "host=\"" + HOST_NAME + "\",tm_id=\"" + TASK_MANAGER + "\"";
    +	private static final String DEFAULT_LABELS = "{" + DIMENSIONS + ",}";
    +	private static final String SCOPE_PREFIX   = "taskmanager_";
    +
    +	@Rule
    +	public ExpectedException thrown = ExpectedException.none();
    +
    +	private MetricRegistry registry = prepareMetricRegistry();
    --- End diff --
    
    `registry` could be final.


---

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r116368667
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java ---
    @@ -0,0 +1,192 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import com.mashape.unirest.http.HttpResponse;
    +import com.mashape.unirest.http.Unirest;
    +import com.mashape.unirest.http.exceptions.UnirestException;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.MetricOptions;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.Meter;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.SimpleCounter;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.metrics.util.TestMeter;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
    +import org.apache.flink.runtime.metrics.util.TestingHistogram;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.After;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.ExpectedException;
    +
    +import java.util.Arrays;
    +
    +import static org.apache.flink.metrics.prometheus.PrometheusReporter.ARG_PORT;
    +import static org.apache.flink.runtime.metrics.scope.ScopeFormat.SCOPE_SEPARATOR;
    +import static org.hamcrest.Matchers.both;
    +import static org.hamcrest.Matchers.containsString;
    +import static org.hamcrest.Matchers.equalTo;
    +import static org.junit.Assert.assertThat;
    +
    +public class PrometheusReporterTest extends TestLogger {
    +	private static final int NON_DEFAULT_PORT = 9429;
    +
    +	private static final String HOST_NAME    = "hostname";
    +	private static final String TASK_MANAGER = "tm";
    +
    +	private static final String HELP_PREFIX    = "# HELP ";
    +	private static final String TYPE_PREFIX    = "# TYPE ";
    +	private static final String DIMENSIONS     = "host=\"" + HOST_NAME + "\",tm_id=\"" + TASK_MANAGER + "\"";
    +	private static final String DEFAULT_LABELS = "{" + DIMENSIONS + ",}";
    +	private static final String SCOPE_PREFIX   = "taskmanager_";
    +
    +	@Rule
    +	public ExpectedException thrown = ExpectedException.none();
    +
    +	private MetricRegistry registry = prepareMetricRegistry();
    +
    +	private final MetricReporter reporter = registry.getReporters().get(0);
    +
    +	@Test
    +	public void counterIsReportedAsPrometheusGauge() throws UnirestException {
    +		//Prometheus counters may not decrease
    +		Counter testCounter = new SimpleCounter();
    +		testCounter.inc(7);
    +
    +		String counterName = "testCounter";
    +		String gaugeName = SCOPE_PREFIX + counterName;
    +
    +		assertThat(addMetricAndPollResponse(testCounter, counterName),
    +			containsString(HELP_PREFIX + gaugeName + " " + getFullMetricName(counterName) + "\n" +
    +						   TYPE_PREFIX + gaugeName + " gauge" + "\n" +
    +						   gaugeName + DEFAULT_LABELS + " 7.0"));
    +	}
    +
    +	@Test
    +	public void gaugeIsReportedAsPrometheusGauge() throws UnirestException {
    +		Gauge<Integer> testGauge = new Gauge<Integer>() {
    +			@Override
    +			public Integer getValue() {
    +				return 1;
    +			}
    +		};
    +
    +		String gaugeName = "testGauge";
    +		String prometheusGaugeName = SCOPE_PREFIX + gaugeName;
    +
    +		assertThat(addMetricAndPollResponse(testGauge, gaugeName),
    +			containsString(HELP_PREFIX + prometheusGaugeName + " " + getFullMetricName(gaugeName) + "\n" +
    +						   TYPE_PREFIX + prometheusGaugeName + " gauge" + "\n" +
    +						   prometheusGaugeName + DEFAULT_LABELS + " 1.0"));
    +	}
    +
    +	@Test
    +	public void histogramIsReportedAsPrometheusSummary() throws UnirestException {
    +		Histogram testHistogram = new TestingHistogram();
    +
    +		String histogramName = "testHistogram";
    +		String summaryName = SCOPE_PREFIX + histogramName;
    +
    +		String response = addMetricAndPollResponse(testHistogram, histogramName);
    +		assertThat(response, both(containsString(HELP_PREFIX + summaryName + " " + getFullMetricName(histogramName) + "\n" +
    +												 TYPE_PREFIX + summaryName + " summary" + "\n"))
    +			.and(containsString(summaryName + "_count" + DEFAULT_LABELS + " 1.0")));
    +		for (String quantile : Arrays.asList("0.5", "0.75", "0.95", "0.98", "0.99", "0.999")) {
    +			assertThat(response, containsString(
    +				summaryName + "{" + DIMENSIONS + ",quantile=\"" + quantile + "\",} " + quantile));
    +		}
    +	}
    +
    +	@Test
    +	public void meterTotalIsReportedAsPrometheusCounter() throws UnirestException {
    +		Meter testMeter = new TestMeter();
    +
    +		String meterName = "testMeter";
    +		String counterName = SCOPE_PREFIX + meterName + "_total";
    +
    +		assertThat(addMetricAndPollResponse(testMeter, meterName),
    +			containsString(HELP_PREFIX + counterName + " " + getFullMetricName(meterName) + "\n" +
    +						   TYPE_PREFIX + counterName + " counter" + "\n" +
    +						   counterName + DEFAULT_LABELS + " 100.0"));
    +	}
    +
    +	@Test
    +	public void endpointIsUnavailableAfterReporterIsClosed() throws UnirestException {
    +		reporter.close();
    +		thrown.expect(UnirestException.class);
    +		pollMetrics();
    +	}
    +
    +	@Test
    +	public void invalidCharactersAreReplacedWithUnderscore() {
    +		assertThat(PrometheusReporter.replaceInvalidChars(""), equalTo(""));
    +		assertThat(PrometheusReporter.replaceInvalidChars("abc"), equalTo("abc"));
    +		assertThat(PrometheusReporter.replaceInvalidChars("abc\""), equalTo("abc_"));
    +		assertThat(PrometheusReporter.replaceInvalidChars("\"abc"), equalTo("_abc"));
    +		assertThat(PrometheusReporter.replaceInvalidChars("\"abc\""), equalTo("_abc_"));
    +		assertThat(PrometheusReporter.replaceInvalidChars("\"a\"b\"c\""), equalTo("_a_b_c_"));
    +		assertThat(PrometheusReporter.replaceInvalidChars("\"\"\"\""), equalTo("____"));
    +		assertThat(PrometheusReporter.replaceInvalidChars("    "), equalTo("____"));
    +		assertThat(PrometheusReporter.replaceInvalidChars("\"ab ;(c)'"), equalTo("_ab___c__"));
    +		assertThat(PrometheusReporter.replaceInvalidChars("a b c"), equalTo("a_b_c"));
    +		assertThat(PrometheusReporter.replaceInvalidChars("a b c "), equalTo("a_b_c_"));
    +		assertThat(PrometheusReporter.replaceInvalidChars("a;b'c*"), equalTo("a_b_c_"));
    +		assertThat(PrometheusReporter.replaceInvalidChars("a,=;:?'b,=;:?'c"), equalTo("a___:__b___:__c"));
    +	}
    +
    +	private String addMetricAndPollResponse(Metric metric, String metricName) throws UnirestException {
    +		reporter.notifyOfAddedMetric(metric, metricName, new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER)));
    +		return pollMetrics().getBody();
    +	}
    +
    +	private static HttpResponse<String> pollMetrics() throws UnirestException {
    +		return Unirest.get("http://localhost:" + NON_DEFAULT_PORT + "/metrics").asString();
    +	}
    +
    +	private static String getFullMetricName(String metricName) {
    +		return HOST_NAME + SCOPE_SEPARATOR + "taskmanager" + SCOPE_SEPARATOR + TASK_MANAGER + SCOPE_SEPARATOR + metricName;
    +	}
    +
    +	private static MetricRegistry prepareMetricRegistry() {
    --- End diff --
    
    This method is a bit overkill for a one-liner that is called once.
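    A minimal sketch of this suggestion combined with the earlier one to make the field `final`, assuming the registry can be built in a single expression. `FakeRegistry` and `FakeReporter` are hypothetical stand-ins, since the body of `prepareMetricRegistry()` is cut off in the quoted diff. Java runs instance field initializers in textual order, so `registry` must be declared before `reporter`.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for Flink's MetricReporter and MetricRegistry.
class FakeReporter {}

class FakeRegistry {
	private final List<FakeReporter> reporters;

	FakeRegistry(FakeReporter reporter) {
		this.reporters = Collections.singletonList(reporter);
	}

	List<FakeReporter> getReporters() {
		return reporters;
	}
}

// Sketch of the test fixture with both fields final and no helper method.
class ReporterFixture {
	// Initializers run top to bottom, so registry is set before reporter reads it.
	final FakeRegistry registry = new FakeRegistry(new FakeReporter());
	final FakeReporter reporter = registry.getReporters().get(0);
}
```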


---

[GitHub] flink issue #3833: [FLINK-6221] Add PrometheusReporter

Posted by mbode <gi...@git.apache.org>.
Github user mbode commented on the issue:

    https://github.com/apache/flink/pull/3833
  
    Thanks for looking at it so quickly! I had somewhat the same instinct regarding your first point and thought about pulling out a `DropwizardReporter` without scheduling, but decided against it so as not to touch too many places. I like your suggestion of converting metrics directly, without Dropwizard as an intermediate step, and am going to try that.
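    Converting directly, without Dropwizard in between, amounts to reading each Flink metric's current value at scrape time and writing it out in the Prometheus text exposition format. The JDK-only sketch below is hypothetical: `PromText.gauge` is an illustrative helper, not part of the Prometheus client library, and it mimics the output shape that `PrometheusReporterTest` asserts (including the trailing comma inside the label braces). Label values are escaped as the exposition format requires.

```java
import java.util.Map;
import java.util.function.DoubleSupplier;

// Hypothetical helper rendering one gauge sample in Prometheus text format.
final class PromText {
	private PromText() {}

	// Escape backslash, double quote and newline in label values.
	static String escape(String value) {
		return value.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
	}

	static String gauge(String name, String help, Map<String, String> labels, DoubleSupplier value) {
		StringBuilder sb = new StringBuilder();
		sb.append("# HELP ").append(name).append(' ').append(help).append('\n');
		sb.append("# TYPE ").append(name).append(" gauge\n");
		sb.append(name).append('{');
		for (Map.Entry<String, String> label : labels.entrySet()) {
			// Each label pair is followed by a comma, matching the test's expectations.
			sb.append(label.getKey()).append("=\"").append(escape(label.getValue())).append("\",");
		}
		// The value is read when the text is rendered, i.e. on each scrape.
		sb.append("} ").append(value.getAsDouble()).append('\n');
		return sb.toString();
	}
}
```

    Passing a `DoubleSupplier` (for example `counter::getCount` in a real reporter) keeps the conversion lazy, so no intermediate registry or scheduled copy is needed.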


---

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r116376926
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import com.google.common.base.CharMatcher;
    +import com.google.common.collect.ImmutableList;
    +import fi.iki.elonen.NanoHTTPD;
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.common.TextFormat;
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.metrics.CharacterFilter;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.HistogramStatistics;
    +import org.apache.flink.metrics.Meter;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.regex.Pattern;
    +
    +import static com.google.common.collect.Iterables.toArray;
    +
    +@PublicEvolving
    +public class PrometheusReporter implements MetricReporter {
    +	private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
    +
    +	static final         String ARG_PORT     = "port";
    +	private static final int    DEFAULT_PORT = 9249;
    +
    +	private static final Pattern         UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
    +	private static final CharacterFilter CHARACTER_FILTER       = new CharacterFilter() {
    +		@Override
    +		public String filterCharacters(String input) {
    +			return replaceInvalidChars(input);
    +		}
    +	};
    +
    +	private static final char SCOPE_SEPARATOR = '_';
    +
    +	private PrometheusEndpoint prometheusEndpoint;
    +	private Map<String, Collector> collectorsByMetricName = new HashMap<>();
    +
    +	@VisibleForTesting
    +	static String replaceInvalidChars(final String input) {
    +		// https://prometheus.io/docs/instrumenting/writing_exporters/
    +		// Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
    +		return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
    +	}
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +		int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
    +		log.info("Using port {}.", port);
    +		prometheusEndpoint = new PrometheusEndpoint(port);
    +		try {
    +			prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
    +		} catch (IOException e) {
    +			log.error("Could not start PrometheusEndpoint on port " + port, e);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		prometheusEndpoint.stop();
    +		CollectorRegistry.defaultRegistry.clear();
    +	}
    +
    +	@Override
    +	public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) {
    +		final String scope = ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
    +		List<String> dimensionKeys = new LinkedList<>();
    +		List<String> dimensionValues = new LinkedList<>();
    +		for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
    +			dimensionKeys.add(CHARACTER_FILTER.filterCharacters(CharMatcher.anyOf("<>").trimFrom(dimension.getKey())));
    +			dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
    +		}
    +
    +		final String validMetricName = scope + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName);
    +		final String metricIdentifier = group.getMetricIdentifier(metricName);
    +		final Collector collector;
    +		if (metric instanceof Gauge) {
    +			collector = createGauge((Gauge) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else if (metric instanceof Counter) {
    +			collector = createGauge((Counter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else if (metric instanceof Histogram) {
    +			collector = createSummary((Histogram) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else if (metric instanceof Meter) {
    +			collector = createCounter((Meter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else {
    +			log.error("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
    --- End diff --
    
    That should be done in a separate issue.
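    The label handling in the quoted `notifyOfAddedMetric` turns Flink's scope variables (keys such as `<host>`) into Prometheus label names by trimming the angle brackets and replacing every character outside `[a-zA-Z0-9:_]` with an underscore. A JDK-only sketch of that step, assuming a regex in place of Guava's `CharMatcher.anyOf("<>").trimFrom(...)`; `PromLabels` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical JDK-only version of the label sanitization in the quoted diff.
final class PromLabels {
	private static final Pattern UNALLOWED_CHARS = Pattern.compile("[^a-zA-Z0-9:_]");
	// Equivalent of CharMatcher.anyOf("<>").trimFrom(key): strip leading/trailing '<' and '>'.
	private static final Pattern ANGLE_BRACKETS = Pattern.compile("^[<>]+|[<>]+$");

	private PromLabels() {}

	static String replaceInvalidChars(String input) {
		return UNALLOWED_CHARS.matcher(input).replaceAll("_");
	}

	// Converts scope variables to label name/value pairs, preserving iteration order.
	static Map<String, String> fromVariables(Map<String, String> variables) {
		Map<String, String> labels = new LinkedHashMap<>();
		for (Map.Entry<String, String> variable : variables.entrySet()) {
			String key = ANGLE_BRACKETS.matcher(variable.getKey()).replaceAll("");
			labels.put(replaceInvalidChars(key), replaceInvalidChars(variable.getValue()));
		}
		return labels;
	}
}
```

    With this approach, `{"<host>": "my host", "<tm_id>": "tm"}` becomes `{host=my_host, tm_id=tm}`.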


---

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by mbode <gi...@git.apache.org>.
Github user mbode commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r116376242
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import com.google.common.base.CharMatcher;
    +import com.google.common.collect.ImmutableList;
    +import fi.iki.elonen.NanoHTTPD;
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.common.TextFormat;
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.metrics.CharacterFilter;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.HistogramStatistics;
    +import org.apache.flink.metrics.Meter;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.regex.Pattern;
    +
    +import static com.google.common.collect.Iterables.toArray;
    +
    +@PublicEvolving
    +public class PrometheusReporter implements MetricReporter {
    +	private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
    +
    +	static final         String ARG_PORT     = "port";
    +	private static final int    DEFAULT_PORT = 9249;
    +
    +	private static final Pattern         UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
    +	private static final CharacterFilter CHARACTER_FILTER       = new CharacterFilter() {
    +		@Override
    +		public String filterCharacters(String input) {
    +			return replaceInvalidChars(input);
    +		}
    +	};
    +
    +	private static final char SCOPE_SEPARATOR = '_';
    +
    +	private PrometheusEndpoint prometheusEndpoint;
    +	private Map<String, Collector> collectorsByMetricName = new HashMap<>();
    +
    +	@VisibleForTesting
    +	static String replaceInvalidChars(final String input) {
    +		// https://prometheus.io/docs/instrumenting/writing_exporters/
    +		// Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
    +		return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
    +	}
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +		int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
    +		log.info("Using port {}.", port);
    +		prometheusEndpoint = new PrometheusEndpoint(port);
    +		try {
    +			prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
    +		} catch (IOException e) {
    +			log.error("Could not start PrometheusEndpoint on port " + port, e);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		prometheusEndpoint.stop();
    +		CollectorRegistry.defaultRegistry.clear();
    +	}
    +
    +	@Override
    +	public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) {
    +		final String scope = ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
    +		List<String> dimensionKeys = new LinkedList<>();
    +		List<String> dimensionValues = new LinkedList<>();
    +		for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
    +			dimensionKeys.add(CHARACTER_FILTER.filterCharacters(CharMatcher.anyOf("<>").trimFrom(dimension.getKey())));
    +			dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
    +		}
    +
    +		final String validMetricName = scope + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName);
    +		final String metricIdentifier = group.getMetricIdentifier(metricName);
    +		final Collector collector;
    +		if (metric instanceof Gauge) {
    +			collector = createGauge((Gauge) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else if (metric instanceof Counter) {
    +			collector = createGauge((Counter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else if (metric instanceof Histogram) {
    +			collector = createSummary((Histogram) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else if (metric instanceof Meter) {
    +			collector = createCounter((Meter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else {
    +			log.error("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
    --- End diff --
    
    OK, I changed it. The `MetricRegistry` also has a few `error` logs for things that probably don't interfere with job execution either (for example in the constructor, where it tries to open reporters). Do you want me to change them to `warn` as well?
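    The `instanceof` chain in the quoted diff maps each Flink metric type to a Prometheus type: gauges and counters become Prometheus gauges (a Flink `Counter` may decrease, which a Prometheus counter must not), histograms become summaries, and meters become Prometheus counters (with a `_total` suffix in the test above). The hypothetical sketch below isolates that dispatch, logging an unknown type at warning level and skipping it, as discussed. The marker interfaces stand in for the `org.apache.flink.metrics` types, and `java.util.logging` stands in for SLF4J.

```java
import java.util.logging.Logger;

// Hypothetical stand-ins for org.apache.flink.metrics.{Metric, Gauge, Counter, Histogram, Meter}.
interface Metric {}
interface Gauge extends Metric {}
interface Counter extends Metric {}
interface Histogram extends Metric {}
interface Meter extends Metric {}

final class PromTypes {
	private static final Logger LOG = Logger.getLogger(PromTypes.class.getName());

	private PromTypes() {}

	/** Returns the Prometheus type name for a metric, or null if it is unsupported. */
	static String prometheusType(Metric metric) {
		if (metric instanceof Gauge || metric instanceof Counter) {
			// Flink counters can be decremented, so they cannot be Prometheus counters.
			return "gauge";
		} else if (metric instanceof Histogram) {
			return "summary";
		} else if (metric instanceof Meter) {
			return "counter";
		}
		// Not fatal for the job: warn and skip instead of logging an error.
		LOG.warning("Cannot add unknown metric type " + metric.getClass().getName()
			+ ". This metric type is not supported by this reporter.");
		return null;
	}
}
```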


---

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r116368638
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import com.google.common.base.CharMatcher;
    +import com.google.common.collect.ImmutableList;
    +import fi.iki.elonen.NanoHTTPD;
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.common.TextFormat;
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.metrics.CharacterFilter;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.HistogramStatistics;
    +import org.apache.flink.metrics.Meter;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.regex.Pattern;
    +
    +import static com.google.common.collect.Iterables.toArray;
    +
    +@PublicEvolving
    +public class PrometheusReporter implements MetricReporter {
    +	private static final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
    +
    +	static final         String ARG_PORT     = "port";
    +	private static final int    DEFAULT_PORT = 9249;
    +
    +	private static final Pattern         UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
    +	private static final CharacterFilter CHARACTER_FILTER       = new CharacterFilter() {
    +		@Override
    +		public String filterCharacters(String input) {
    +			return replaceInvalidChars(input);
    +		}
    +	};
    +
    +	private static final char SCOPE_SEPARATOR = '_';
    +
    +	private PrometheusEndpoint prometheusEndpoint;
    +	private Map<String, Collector> collectorsByMetricName = new HashMap<>();
    +
    +	@VisibleForTesting
    +	static String replaceInvalidChars(final String input) {
    +		// https://prometheus.io/docs/instrumenting/writing_exporters/
    +		// Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
    +		return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
    +	}
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +		int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
    +		log.info("Using port {}.", port);
    +		prometheusEndpoint = new PrometheusEndpoint(port);
    +		try {
    +			prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
    +		} catch (IOException e) {
    +			log.error("Could not start PrometheusEndpoint on port " + port, e);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		prometheusEndpoint.stop();
    +		CollectorRegistry.defaultRegistry.clear();
    +	}
    +
    +	@Override
    +	public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) {
    +		final String scope = ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
    +		List<String> dimensionKeys = new LinkedList<>();
    +		List<String> dimensionValues = new LinkedList<>();
    +		for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
    +			dimensionKeys.add(CHARACTER_FILTER.filterCharacters(CharMatcher.anyOf("<>").trimFrom(dimension.getKey())));
    +			dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
    +		}
    +
    +		final String validMetricName = scope + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName);
    +		final String metricIdentifier = group.getMetricIdentifier(metricName);
    +		final Collector collector;
    +		if (metric instanceof Gauge) {
    +			collector = createGauge((Gauge) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else if (metric instanceof Counter) {
    +			collector = createGauge((Counter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else if (metric instanceof Histogram) {
    +			collector = createSummary((Histogram) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else if (metric instanceof Meter) {
    +			collector = createCounter((Meter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
    +		} else {
    +			log.error("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
    +				metric.getClass().getName());
    +			return;
    +		}
    +		collector.register();
    +		collectorsByMetricName.put(metricName, collector);
    +	}
    +
    +	@Override
    +	public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) {
    +		CollectorRegistry.defaultRegistry.unregister(collectorsByMetricName.get(metricName));
    +		collectorsByMetricName.remove(metricName);
    +	}
    +
    +	private Collector createGauge(final Gauge gauge, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
    +		return io.prometheus.client.Gauge
    +			.build()
    +			.name(name)
    +			.help(identifier)
    +			.labelNames(toArray(labelNames, String.class))
    +			.create()
    +			.setChild(new io.prometheus.client.Gauge.Child() {
    +				@Override
    +				public double get() {
    +					final Object value = gauge.getValue();
    +					if (value instanceof Double) {
    +						return (double) value;
    +					}
    +					if (value instanceof Number) {
    +						return ((Number) value).doubleValue();
    +					} else if (value instanceof Boolean) {
    +						return ((Boolean) value) ? 1 : 0;
    +					} else {
    +						log.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.",
    +							gauge, value.getClass().getName());
    +						return 0;
    +					}
    +				}
    +			}, toArray(labelValues, String.class));
    +	}
    +
    +	private static Collector createGauge(final Counter counter, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
    +		return io.prometheus.client.Gauge
    +			.build()
    +			.name(name)
    +			.help(identifier)
    +			.labelNames(toArray(labelNames, String.class))
    +			.create()
    +			.setChild(new io.prometheus.client.Gauge.Child() {
    +				@Override
    +				public double get() {
    +					return (double) counter.getCount();
    +				}
    +			}, toArray(labelValues, String.class));
    +	}
    +
    +	private Collector createCounter(final Meter meter, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
    +		return io.prometheus.client.Counter
    +			.build()
    +			.name(name + "_total")
    +			.help(identifier)
    +			.labelNames(toArray(labelNames, String.class))
    +			.create()
    +			.setChild(new io.prometheus.client.Counter.Child() {
    +				@Override
    +				public double get() {
    +					return (double) meter.getCount();
    --- End diff --
    
    We should also export the rate of the meter as a Prometheus gauge.
    
    Personally, I think we can drop this method completely. I'm not a big fan of exporting the count of a meter; if that is of interest, there should always be a separate counter for it (and as far as Flink is concerned, there is).
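A minimal sketch of the suggestion, exporting a meter's rate (and not its count) as a Prometheus gauge. To stay self-contained it renders the sample directly in the Prometheus text exposition format instead of going through the Prometheus Java client used in the patch; the `Meter` interface only mirrors the `getRate()`/`getCount()` getters of `org.apache.flink.metrics.Meter`, and `rateSample`, `escapeLabelValue` and the metric name `example_meter_rate` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Sketch: export a meter's rate (not its count) as a Prometheus gauge sample. */
public class MeterRateSketch {

    // Hypothetical stand-in mirroring the getters of org.apache.flink.metrics.Meter.
    interface Meter {
        double getRate(); // events per second
        long getCount();  // total events; deliberately not exported here
    }

    /** Escapes a label value as required by the Prometheus text exposition format. */
    static String escapeLabelValue(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
    }

    /**
     * Renders the meter's current rate as one gauge sample in the text exposition format.
     * Meant to be called on every scrape, so the value reflects the meter's latest rate.
     */
    static String rateSample(String name, Map<String, String> labels, Meter meter) {
        StringJoiner joined = new StringJoiner(",", "{", "}");
        joined.setEmptyValue(""); // no braces at all when there are no labels
        for (Map.Entry<String, String> label : labels.entrySet()) {
            joined.add(label.getKey() + "=\"" + escapeLabelValue(label.getValue()) + "\"");
        }
        return "# TYPE " + name + " gauge\n" + name + joined + " " + meter.getRate() + "\n";
    }

    public static void main(String[] args) {
        Meter meter = new Meter() {
            @Override public double getRate() { return 2.5; }
            @Override public long getCount() { return 150L; }
        };
        Map<String, String> labels = new LinkedHashMap<>();
        labels.put("host", "localhost");
        labels.put("job_name", "wordcount");
        System.out.print(rateSample("example_meter_rate", labels, meter));
    }
}
```

Running `main` prints:

    # TYPE example_meter_rate gauge
    example_meter_rate{host="localhost",job_name="wordcount"} 2.5

A gauge fits here because a rate can go down as well as up, whereas a Prometheus counter must be monotonic; the count stays available through the separate Flink counter mentioned above.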


---

[GitHub] flink pull request #3833: [FLINK-6221] Add PrometheusReporter

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3833#discussion_r119595452
  
    --- Diff: flink-metrics/flink-metrics-prometheus/pom.xml ---
    @@ -0,0 +1,129 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    +
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-metrics</artifactId>
    +		<version>1.3-SNAPSHOT</version>
    --- End diff --
    
    I guess the `1.3-SNAPSHOT` version is outdated by now. Sorry :(


---