You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by lamber-ken <gi...@git.apache.org> on 2018/06/20 04:28:29 UTC

[GitHub] flink pull request #6184: add prometheus pushgateway reporter

GitHub user lamber-ken opened a pull request:

    https://github.com/apache/flink/pull/6184

    add prometheus pushgateway reporter

    ## What is the purpose of the change
    This pull request makes flink system can send metrics to prometheus via pushgateway. it may be useful.
    
    ## Brief change log
    
      - Add prometheus pushgateway repoter
      - Restructure the code of the promethues reporter part
    
    ## Verifying this change
    
    This change is already covered by existing tests. [prometheus test](https://github.com/apache/flink/tree/master/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus)
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lamber-ken/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6184
    
----
commit 1a8e1f6193823e70b1dc6abc1146299042c25c7d
Author: lamber-ken <!...@...>
Date:   2018-06-20T04:26:10Z

    add prometheus pushgateway reporter

----


---

[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6184#discussion_r200123362
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.metrics.reporter.Scheduled;
    +import org.apache.flink.util.AbstractID;
    +
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.PushGateway;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +
    +/**
    + * /**
    + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
    + */
    +@PublicEvolving
    +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
    +	private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
    +
    +	public static final String ARG_HOST = "host";
    +	public static final String ARG_PORT = "port";
    +	public static final String ARG_JOB_NAME_PREFIX = "prefix";
    +
    +	public static final char JOB_NAME_SEPARATOR = '-';
    +	public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
    +
    +	private PushGateway pushGateway;
    +	private String jobName;
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +
    +		// reporter configs
    +		String host = config.getString(ARG_HOST, null);
    +		int port = config.getInteger(ARG_PORT, -1);
    +		String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
    +
    +		// host port
    +		if (host == null || host.length() == 0 || port < 1) {
    +			throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
    +		}
    +
    +		// jobname
    +		String random = new AbstractID().toString();
    +		jobName = jobNamePrefix + JOB_NAME_SEPARATOR + random;
    +
    +		pushGateway = new PushGateway(host + ":" + port);
    +		log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName:{}}", host, port, jobName);
    +	}
    +
    +	@Override
    +	public void report() {
    +		try {
    +			pushGateway.push(CollectorRegistry.defaultRegistry, jobName);
    +		} catch (Exception e) {
    +			log.warn("Failed reporting metrics to Prometheus.", e);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		if (pushGateway != null) {
    +			try {
    +				pushGateway.delete(jobName);
    +			} catch (IOException e) {
    +				log.warn("Failed to delete the job of Pushgateway", e);
    --- End diff --
    
    include job name.


---

[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6184#discussion_r200123482
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.metrics.reporter.Scheduled;
    +import org.apache.flink.util.AbstractID;
    +
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.PushGateway;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +
    +/**
    + * /**
    + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
    + */
    +@PublicEvolving
    +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
    +	private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
    +
    +	public static final String ARG_HOST = "host";
    +	public static final String ARG_PORT = "port";
    +	public static final String ARG_JOB_NAME_PREFIX = "prefix";
    +
    +	public static final char JOB_NAME_SEPARATOR = '-';
    +	public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
    +
    +	private PushGateway pushGateway;
    +	private String jobName;
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +
    +		// reporter configs
    +		String host = config.getString(ARG_HOST, null);
    +		int port = config.getInteger(ARG_PORT, -1);
    +		String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
    +
    +		// host port
    +		if (host == null || host.length() == 0 || port < 1) {
    +			throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
    +		}
    +
    +		// jobname
    +		String random = new AbstractID().toString();
    +		jobName = jobNamePrefix + JOB_NAME_SEPARATOR + random;
    +
    +		pushGateway = new PushGateway(host + ":" + port);
    +		log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName:{}}", host, port, jobName);
    +	}
    +
    +	@Override
    +	public void report() {
    +		try {
    +			pushGateway.push(CollectorRegistry.defaultRegistry, jobName);
    +		} catch (Exception e) {
    +			log.warn("Failed reporting metrics to Prometheus.", e);
    --- End diff --
    
    sync the exception message with exception in `close`


---

[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6184#discussion_r200122022
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.metrics.reporter.Scheduled;
    +import org.apache.flink.util.AbstractID;
    +
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.PushGateway;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +
    +/**
    + * /**
    + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
    + */
    +@PublicEvolving
    +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
    +	private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
    +
    +	public static final String ARG_HOST = "host";
    +	public static final String ARG_PORT = "port";
    +	public static final String ARG_JOB_NAME_PREFIX = "prefix";
    +
    +	public static final char JOB_NAME_SEPARATOR = '-';
    +	public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
    +
    +	private PushGateway pushGateway;
    +	private String jobName;
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +
    +		// reporter configs
    +		String host = config.getString(ARG_HOST, null);
    +		int port = config.getInteger(ARG_PORT, -1);
    +		String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
    +
    +		// host port
    +		if (host == null || host.length() == 0 || port < 1) {
    +			throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
    +		}
    +
    +		// jobname
    +		String random = new AbstractID().toString();
    +		jobName = jobNamePrefix + JOB_NAME_SEPARATOR + random;
    +
    +		pushGateway = new PushGateway(host + ":" + port);
    +		log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName:{}}", host, port, jobName);
    +	}
    +
    +	@Override
    +	public void report() {
    +		try {
    +			pushGateway.push(CollectorRegistry.defaultRegistry, jobName);
    +		} catch (Exception e) {
    +			log.warn("Failed reporting metrics to Prometheus.", e);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		if (pushGateway != null) {
    +			try {
    +				pushGateway.delete(jobName);
    --- End diff --
    
    I will add a configuration option to make the deletion optional, users may want to be in control of this themselves.


---

[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6184#discussion_r200122904
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.metrics.reporter.Scheduled;
    +import org.apache.flink.util.AbstractID;
    +
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.PushGateway;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +
    +/**
    + * /**
    + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
    + */
    +@PublicEvolving
    +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
    +	private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
    +
    +	public static final String ARG_HOST = "host";
    +	public static final String ARG_PORT = "port";
    +	public static final String ARG_JOB_NAME_PREFIX = "prefix";
    +
    +	public static final char JOB_NAME_SEPARATOR = '-';
    +	public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
    +
    +	private PushGateway pushGateway;
    +	private String jobName;
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +
    +		// reporter configs
    +		String host = config.getString(ARG_HOST, null);
    +		int port = config.getInteger(ARG_PORT, -1);
    +		String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
    +
    +		// host port
    +		if (host == null || host.length() == 0 || port < 1) {
    +			throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
    +		}
    +
    +		// jobname
    --- End diff --
    
    remove comment, adds no value


---

[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6184#discussion_r200123407
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.metrics.reporter.Scheduled;
    +import org.apache.flink.util.AbstractID;
    +
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.PushGateway;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +
    +/**
    + * /**
    + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
    + */
    +@PublicEvolving
    +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
    +	private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
    +
    +	public static final String ARG_HOST = "host";
    +	public static final String ARG_PORT = "port";
    +	public static final String ARG_JOB_NAME_PREFIX = "prefix";
    +
    +	public static final char JOB_NAME_SEPARATOR = '-';
    +	public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
    +
    +	private PushGateway pushGateway;
    +	private String jobName;
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +
    +		// reporter configs
    +		String host = config.getString(ARG_HOST, null);
    +		int port = config.getInteger(ARG_PORT, -1);
    +		String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
    +
    +		// host port
    +		if (host == null || host.length() == 0 || port < 1) {
    +			throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
    +		}
    +
    +		// jobname
    +		String random = new AbstractID().toString();
    +		jobName = jobNamePrefix + JOB_NAME_SEPARATOR + random;
    +
    +		pushGateway = new PushGateway(host + ":" + port);
    +		log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName:{}}", host, port, jobName);
    +	}
    +
    +	@Override
    +	public void report() {
    +		try {
    +			pushGateway.push(CollectorRegistry.defaultRegistry, jobName);
    +		} catch (Exception e) {
    +			log.warn("Failed reporting metrics to Prometheus.", e);
    --- End diff --
    
    include job name


---

[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6184


---

[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6184#discussion_r200122928
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.metrics.reporter.Scheduled;
    +import org.apache.flink.util.AbstractID;
    +
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.PushGateway;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +
    +/**
    + * /**
    + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
    + */
    +@PublicEvolving
    +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
    +	private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
    +
    +	public static final String ARG_HOST = "host";
    +	public static final String ARG_PORT = "port";
    +	public static final String ARG_JOB_NAME_PREFIX = "prefix";
    +
    +	public static final char JOB_NAME_SEPARATOR = '-';
    +	public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
    +
    +	private PushGateway pushGateway;
    +	private String jobName;
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +
    +		// reporter configs
    --- End diff --
    
    remove comment, adds no value


---

[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6184#discussion_r200122491
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java ---
    @@ -0,0 +1,279 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.metrics.CharacterFilter;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.Meter;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
    +
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.AbstractMap;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.regex.Pattern;
    +
    +
    +/**
    + * base prometheus reporter for prometheus metrics.
    --- End diff --
    
    extend javadocs to include responsibilities of this/extending classes.


---

[GitHub] flink issue #6184: [FLINK-9187][METRICS] add prometheus pushgateway reporter

Posted by lamber-ken <gi...@git.apache.org>.
Github user lamber-ken commented on the issue:

    https://github.com/apache/flink/pull/6184
  
    @zentol,  well done. If there is anything I can do for you, please let me know.


---

[GitHub] flink issue #6184: [FLINK-9187][METRICS] add prometheus pushgateway reporter

Posted by lamber-ken <gi...@git.apache.org>.
Github user lamber-ken commented on the issue:

    https://github.com/apache/flink/pull/6184
  
    Please click here for details [old-flink-9187](https://github.com/apache/flink/pull/5857)


---

[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6184#discussion_r200122913
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.metrics.reporter.Scheduled;
    +import org.apache.flink.util.AbstractID;
    +
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.PushGateway;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +
    +/**
    + * /**
    + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
    + */
    +@PublicEvolving
    +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
    +	private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
    +
    +	public static final String ARG_HOST = "host";
    +	public static final String ARG_PORT = "port";
    +	public static final String ARG_JOB_NAME_PREFIX = "prefix";
    +
    +	public static final char JOB_NAME_SEPARATOR = '-';
    +	public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
    +
    +	private PushGateway pushGateway;
    +	private String jobName;
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +
    +		// reporter configs
    +		String host = config.getString(ARG_HOST, null);
    +		int port = config.getInteger(ARG_PORT, -1);
    +		String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
    +
    +		// host port
    --- End diff --
    
    remove comment, adds no value


---

[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6184#discussion_r200122411
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.metrics.reporter.Scheduled;
    +import org.apache.flink.util.AbstractID;
    +
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.PushGateway;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +
    +/**
    + * /**
    --- End diff --
    
    remove line


---

[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6184#discussion_r200122285
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.metrics.reporter.Scheduled;
    +import org.apache.flink.util.AbstractID;
    +
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.PushGateway;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +
    +/**
    + * /**
    + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
    + */
    +@PublicEvolving
    +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
    +	private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
    +
    +	public static final String ARG_HOST = "host";
    +	public static final String ARG_PORT = "port";
    +	public static final String ARG_JOB_NAME_PREFIX = "prefix";
    +
    +	public static final char JOB_NAME_SEPARATOR = '-';
    +	public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
    +
    +	private PushGateway pushGateway;
    +	private String jobName;
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +
    +		// reporter configs
    +		String host = config.getString(ARG_HOST, null);
    +		int port = config.getInteger(ARG_PORT, -1);
    +		String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
    +
    +		// host port
    +		if (host == null || host.length() == 0 || port < 1) {
    --- End diff --
    
    use `host.isEmpty()`


---

[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6184#discussion_r200121810
  
    --- Diff: docs/monitoring/metrics.md ---
    @@ -700,6 +700,31 @@ Flink metric types are mapped to Prometheus metric types as follows:
     
     All Flink metrics variables (see [List of all Variables](#list-of-all-variables)) are exported to Prometheus as labels. 
     
    +### PrometheusPushGateway (org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)
    +
    +In order to use this reporter you must copy `/opt/flink-metrics-prometheus-{{site.version}}.jar` into the `/lib` folder
    +of your Flink distribution.
    +
    +Parameters:
    +
    +- `host` - the PushGateway server host
    +- `port` - the PushGateway server port
    +- `prefix` - (optional) the prefix is used to compose the jobName, defaults to `flink`. The jobName is used to distinguish different flink clusters
    +
    +Example configuration:
    +
    +{% highlight yaml %}
    +
    +metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
    +metrics.reporter.promgateway.host: localhost
    +metrics.reporter.promgateway.port: 9091
    +metrics.reporter.promgateway.prefix: flink
    +
    +{% endhighlight %}
    +
    +PrometheusPushGatewayReporter push metrics to a [Pushgateway](https://github.com/prometheus/pushgateway). The Pushgateway then exposes 
    +these metrics to Prometheus. The working mechanism is different from PrometheusReporter (see [PrometheusReporter](#prometheus-orgapacheflinkmetricsprometheusprometheusreporter)).
    --- End diff --
    
    I will remove the second sentence as it provides no value, and instead introduce a link tot he prometheus documentation for pushgateway use-cases.


---

[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6184#discussion_r200122215
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.prometheus;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.metrics.Metric;
    +import org.apache.flink.metrics.MetricConfig;
    +import org.apache.flink.metrics.reporter.MetricReporter;
    +import org.apache.flink.metrics.reporter.Scheduled;
    +import org.apache.flink.util.AbstractID;
    +
    +import io.prometheus.client.CollectorRegistry;
    +import io.prometheus.client.exporter.PushGateway;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +
    +/**
    + * /**
    + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway.
    + */
    +@PublicEvolving
    +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
    +	private final Logger log = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
    +
    +	public static final String ARG_HOST = "host";
    +	public static final String ARG_PORT = "port";
    +	public static final String ARG_JOB_NAME_PREFIX = "prefix";
    +
    +	public static final char JOB_NAME_SEPARATOR = '-';
    +	public static final String DEFAULT_JOB_NAME_PREFIX = "flink";
    +
    +	private PushGateway pushGateway;
    +	private String jobName;
    +
    +	@Override
    +	public void open(MetricConfig config) {
    +
    +		// reporter configs
    +		String host = config.getString(ARG_HOST, null);
    +		int port = config.getInteger(ARG_PORT, -1);
    +		String jobNamePrefix = config.getString(ARG_JOB_NAME_PREFIX, DEFAULT_JOB_NAME_PREFIX);
    +
    +		// host port
    +		if (host == null || host.length() == 0 || port < 1) {
    +			throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
    +		}
    +
    +		// jobname
    +		String random = new AbstractID().toString();
    +		jobName = jobNamePrefix + JOB_NAME_SEPARATOR + random;
    --- End diff --
    
    I will make the random suffix optional for cases where users can supply different configurations to each container.


---

[GitHub] flink pull request #6184: [FLINK-9187][METRICS] add prometheus pushgateway r...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6184#discussion_r200122815
  
    --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---
    @@ -20,76 +20,38 @@
     
     import org.apache.flink.annotation.PublicEvolving;
     import org.apache.flink.annotation.VisibleForTesting;
    -import org.apache.flink.metrics.CharacterFilter;
    -import org.apache.flink.metrics.Counter;
    -import org.apache.flink.metrics.Gauge;
    -import org.apache.flink.metrics.Histogram;
    -import org.apache.flink.metrics.Meter;
     import org.apache.flink.metrics.Metric;
     import org.apache.flink.metrics.MetricConfig;
    -import org.apache.flink.metrics.MetricGroup;
     import org.apache.flink.metrics.reporter.MetricReporter;
    -import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
    -import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
     import org.apache.flink.util.NetUtils;
     import org.apache.flink.util.Preconditions;
     
    -import io.prometheus.client.Collector;
    -import io.prometheus.client.CollectorRegistry;
     import io.prometheus.client.exporter.HTTPServer;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.io.IOException;
    -import java.util.AbstractMap;
    -import java.util.ArrayList;
    -import java.util.Arrays;
    -import java.util.Collections;
    -import java.util.HashMap;
     import java.util.Iterator;
    -import java.util.LinkedList;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.regex.Pattern;
     
     /**
      * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus.
      */
     @PublicEvolving
    -public class PrometheusReporter implements MetricReporter {
    -	private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporter.class);
    +public class PrometheusReporter extends AbstractPrometheusReporter {
    +	private final Logger log = LoggerFactory.getLogger(PrometheusReporter.class);
    --- End diff --
    
    remove this logger and use the logger defined in the parent class


---