Posted to issues@flink.apache.org by lamber-ken <gi...@git.apache.org> on 2018/06/20 04:28:29 UTC
[GitHub] flink pull request #6184: add prometheus pushgateway reporter
GitHub user lamber-ken opened a pull request:
https://github.com/apache/flink/pull/6184
add prometheus pushgateway reporter
## What is the purpose of the change
This pull request enables Flink to send metrics to Prometheus via the Pushgateway, which may be useful.
## Brief change log
- Add Prometheus Pushgateway reporter
- Restructure the code of the Prometheus reporter
## Verifying this change
This change is already covered by existing tests; see the [Prometheus tests](https://github.com/apache/flink/tree/master/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus).
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (JavaDocs)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/lamber-ken/flink master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6184.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6184
----
commit 1a8e1f6193823e70b1dc6abc1146299042c25c7d
Author: lamber-ken <!...@...>
Date: 2018-06-20T04:26:10Z
add prometheus pushgateway reporter
----
---
[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6184#discussion_r200123362
--- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * /**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+ private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+ public static final String ARG_HOST = "host";
+ public static final String ARG_PORT = "port";
+ public static final String ARG_JOB_NAME_PREFIX = "prefix";
+
+ public static final char JOB_NAME_SEPARATOR = '-';
+ public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
+
+ private PushGateway pushGateway;
+ private String jobName;
+
+ @Override
+ public void open(MetricConfig config) {
+
+ // reporter configs
+ String host = config.getString(ARG_HOST, null);
+ int port = config.getInteger(ARG_PORT, -1);
+ String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
+
+ // host port
+ if (host == null || host.length() == 0 || port < 1) {
+ throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
+ }
+
+ // jobname
+ String random = new AbstractID().toString();
+ jobName = jobNamePrefix + JOB_NAME_SEPARATOR + random;
+
+ pushGateway = new PushGateway(host + ":" + port);
+ log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName:{}}", host, port, jobName);
+ }
+
+ @Override
+ public void report() {
+ try {
+ pushGateway.push(CollectorRegistry.defaultRegistry, jobName);
+ } catch (Exception e) {
+ log.warn("Failed reporting metrics to Prometheus.", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (pushGateway != null) {
+ try {
+ pushGateway.delete(jobName);
+ } catch (IOException e) {
+ log.warn("Failed to delete the job of Pushgateway", e);
--- End diff --
include job name.
---
[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6184#discussion_r200123482
--- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * /**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+ private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+ public static final String ARG_HOST = "host";
+ public static final String ARG_PORT = "port";
+ public static final String ARG_JOB_NAME_PREFIX = "prefix";
+
+ public static final char JOB_NAME_SEPARATOR = '-';
+ public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
+
+ private PushGateway pushGateway;
+ private String jobName;
+
+ @Override
+ public void open(MetricConfig config) {
+
+ // reporter configs
+ String host = config.getString(ARG_HOST, null);
+ int port = config.getInteger(ARG_PORT, -1);
+ String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
+
+ // host port
+ if (host == null || host.length() == 0 || port < 1) {
+ throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
+ }
+
+ // jobname
+ String random = new AbstractID().toString();
+ jobName = jobNamePrefix + JOB_NAME_SEPARATOR + random;
+
+ pushGateway = new PushGateway(host + ":" + port);
+ log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName:{}}", host, port, jobName);
+ }
+
+ @Override
+ public void report() {
+ try {
+ pushGateway.push(CollectorRegistry.defaultRegistry, jobName);
+ } catch (Exception e) {
+ log.warn("Failed reporting metrics to Prometheus.", e);
--- End diff --
sync the exception message with exception in `close`
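
One way to keep the two warnings in step is to build both from the same place and pass the job name in. The following is only a sketch: the class `PushGatewayLogMessages`, its method names, and the exact wording are assumptions for illustration and are not taken from the pull request.

```java
/** Hypothetical helper; neither the class nor the wording comes from the pull request. */
final class PushGatewayLogMessages {

    private PushGatewayLogMessages() {
    }

    /** Message for a failed push in report(). */
    static String pushFailed(String jobName) {
        return "Failed to push metrics to Pushgateway with jobName " + jobName + ".";
    }

    /** Message for a failed delete in close(), phrased the same way. */
    static String deleteFailed(String jobName) {
        return "Failed to delete metrics from Pushgateway with jobName " + jobName + ".";
    }

    public static void main(String[] args) {
        String jobName = "flink-example"; // illustrative value only
        System.out.println(pushFailed(jobName));
        System.out.println(deleteFailed(jobName));
    }
}
```

In the reporter itself the same effect could probably be had with two parameterized `log.warn` calls that share a sentence pattern; the helper just makes the symmetry explicit.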
---
[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6184#discussion_r200122022
--- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * /**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+ private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+ public static final String ARG_HOST = "host";
+ public static final String ARG_PORT = "port";
+ public static final String ARG_JOB_NAME_PREFIX = "prefix";
+
+ public static final char JOB_NAME_SEPARATOR = '-';
+ public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
+
+ private PushGateway pushGateway;
+ private String jobName;
+
+ @Override
+ public void open(MetricConfig config) {
+
+ // reporter configs
+ String host = config.getString(ARG_HOST, null);
+ int port = config.getInteger(ARG_PORT, -1);
+ String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
+
+ // host port
+ if (host == null || host.length() == 0 || port < 1) {
+ throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
+ }
+
+ // jobname
+ String random = new AbstractID().toString();
+ jobName = jobNamePrefix + JOB_NAME_SEPARATOR + random;
+
+ pushGateway = new PushGateway(host + ":" + port);
+ log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName:{}}", host, port, jobName);
+ }
+
+ @Override
+ public void report() {
+ try {
+ pushGateway.push(CollectorRegistry.defaultRegistry, jobName);
+ } catch (Exception e) {
+ log.warn("Failed reporting metrics to Prometheus.", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (pushGateway != null) {
+ try {
+ pushGateway.delete(jobName);
--- End diff --
I will add a configuration option to make the deletion optional, users may want to be in control of this themselves.
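
A rough sketch of what such an option might look like follows. The key `deleteOnShutdown`, its default of `true`, and the class `DeleteOnShutdownSketch` are assumptions made for illustration; the comment above does not name the option. Plain `java.util.Properties` stands in for the reporter's `MetricConfig` so that the example runs without Flink on the classpath.

```java
import java.util.Properties;

/** Hypothetical sketch of an opt-out for the delete in close(); not code from the pull request. */
final class DeleteOnShutdownSketch {

    // Assumed option key and default; the review comment does not specify either.
    static final String ARG_DELETE_ON_SHUTDOWN = "deleteOnShutdown";
    static final boolean DEFAULT_DELETE_ON_SHUTDOWN = true;

    private DeleteOnShutdownSketch() {
    }

    /** Reads the flag, falling back to the default when the key is absent. */
    static boolean shouldDelete(Properties config) {
        String raw = config.getProperty(ARG_DELETE_ON_SHUTDOWN);
        return raw == null ? DEFAULT_DELETE_ON_SHUTDOWN : Boolean.parseBoolean(raw.trim());
    }

    /** Stand-in for close(): runs the delete action only when the flag allows it. */
    static boolean close(Properties config, Runnable deleteJob) {
        if (shouldDelete(config)) {
            deleteJob.run();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(ARG_DELETE_ON_SHUTDOWN, "false");
        boolean deleted = close(config, () -> System.out.println("deleting job from Pushgateway"));
        System.out.println("deleted = " + deleted);
    }
}
```

Defaulting to deletion would keep the behavior of the diff under review, while users who manage Pushgateway groups themselves could switch it off.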
---
[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6184#discussion_r200122904
--- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * /**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+ private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+ public static final String ARG_HOST = "host";
+ public static final String ARG_PORT = "port";
+ public static final String ARG_JOB_NAME_PREFIX = "prefix";
+
+ public static final char JOB_NAME_SEPARATOR = '-';
+ public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
+
+ private PushGateway pushGateway;
+ private String jobName;
+
+ @Override
+ public void open(MetricConfig config) {
+
+ // reporter configs
+ String host = config.getString(ARG_HOST, null);
+ int port = config.getInteger(ARG_PORT, -1);
+ String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
+
+ // host port
+ if (host == null || host.length() == 0 || port < 1) {
+ throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
+ }
+
+ // jobname
--- End diff --
remove comment, adds no value
---
[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6184#discussion_r200123407
--- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * /**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+ private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+ public static final String ARG_HOST = "host";
+ public static final String ARG_PORT = "port";
+ public static final String ARG_JOB_NAME_PREFIX = "prefix";
+
+ public static final char JOB_NAME_SEPARATOR = '-';
+ public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
+
+ private PushGateway pushGateway;
+ private String jobName;
+
+ @Override
+ public void open(MetricConfig config) {
+
+ // reporter configs
+ String host = config.getString(ARG_HOST, null);
+ int port = config.getInteger(ARG_PORT, -1);
+ String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
+
+ // host port
+ if (host == null || host.length() == 0 || port < 1) {
+ throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
+ }
+
+ // jobname
+ String random = new AbstractID().toString();
+ jobName = jobNamePrefix + JOB_NAME_SEPARATOR + random;
+
+ pushGateway = new PushGateway(host + ":" + port);
+ log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName:{}}", host, port, jobName);
+ }
+
+ @Override
+ public void report() {
+ try {
+ pushGateway.push(CollectorRegistry.defaultRegistry, jobName);
+ } catch (Exception e) {
+ log.warn("Failed reporting metrics to Prometheus.", e);
--- End diff --
include job name
---
[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6184
---
[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6184#discussion_r200122928
--- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * /**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+ private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+ public static final String ARG_HOST = "host";
+ public static final String ARG_PORT = "port";
+ public static final String ARG_JOB_NAME_PREFIX = "prefix";
+
+ public static final char JOB_NAME_SEPARATOR = '-';
+ public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
+
+ private PushGateway pushGateway;
+ private String jobName;
+
+ @Override
+ public void open(MetricConfig config) {
+
+ // reporter configs
--- End diff --
remove comment, adds no value
---
[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6184#discussion_r200122491
--- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+
+/**
+ * base prometheus reporter for prometheus metrics.
--- End diff --
extend javadocs to include responsibilities of this/extending classes.
---
[GitHub] flink issue #6184: [FLINK-9187][METRICS] add prometheus pushgateway reporter
Posted by lamber-ken <gi...@git.apache.org>.
Github user lamber-ken commented on the issue:
https://github.com/apache/flink/pull/6184
@zentol, well done. If there is anything I can do for you, please let me know.
---
[GitHub] flink issue #6184: [FLINK-9187][METRICS] add prometheus pushgateway reporter
Posted by lamber-ken <gi...@git.apache.org>.
Github user lamber-ken commented on the issue:
https://github.com/apache/flink/pull/6184
For details, see [old-flink-9187](https://github.com/apache/flink/pull/5857).
---
[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6184#discussion_r200122913
--- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * /**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+ private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+ public static final String ARG_HOST = "host";
+ public static final String ARG_PORT = "port";
+ public static final String ARG_JOB_NAME_PREFIX = "prefix";
+
+ public static final char JOB_NAME_SEPARATOR = '-';
+ public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
+
+ private PushGateway pushGateway;
+ private String jobName;
+
+ @Override
+ public void open(MetricConfig config) {
+
+ // reporter configs
+ String host = config.getString(ARG_HOST, null);
+ int port = config.getInteger(ARG_PORT, -1);
+ String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
+
+ // host port
--- End diff --
remove comment, adds no value
---
[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6184#discussion_r200122411
--- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * /**
--- End diff --
remove line
---
[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6184#discussion_r200122285
--- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * /**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+ private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+ public static final String ARG_HOST = "host";
+ public static final String ARG_PORT = "port";
+ public static final String ARG_JOB_NAME_PREFIX = "prefix";
+
+ public static final char JOB_NAME_SEPARATOR = '-';
+ public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
+
+ private PushGateway pushGateway;
+ private String jobName;
+
+ @Override
+ public void open(MetricConfig config) {
+
+ // reporter configs
+ String host = config.getString(ARG_HOST, null);
+ int port = config.getInteger(ARG_PORT, -1);
+ String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
+
+ // host port
+ if (host == null || host.length() == 0 || port < 1) {
--- End diff --
use `host.isEmpty()`
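For illustration, a minimal sketch of the check with that change applied. `HostPortCheck` and `validate` are hypothetical names, not part of the pull request; only the condition and the exception message are taken from the diff above.

```java
final class HostPortCheck {

    // Hypothetical helper that mirrors the validation done in open().
    static void validate(String host, int port) {
        // isEmpty() expresses the same condition as host.length() == 0.
        if (host == null || host.isEmpty() || port < 1) {
            throw new IllegalArgumentException(
                "Invalid host/port configuration. Host: " + host + " Port: " + port);
        }
    }

    public static void main(String[] args) {
        validate("localhost", 9091); // valid configuration, returns normally
        try {
            validate("", 9091); // empty host is rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The behavior is unchanged; the call only states the intent more directly.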
---
[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6184#discussion_r200121810
--- Diff: docs/monitoring/metrics.md ---
@@ -700,6 +700,31 @@ Flink metric types are mapped to Prometheus metric types as follows:
All Flink metrics variables (see [List of all Variables](#list-of-all-variables)) are exported to Prometheus as labels.
+### PrometheusPushGateway (org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)
+
+In order to use this reporter you must copy `/opt/flink-metrics-prometheus-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Parameters:
+
+- `host` - the PushGateway server host
+- `port` - the PushGateway server port
+- `prefix` - (optional) the prefix is used to compose the jobName, defaults to `flink`. The jobName is used to distinguish different flink clusters
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
+metrics.reporter.promgateway.host: localhost
+metrics.reporter.promgateway.port: 9091
+metrics.reporter.promgateway.prefix: flink
+
+{% endhighlight %}
+
+PrometheusPushGatewayReporter push metrics to a [Pushgateway](https://github.com/prometheus/pushgateway). The Pushgateway then exposes
+these metrics to Prometheus. The working mechanism is different from PrometheusReporter (see [PrometheusReporter](#prometheus-orgapacheflinkmetricsprometheusprometheusreporter)).
--- End diff --
I will remove the second sentence as it provides no value, and instead introduce a link to the Prometheus documentation for Pushgateway use-cases.
---
[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6184#discussion_r200122215
--- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.util.AbstractID;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * /**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
+ */
+@PublicEvolving
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
+ private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+ public static final String ARG_HOST = "host";
+ public static final String ARG_PORT = "port";
+ public static final String ARG_JOB_NAME_PREFIX = "prefix";
+
+ public static final char JOB_NAME_SEPARATOR = '-';
+ public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
+
+ private PushGateway pushGateway;
+ private String jobName;
+
+ @Override
+ public void open(MetricConfig config) {
+
+ // reporter configs
+ String host = config.getString(ARG_HOST, null);
+ int port = config.getInteger(ARG_PORT, -1);
+ String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
+
+ // host port
+ if (host == null || host.length() == 0 || port < 1) {
+ throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
+ }
+
+ // jobname
+ String random = new AbstractID().toString();
+ jobName = jobNamePrefix + JOB_NAME_SEPARATOR + random;
--- End diff --
I will make the random suffix optional for cases where users can supply different configurations to each container.
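A rough sketch of what an optional suffix could look like. The boolean flag and the names `JobNameSketch` and `composeJobName` are assumptions made for illustration; the pull request does not define such an option in this excerpt. `java.util.UUID` stands in for Flink's `AbstractID` so that the snippet compiles on its own.

```java
import java.util.UUID;

final class JobNameSketch {

    static final char JOB_NAME_SEPARATOR = '-';

    // Hypothetical: appendRandomSuffix would be read from the reporter configuration.
    static String composeJobName(String prefix, boolean appendRandomSuffix) {
        if (!appendRandomSuffix) {
            // The user guarantees a distinct prefix per container.
            return prefix;
        }
        // UUID is a stand-in for new AbstractID().toString() used in the diff.
        String random = UUID.randomUUID().toString().replace("-", "");
        return prefix + JOB_NAME_SEPARATOR + random;
    }

    public static void main(String[] args) {
        System.out.println(composeJobName("flink", false));
        System.out.println(composeJobName("flink", true));
    }
}
```

With the suffix disabled, two containers sharing the same prefix would push under the same job name, so this mode only makes sense when each container receives its own configuration.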
---
[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6184#discussion_r200122815
--- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---
@@ -20,76 +20,38 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.metrics.CharacterFilter;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
-import io.prometheus.client.Collector;
-import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.HTTPServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
/**
* {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus.
*/
@PublicEvolving
-public class PrometheusReporter implements MetricReporter {
- private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporter.class);
+public class PrometheusReporter extends AbstractPrometheusReporter {
+ private final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
--- End diff --
remove this logger and use the logger defined in the parent class
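A sketch of the pattern, under the assumption that the parent exposes a protected logger; how `AbstractPrometheusReporter` actually declares it is not shown in this excerpt. The class names are hypothetical, and `java.util.logging` replaces slf4j only to keep the snippet free of external dependencies.

```java
import java.util.logging.Logger;

abstract class AbstractReporterSketch {

    // Declared once in the parent; getClass() resolves to the concrete subclass,
    // so log output is still attributed to the right reporter.
    protected final Logger log = Logger.getLogger(getClass().getName());
}

final class PullReporterSketch extends AbstractReporterSketch {

    // No logger field here: the subclass reuses the inherited one.
    String loggerName() {
        return log.getName();
    }

    public static void main(String[] args) {
        PullReporterSketch reporter = new PullReporterSketch();
        reporter.log.info("logging through " + reporter.loggerName());
    }
}
```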
---