Posted to reviews@spark.apache.org by matyix <gi...@git.apache.org> on 2017/11/17 13:52:00 UTC

[GitHub] spark pull request #19775: Add support for publishing Spark metrics into Pro...

GitHub user matyix opened a pull request:

    https://github.com/apache/spark/pull/19775

    Add support for publishing Spark metrics into Prometheus

    ## What changes were proposed in this pull request?
    
    _Originally this PR was submitted to the Spark on K8S fork [here](https://github.com/apache-spark-on-k8s/spark/pull/531), but @erikerlandson and @foxish advised resending it upstream. K8S-specific items were removed and the PR was reworked for the Apache version._
    
    This PR publishes Spark metrics into Prometheus, as highlighted in the [JIRA](https://issues.apache.org/jira/browse/SPARK-22343). It implements a metrics sink that publishes Spark metrics into Prometheus via the [Prometheus Pushgateway](https://prometheus.io/docs/instrumenting/pushing/). The metrics data published by Spark is based on [Dropwizard](http://metrics.dropwizard.io/). Since the Spark metrics format is not supported natively by Prometheus, the metrics are converted using [DropwizardExports](https://prometheus.io/client_java/io/prometheus/client/dropwizard/DropwizardExports.html) prior to being pushed to the Pushgateway.
    
    Also, the default Prometheus Pushgateway client API implementation does not support metrics timestamps, thus the client API has been enhanced to enrich metrics data with a timestamp.
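
    As a rough, self-contained sketch of how these pieces fit together (the class name `PrometheusPushExample`, the job label `spark-job` and the Pushgateway address are made-up example values; the actual sink wires this up internally):

    ```java
    import com.codahale.metrics.MetricRegistry;
    import io.prometheus.client.CollectorRegistry;
    import io.prometheus.client.dropwizard.DropwizardExports;
    import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp;

    public class PrometheusPushExample {
        public static void main(String[] args) throws Exception {
            MetricRegistry registry = new MetricRegistry();           // Dropwizard metrics, as produced by Spark
            CollectorRegistry pushRegistry = new CollectorRegistry(); // Prometheus-side registry

            // DropwizardExports converts the Dropwizard metrics into the Prometheus format
            new DropwizardExports(registry).register(pushRegistry);

            // Push with an explicit timestamp - the capability this PR adds
            PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("http://127.0.0.1:9091");
            pg.pushAdd(pushRegistry, "spark-job", String.valueOf(System.currentTimeMillis()));
        }
    }
    ```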
    
    ## How was this patch tested?
    
    This PR does not affect the existing code base or alter existing functionality. Nevertheless, I have executed all `unit and integration` tests. This setup has also been deployed and monitored via Prometheus (Prometheus 1.7.1 + Pushgateway 0.3.1).
    
    `Manual` testing was done by deploying a Spark cluster, a Prometheus server and a Pushgateway, and running SparkPi.
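
    For reference, a minimal sketch of the `conf/metrics.properties` entries that enable the sink (the property keys are those defined in `PrometheusSink`; the instance name `prometheus` and the address are example values):

    ```
    *.sink.prometheus.class=org.apache.spark.metrics.sink.PrometheusSink
    *.sink.prometheus.pushgateway-address=127.0.0.1:9091
    *.sink.prometheus.pushgateway-address-protocol=http
    *.sink.prometheus.period=10
    *.sink.prometheus.unit=seconds
    ```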

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/banzaicloud/spark apache_master_prometheus_support

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19775.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19775
    
----
commit 579cca96af187cf50fbedf5927cdea4e0bbdff26
Author: Janos Matyas <ja...@gmail.com>
Date:   2017-10-17T18:51:50Z

    Add support for prometheus

----


---



[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by stoader <gi...@git.apache.org>.
Github user stoader commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r167359085
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java ---
    @@ -0,0 +1,320 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.net.HttpURLConnection;
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.net.URL;
    +import java.net.URLEncoder;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import java.io.BufferedWriter;
    +import java.io.IOException;
    +import java.io.OutputStreamWriter;
    +
    +/**
    + * Export metrics via the Prometheus Pushgateway.
    + * <p>
    + * The Prometheus Pushgateway exists to allow ephemeral and
    + * batch jobs to expose their metrics to Prometheus.
    + * Since these kinds of jobs may not exist long enough to be scraped,
    + * they can instead push their metrics to a Pushgateway.
    + * This class allows pushing the contents of a {@link CollectorRegistry} to
    + * a Pushgateway.
    + * <p>
    + * Example usage:
    + * <pre>
    + * {@code
    + *   void executeBatchJob() throws Exception {
    + *     CollectorRegistry registry = new CollectorRegistry();
    + *     Gauge duration = Gauge.build()
    + *         .name("my_batch_job_duration_seconds")
    + *         .help("Duration of my batch job in seconds.")
    + *         .register(registry);
    + *     Gauge.Timer durationTimer = duration.startTimer();
    + *     try {
    + *       // Your code here.
    + *
    + *       // This is only added to the registry after success,
    + *       // so that a previous success in the Pushgateway isn't overwritten on failure.
    + *       Gauge lastSuccess = Gauge.build()
    + *           .name("my_batch_job_last_success")
    + *           .help("Last time my batch job succeeded, in unixtime.")
    + *           .register(registry);
    + *       lastSuccess.setToCurrentTime();
    + *     } finally {
    + *       durationTimer.setDuration();
    + *       PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091");
    + *       pg.pushAdd(registry, "my_batch_job");
    + *     }
    + *   }
    + * }
    + * </pre>
    + * <p>
    + * See <a href="https://github.com/prometheus/pushgateway">
    + *     https://github.com/prometheus/pushgateway</a>
    + */
    +public class PushGatewayWithTimestamp {
    +
    +    private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
    +    private final String address;
    +    private static final int SECONDS_PER_MILLISECOND = 1000;
    +    /**
    +     * Construct a Pushgateway, with the given address.
    +     * <p>
    +     * @param address  host:port or ip:port of the Pushgateway.
    +     */
    +    public PushGatewayWithTimestamp(String address) {
    +        this.address = address;
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry, String job) throws IOException {
    +        doRequest(registry, job, null, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(registry, job, groupingKey, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the same job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job, groupingKey);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry,
    +                        String job, String timestamp) throws IOException {
    +        doRequest(registry, job, null, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, "");
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry,String job,
    +                        Map<String, String> groupingKey, String timestamp) throws IOException {
    +        doRequest(registry, job, groupingKey, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job,
    +                        Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, groupingKey, null);
    +    }
    +
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with no grouping key and the provided job.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job) throws IOException {
    +        doRequest(null, job, null, "DELETE", null);
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with the provided job and grouping key.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(null, job, groupingKey, "DELETE", null);
    +    }
    +
    +
    +    /**
    +     * Pushes all metrics in a registry, replacing all those with the same job and instance.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(CollectorRegistry, String, Map)}
    +     */
    +    @Deprecated
    --- End diff --
    
    Removed deprecated methods.


---



[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by matyix <gi...@git.apache.org>.
Github user matyix commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    @jerryshao this PR is not Kubernetes specific; it's an extension of the Spark metrics system, which is already part of core. We could externalize it if the PR #11994 above ever gets merged - refactoring and externalizing it afterwards (as for all the other metrics sinks) is not a big job. Although I submitted this PR first on the K8S fork, this feature might actually be beneficial for everyone using Prometheus, which is becoming the de-facto monitoring solution.


---



[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by smurakozi <gi...@git.apache.org>.
Github user smurakozi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r166886232
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import java.io.IOException;
    +import java.io.Writer;
    +import java.util.Enumeration;
    +
    +import io.prometheus.client.Collector;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class TextFormatWithTimestamp {
    +    private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class);
    +
    +    /**
    +     * Content-type for text version 0.0.4.
    +     */
    +    public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8";
    +
    +    private static StringBuilder jsonMessageLogBuilder = new StringBuilder();
    +
    +    public static void write004(Writer writer,
    +                                Enumeration<Collector.MetricFamilySamples> mfs)throws IOException {
    +        write004(writer, mfs, null);
    +    }
    +
    +    /**
    +     * Write out the text version 0.0.4 of the given MetricFamilySamples.
    +     */
    +    public static void write004(Writer writer,Enumeration<Collector.MetricFamilySamples> mfs,
    +                                String timestamp) throws IOException {
    +    /* See http://prometheus.io/docs/instrumenting/exposition_formats/
    +     * for the output format specification. */
    +    while(mfs.hasMoreElements()) {
    +        Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
    --- End diff --
    
    I think `for(Collector.MetricFamilySamples s: Collections.list(mfs)) {`  would be nicer.
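
    Roughly (sketch only; the loop body stays as it is today, and `java.util.Collections` would need to be imported):

    ```java
    // Collections.list(...) copies the Enumeration into an ArrayList up front,
    // turning the hasMoreElements()/nextElement() pair into a plain for-each.
    for (Collector.MetricFamilySamples metricFamilySamples : Collections.list(mfs)) {
        // ... format metricFamilySamples exactly as in the current while-loop body ...
    }
    ```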



---



[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by erikerlandson <gi...@git.apache.org>.
Github user erikerlandson commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    @matyix thanks for re-submitting!


---



[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by matyix <gi...@git.apache.org>.
Github user matyix commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    @GaalDornick @erikerlandson @jerryshao @felixcheung et al.
    
    We have given up on this - we have made the requested changes several times, and I am not willing to put more time into this and get caught in the middle of a debate which is not my concern. Currently the Spark monitoring architecture is what it is - and we made the PR to align with the current architecture of the existing sinks and metrics subsystem. What happened is that the debate is now not about whether this is good or needed, but about whether it should be part of Spark core, whether it should be pluggable, whether the whole metrics subsystem should be refactored, etc. Most likely this will still be the case later; once these questions are nailed down or agreed by all parties, I can rework and resend the PR...
    
    Anyway, we (and our customers) have been using this in production for months - we have externalized it into a separate jar which we put on the classpath, so it does not need to be part of Spark (though I believe it should be, as Prometheus is one of the best open source monitoring frameworks).
    
    Should anybody need help using this sink with Spark, drop me a mail at janos@banzaicloud.com - happy to help anybody who'd like to use Prometheus with Spark. We run pretty advanced scenarios with this sink, all open source - you can read more in [Monitoring Spark with Prometheus](https://banzaicloud.com/blog/spark-monitoring/) and [Federated monitoring of multiple Spark clusters](https://banzaicloud.com/blog/prometheus-application-monitoring/).
    
    Thanks for all the support.
    Janos


---



[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by lony <gi...@git.apache.org>.
Github user lony commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    @andrusha Do you have a tutorial on how to set this up? From my understanding, if I have multiple executors to pull from, it is harder, as Prometheus has to have all the hostnames. Am I right or wrong?


---



[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by stoader <gi...@git.apache.org>.
Github user stoader commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r167357307
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java ---
    @@ -0,0 +1,320 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.net.HttpURLConnection;
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.net.URL;
    +import java.net.URLEncoder;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import java.io.BufferedWriter;
    +import java.io.IOException;
    +import java.io.OutputStreamWriter;
    +
    +/**
    + * Export metrics via the Prometheus Pushgateway.
    + * <p>
    + * The Prometheus Pushgateway exists to allow ephemeral and
    + * batch jobs to expose their metrics to Prometheus.
    + * Since these kinds of jobs may not exist long enough to be scraped,
    + * they can instead push their metrics to a Pushgateway.
    + * This class allows pushing the contents of a {@link CollectorRegistry} to
    + * a Pushgateway.
    + * <p>
    + * Example usage:
    + * <pre>
    + * {@code
    + *   void executeBatchJob() throws Exception {
    + *     CollectorRegistry registry = new CollectorRegistry();
    + *     Gauge duration = Gauge.build()
    + *         .name("my_batch_job_duration_seconds")
    + *         .help("Duration of my batch job in seconds.")
    + *         .register(registry);
    + *     Gauge.Timer durationTimer = duration.startTimer();
    + *     try {
    + *       // Your code here.
    + *
    + *       // This is only added to the registry after success,
    + *       // so that a previous success in the Pushgateway isn't overwritten on failure.
    + *       Gauge lastSuccess = Gauge.build()
    + *           .name("my_batch_job_last_success")
    + *           .help("Last time my batch job succeeded, in unixtime.")
    + *           .register(registry);
    + *       lastSuccess.setToCurrentTime();
    + *     } finally {
    + *       durationTimer.setDuration();
    + *       PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091");
    + *       pg.pushAdd(registry, "my_batch_job");
    + *     }
    + *   }
    + * }
    + * </pre>
    + * <p>
    + * See <a href="https://github.com/prometheus/pushgateway">
    + *     https://github.com/prometheus/pushgateway</a>
    + */
    +public class PushGatewayWithTimestamp {
    +
    +    private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
    +    private final String address;
    +    private static final int SECONDS_PER_MILLISECOND = 1000;
    +    /**
    +     * Construct a Pushgateway, with the given address.
    +     * <p>
    +     * @param address  host:port or ip:port of the Pushgateway.
    +     */
    +    public PushGatewayWithTimestamp(String address) {
    +        this.address = address;
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry, String job) throws IOException {
    +        doRequest(registry, job, null, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(registry, job, groupingKey, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the same job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job, groupingKey);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry,
    +                        String job, String timestamp) throws IOException {
    +        doRequest(registry, job, null, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, "");
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry,String job,
    +                        Map<String, String> groupingKey, String timestamp) throws IOException {
    +        doRequest(registry, job, groupingKey, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job,
    +                        Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, groupingKey, null);
    +    }
    +
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with no grouping key and the provided job.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job) throws IOException {
    +        doRequest(null, job, null, "DELETE", null);
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with the provided job and grouping key.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(null, job, groupingKey, "DELETE", null);
    +    }
    +
    +
    +    /**
    +     * Pushes all metrics in a registry, replacing all those with the same job and instance.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(CollectorRegistry, String, Map)}
    +     */
    +    @Deprecated
    +    public void push(CollectorRegistry registry, String job, String instance) throws IOException {
    +        push(registry, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the same job and instance.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(Collector, String, Map)}
    +     */
    +    @Deprecated
    +    public void push(Collector collector, String job, String instance) throws IOException {
    +        push(collector, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     * @deprecated use {@link #pushAdd(Collector, String, Map)}
    +     */
    +    @Deprecated
    +    public void pushAdd(Collector collector, String job, String instance) throws IOException {
    +        pushAdd(collector, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * This uses the DELETE HTTP method.
    +     * @deprecated use {@link #delete(String, Map)}
    +     */
    +    @Deprecated
    +    public void delete(String job, String instance) throws IOException {
    +        delete(job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    void doRequest(CollectorRegistry registry, String job, Map<String,
    +            String> groupingKey, String method, String timestamp) throws IOException {
    +        String url = address + "/metrics/job/" + URLEncoder.encode(job, "UTF-8");
    +        if (groupingKey != null) {
    +            for (Map.Entry<String, String> entry: groupingKey.entrySet()) {
    +                url += "/" + entry.getKey() + "/" + URLEncoder.encode(entry.getValue(), "UTF-8");
    +            }
    +        }
    +
    +        logger.info("Sending metrics data to '{}'", url);
    +
    +        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
    +        connection.setRequestProperty("Content-Type", TextFormatWithTimestamp.CONTENT_TYPE_004);
    +        if (!method.equals("DELETE")) {
    +            connection.setDoOutput(true);
    +        }
    +        connection.setRequestMethod(method);
    +
    +        connection.setConnectTimeout(10 * SECONDS_PER_MILLISECOND);
    +        connection.setReadTimeout(10 * SECONDS_PER_MILLISECOND);
    +        connection.connect();
    +
    +        try {
    +            if (!method.equals("DELETE")) {
    +                BufferedWriter writer =
    +                        new BufferedWriter(
    +                                new OutputStreamWriter(connection.getOutputStream(), "UTF-8"));
    +                TextFormatWithTimestamp.write004(writer,
    +                                                registry.metricFamilySamples(), timestamp);
    +                writer.flush();
    +                writer.close();
    +            }
    +
    +            int response = connection.getResponseCode();
    +            if (response != HttpURLConnection.HTTP_ACCEPTED) {
    +                throw new IOException("Response code from " + url + " was " + response);
    +            }
    +        } catch (Exception ex) {
    +            logger.error("Sending metrics failed due to: ", ex);
    +        }
    +
    +        finally {
    +            connection.disconnect();
    --- End diff --
    
    The `new URL(url).openConnection()` call is outside of the try-catch block, so in case it throws an exception, `doRequest()` exits before reaching `finally { connection.disconnect(); }`.
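
    A possible shape of the fix (sketch only; configuration and body-writing condensed into comments):

    ```java
    HttpURLConnection connection = null;
    try {
        connection = (HttpURLConnection) new URL(url).openConnection();
        // ... set Content-Type, method, timeouts, connect(), write the
        //     metrics body and check the response code as today ...
    } finally {
        if (connection != null) {
            // only disconnect a connection that was actually opened
            connection.disconnect();
        }
    }
    ```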


---



[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by smurakozi <gi...@git.apache.org>.
Github user smurakozi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r166889139
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import java.io.IOException;
    +import java.io.Writer;
    +import java.util.Enumeration;
    +
    +import io.prometheus.client.Collector;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class TextFormatWithTimestamp {
    +    private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class);
    +
    +    /**
    +     * Content-type for text version 0.0.4.
    +     */
    +    public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8";
    +
    +    private static StringBuilder jsonMessageLogBuilder = new StringBuilder();
    --- End diff --
    
    Usage of this variable is questionable for a couple of reasons:
    - it just keeps growing; it's never cleared or re-initialized. As a consequence, from the second call of write onwards it will have invalid content, and it acts as a memory leak.
    - its usage pattern (`writer.write(blah);appendToJsonMessageLogBuilder("blah")`) is pretty verbose; it should be factored out.
    - it's not thread safe (and that's not documented)
    - I don't think accessing it as a static member everywhere is a good design. It should either
      - be passed around as a method parameter
      - or be changed to an instance method. The static write004 could instantiate a new `TextFormatWithTimestamp` and call write on that (see the sketch below).
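
    A minimal sketch of that instance-based variant (hypothetical; the actual formatting logic is elided):

    ```java
    import java.io.IOException;
    import java.io.Writer;
    import java.util.Enumeration;

    import io.prometheus.client.Collector;

    public class TextFormatWithTimestamp {
        // Per-instance builder: fresh on every write004 call and unreachable
        // afterwards, so no stale content, no leak, no shared static state.
        private final StringBuilder jsonMessageLogBuilder = new StringBuilder();

        public static void write004(Writer writer,
                                    Enumeration<Collector.MetricFamilySamples> mfs,
                                    String timestamp) throws IOException {
            new TextFormatWithTimestamp().write(writer, mfs, timestamp);
        }

        private void write(Writer writer,
                           Enumeration<Collector.MetricFamilySamples> mfs,
                           String timestamp) throws IOException {
            // ... the existing formatting logic, appending to jsonMessageLogBuilder ...
        }
    }
    ```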
    



---



[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by smurakozi <gi...@git.apache.org>.
Github user smurakozi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r166894472
  
    --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.metrics.sink
    +
    +import java.net.URI
    +import java.util
    +import java.util.Properties
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.JavaConverters._
    +import scala.util.Try
    +
    +import com.codahale.metrics._
    +import io.prometheus.client.CollectorRegistry
    +import io.prometheus.client.dropwizard.DropwizardExports
    +
    +import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config.METRICS_NAMESPACE
    +import org.apache.spark.metrics.MetricsSystem
    +import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp
    +
    +
    +private[spark] class PrometheusSink(
    +                                     val property: Properties,
    +                                     val registry: MetricRegistry,
    +                                     securityMgr: SecurityManager)
    +  extends Sink with Logging {
    +
    +  protected class Reporter(registry: MetricRegistry)
    +    extends ScheduledReporter(
    +      registry,
    +      "prometheus-reporter",
    +      MetricFilter.ALL,
    +      TimeUnit.SECONDS,
    +      TimeUnit.MILLISECONDS) {
    +
    +    val defaultSparkConf: SparkConf = new SparkConf(true)
    +
    +    override def report(
    +                         gauges: util.SortedMap[String, Gauge[_]],
    +                         counters: util.SortedMap[String, Counter],
    +                         histograms: util.SortedMap[String, Histogram],
    +                         meters: util.SortedMap[String, Meter],
    +                         timers: util.SortedMap[String, Timer]): Unit = {
    +
    +      // SparkEnv may become available only after metrics sink creation thus retrieving
    +      // SparkConf from spark env here and not during the creation/initialisation of PrometheusSink.
    +      val sparkConf: SparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(defaultSparkConf)
    +
    +      val metricsNamespace: Option[String] = sparkConf.get(METRICS_NAMESPACE)
    +      val sparkAppId: Option[String] = sparkConf.getOption("spark.app.id")
    +      val executorId: Option[String] = sparkConf.getOption("spark.executor.id")
    +
    +      logInfo(s"metricsNamespace=$metricsNamespace, sparkAppId=$sparkAppId, " +
    +        s"executorId=$executorId")
    +
    +      val role: String = (sparkAppId, executorId) match {
    +        case (Some(_), Some(SparkContext.DRIVER_IDENTIFIER)) => "driver"
    +        case (Some(_), Some(_)) => "executor"
    +        case _ => "shuffle"
    +      }
    +
    +      val job: String = role match {
    +        case "driver" => metricsNamespace.getOrElse(sparkAppId.get)
    +        case "executor" => metricsNamespace.getOrElse(sparkAppId.get)
    +        case _ => metricsNamespace.getOrElse("shuffle")
    +      }
    +      logInfo(s"role=$role, job=$job")
    +
    +      val groupingKey: Map[String, String] = (role, executorId) match {
    +        case ("driver", _) => Map("role" -> role)
    +        case ("executor", Some(id)) => Map ("role" -> role, "number" -> id)
    +        case _ => Map("role" -> role)
    +      }
    +
    +
    +      pushGateway.pushAdd(pushRegistry, job, groupingKey.asJava,
    +        s"${System.currentTimeMillis}")
    +
    +    }
    +
    +  }
    +
    +  val DEFAULT_PUSH_PERIOD: Int = 10
    +  val DEFAULT_PUSH_PERIOD_UNIT: TimeUnit = TimeUnit.SECONDS
    +  val DEFAULT_PUSHGATEWAY_ADDRESS: String = "127.0.0.1:9091"
    +  val DEFAULT_PUSHGATEWAY_ADDRESS_PROTOCOL: String = "http"
    +
    +  val KEY_PUSH_PERIOD = "period"
    +  val KEY_PUSH_PERIOD_UNIT = "unit"
    +  val KEY_PUSHGATEWAY_ADDRESS = "pushgateway-address"
    +  val KEY_PUSHGATEWAY_ADDRESS_PROTOCOL = "pushgateway-address-protocol"
    +
    +
    +  val pollPeriod: Int =
    +    Option(property.getProperty(KEY_PUSH_PERIOD))
    +      .map(_.toInt)
    +      .getOrElse(DEFAULT_PUSH_PERIOD)
    +
    +  val pollUnit: TimeUnit =
    +    Option(property.getProperty(KEY_PUSH_PERIOD_UNIT))
    +      .map { s => TimeUnit.valueOf(s.toUpperCase) }
    +      .getOrElse(DEFAULT_PUSH_PERIOD_UNIT)
    +
    +  val pushGatewayAddress =
    +    Option(property.getProperty(KEY_PUSHGATEWAY_ADDRESS))
    +      .getOrElse(DEFAULT_PUSHGATEWAY_ADDRESS)
    +
    +  val pushGatewayAddressProtocol =
    +    Option(property.getProperty(KEY_PUSHGATEWAY_ADDRESS_PROTOCOL))
    +      .getOrElse(DEFAULT_PUSHGATEWAY_ADDRESS_PROTOCOL)
    +
    +  // validate pushgateway host:port
    +  Try(new URI(s"$pushGatewayAddressProtocol://$pushGatewayAddress")).get
    +
    +  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
    +
    +  logInfo("Initializing Prometheus Sink...")
    +  logInfo(s"Metrics polling period -> $pollPeriod $pollUnit")
    +  logInfo(s"$KEY_PUSHGATEWAY_ADDRESS -> $pushGatewayAddress")
    +  logInfo(s"$KEY_PUSHGATEWAY_ADDRESS_PROTOCOL -> $pushGatewayAddressProtocol")
    +
    +  val pushRegistry: CollectorRegistry = new CollectorRegistry()
    +  val sparkMetricExports: DropwizardExports = new DropwizardExports(registry)
    +  val pushGateway: PushGatewayWithTimestamp =
    +    new PushGatewayWithTimestamp(s"$pushGatewayAddressProtocol://$pushGatewayAddress")
    +
    +  val reporter = new Reporter(registry)
    +
    +  override def start(): Unit = {
    +    sparkMetricExports.register(pushRegistry)
    +
    --- End diff --
    
    Nit: extra line


---



[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by matyix <gi...@git.apache.org>.
Github user matyix commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    For those who are still interested in using Prometheus, you can get the standalone package and source code from here: https://github.com/banzaicloud/spark-metrics . Happy monitoring - try to catch issues beforehand and avoid those PagerDuty notifications :).


---



[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by smurakozi <gi...@git.apache.org>.
Github user smurakozi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r166896081
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java ---
    @@ -0,0 +1,320 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.net.HttpURLConnection;
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.net.URL;
    +import java.net.URLEncoder;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import java.io.BufferedWriter;
    +import java.io.IOException;
    +import java.io.OutputStreamWriter;
    +
    +/**
    + * Export metrics via the Prometheus Pushgateway.
    + * <p>
    + * The Prometheus Pushgateway exists to allow ephemeral and
    + * batch jobs to expose their metrics to Prometheus.
    + * Since these kinds of jobs may not exist long enough to be scraped,
    + * they can instead push their metrics to a Pushgateway.
    + * This class allows pushing the contents of a {@link CollectorRegistry} to
    + * a Pushgateway.
    + * <p>
    + * Example usage:
    + * <pre>
    + * {@code
    + *   void executeBatchJob() throws Exception {
    + *     CollectorRegistry registry = new CollectorRegistry();
    + *     Gauge duration = Gauge.build()
    + *         .name("my_batch_job_duration_seconds")
    + *         .help("Duration of my batch job in seconds.")
    + *         .register(registry);
    + *     Gauge.Timer durationTimer = duration.startTimer();
    + *     try {
    + *       // Your code here.
    + *
    + *       // This is only added to the registry after success,
    + *       // so that a previous success in the Pushgateway isn't overwritten on failure.
    + *       Gauge lastSuccess = Gauge.build()
    + *           .name("my_batch_job_last_success")
    + *           .help("Last time my batch job succeeded, in unixtime.")
    + *           .register(registry);
    + *       lastSuccess.setToCurrentTime();
    + *     } finally {
    + *       durationTimer.setDuration();
    + *       PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091");
    + *       pg.pushAdd(registry, "my_batch_job");
    + *     }
    + *   }
    + * }
    + * </pre>
    + * <p>
    + * See <a href="https://github.com/prometheus/pushgateway">
    + *     https://github.com/prometheus/pushgateway</a>
    + */
    +public class PushGatewayWithTimestamp {
    +
    +    private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
    +    private final String address;
    +    private static final int SECONDS_PER_MILLISECOND = 1000;
    +    /**
    +     * Construct a Pushgateway, with the given address.
    +     * <p>
    +     * @param address  host:port or ip:port of the Pushgateway.
    +     */
    +    public PushGatewayWithTimestamp(String address) {
    +        this.address = address;
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry, String job) throws IOException {
    +        doRequest(registry, job, null, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(registry, job, groupingKey, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the same job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job, groupingKey);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry,
    +                        String job, String timestamp) throws IOException {
    +        doRequest(registry, job, null, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, "");
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry,String job,
    +                        Map<String, String> groupingKey, String timestamp) throws IOException {
    +        doRequest(registry, job, groupingKey, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job,
    +                        Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, groupingKey, null);
    +    }
    +
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with no grouping key and the provided job.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job) throws IOException {
    +        doRequest(null, job, null, "DELETE", null);
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with the provided job and grouping key.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(null, job, groupingKey, "DELETE", null);
    +    }
    +
    +
    +    /**
    +     * Pushes all metrics in a registry, replacing all those with the same job and instance.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(CollectorRegistry, String, Map)}
    +     */
    +    @Deprecated
    +    public void push(CollectorRegistry registry, String job, String instance) throws IOException {
    +        push(registry, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the same job and instance.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(Collector, String, Map)}
    +     */
    +    @Deprecated
    +    public void push(Collector collector, String job, String instance) throws IOException {
    +        push(collector, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     * @deprecated use {@link #pushAdd(Collector, String, Map)}
    +     */
    +    @Deprecated
    +    public void pushAdd(Collector collector, String job, String instance) throws IOException {
    +        pushAdd(collector, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * This uses the DELETE HTTP method.
    +     * @deprecated use {@link #delete(String, Map)}
    +     */
    +    @Deprecated
    +    public void delete(String job, String instance) throws IOException {
    +        delete(job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    void doRequest(CollectorRegistry registry, String job, Map<String,
    +            String> groupingKey, String method, String timestamp) throws IOException {
    +        String url = address + "/metrics/job/" + URLEncoder.encode(job, "UTF-8");
    +        if (groupingKey != null) {
    +            for (Map.Entry<String, String> entry: groupingKey.entrySet()) {
    +                url += "/" + entry.getKey() + "/" + URLEncoder.encode(entry.getValue(), "UTF-8");
    +            }
    +        }
    +
    +        logger.info("Sending metrics data to '{}'", url);
    +
    +        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
    +        connection.setRequestProperty("Content-Type", TextFormatWithTimestamp.CONTENT_TYPE_004);
    +        if (!method.equals("DELETE")) {
    +            connection.setDoOutput(true);
    +        }
    +        connection.setRequestMethod(method);
    +
    +        connection.setConnectTimeout(10 * SECONDS_PER_MILLISECOND);
    +        connection.setReadTimeout(10 * SECONDS_PER_MILLISECOND);
    +        connection.connect();
    +
    +        try {
    +            if (!method.equals("DELETE")) {
    +                BufferedWriter writer =
    +                        new BufferedWriter(
    +                                new OutputStreamWriter(connection.getOutputStream(), "UTF-8"));
    +                TextFormatWithTimestamp.write004(writer,
    +                                                registry.metricFamilySamples(), timestamp);
    +                writer.flush();
    +                writer.close();
    +            }
    +
    +            int response = connection.getResponseCode();
    +            if (response != HttpURLConnection.HTTP_ACCEPTED) {
    +                throw new IOException("Response code from " + url + " was " + response);
    +            }
    +        } catch (Exception ex) {
    +            logger.error("Sending metrics failed due to: ", ex);
    +        }
    +
    --- End diff --
    
    Nit: extra line


---



[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by smurakozi <gi...@git.apache.org>.
Github user smurakozi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r166896270
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java ---
    @@ -0,0 +1,320 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.net.HttpURLConnection;
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.net.URL;
    +import java.net.URLEncoder;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import java.io.BufferedWriter;
    +import java.io.IOException;
    +import java.io.OutputStreamWriter;
    +
    +/**
    + * Export metrics via the Prometheus Pushgateway.
    + * <p>
    + * The Prometheus Pushgateway exists to allow ephemeral and
    + * batch jobs to expose their metrics to Prometheus.
    + * Since these kinds of jobs may not exist long enough to be scraped,
    + * they can instead push their metrics to a Pushgateway.
    + * This class allows pushing the contents of a {@link CollectorRegistry} to
    + * a Pushgateway.
    + * <p>
    + * Example usage:
    + * <pre>
    + * {@code
    + *   void executeBatchJob() throws Exception {
    + *     CollectorRegistry registry = new CollectorRegistry();
    + *     Gauge duration = Gauge.build()
    + *         .name("my_batch_job_duration_seconds")
    + *         .help("Duration of my batch job in seconds.")
    + *         .register(registry);
    + *     Gauge.Timer durationTimer = duration.startTimer();
    + *     try {
    + *       // Your code here.
    + *
    + *       // This is only added to the registry after success,
    + *       // so that a previous success in the Pushgateway isn't overwritten on failure.
    + *       Gauge lastSuccess = Gauge.build()
    + *           .name("my_batch_job_last_success")
    + *           .help("Last time my batch job succeeded, in unixtime.")
    + *           .register(registry);
    + *       lastSuccess.setToCurrentTime();
    + *     } finally {
    + *       durationTimer.setDuration();
    + *       PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091");
    + *       pg.pushAdd(registry, "my_batch_job");
    + *     }
    + *   }
    + * }
    + * </pre>
    + * <p>
    + * See <a href="https://github.com/prometheus/pushgateway">
    + *     https://github.com/prometheus/pushgateway</a>
    + */
    +public class PushGatewayWithTimestamp {
    +
    +    private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
    +    private final String address;
    +    private static final int MILLISECONDS_PER_SECOND = 1000;
    +    /**
    +     * Construct a Pushgateway, with the given address.
    +     * <p>
    +     * @param address  host:port or ip:port of the Pushgateway.
    +     */
    +    public PushGatewayWithTimestamp(String address) {
    +        this.address = address;
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry, String job) throws IOException {
    +        doRequest(registry, job, null, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(registry, job, groupingKey, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the same job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job, groupingKey);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry,
    +                        String job, String timestamp) throws IOException {
    +        doRequest(registry, job, null, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, "");
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry, String job,
    +                        Map<String, String> groupingKey, String timestamp) throws IOException {
    +        doRequest(registry, job, groupingKey, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job,
    +                        Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, groupingKey, null);
    +    }
    +
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with no grouping key and the provided job.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job) throws IOException {
    +        doRequest(null, job, null, "DELETE", null);
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with the provided job and grouping key.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(null, job, groupingKey, "DELETE", null);
    +    }
    +
    +
    +    /**
    +     * Pushes all metrics in a registry, replacing all those with the same job and instance.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(CollectorRegistry, String, Map)}
    +     */
    +    @Deprecated
    +    public void push(CollectorRegistry registry, String job, String instance) throws IOException {
    +        push(registry, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the same job and instance.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(Collector, String, Map)}
    +     */
    +    @Deprecated
    +    public void push(Collector collector, String job, String instance) throws IOException {
    +        push(collector, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     * @deprecated use {@link #pushAdd(Collector, String, Map)}
    +     */
    +    @Deprecated
    +    public void pushAdd(Collector collector, String job, String instance) throws IOException {
    +        pushAdd(collector, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * This uses the DELETE HTTP method.
    +     * @deprecated use {@link #delete(String, Map)}
    +     */
    +    @Deprecated
    +    public void delete(String job, String instance) throws IOException {
    +        delete(job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    void doRequest(CollectorRegistry registry, String job, Map<String,
    +            String> groupingKey, String method, String timestamp) throws IOException {
    +        String url = address + "/metrics/job/" + URLEncoder.encode(job, "UTF-8");
    +        if (groupingKey != null) {
    +            for (Map.Entry<String, String> entry: groupingKey.entrySet()) {
    +                url += "/" + entry.getKey() + "/" + URLEncoder.encode(entry.getValue(), "UTF-8");
    +            }
    +        }
    +
    +        logger.info("Sending metrics data to '{}'", url);
    +
    +        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
    +        connection.setRequestProperty("Content-Type", TextFormatWithTimestamp.CONTENT_TYPE_004);
    +        if (!method.equals("DELETE")) {
    +            connection.setDoOutput(true);
    +        }
    +        connection.setRequestMethod(method);
    +
    +        connection.setConnectTimeout(10 * MILLISECONDS_PER_SECOND);
    +        connection.setReadTimeout(10 * MILLISECONDS_PER_SECOND);
    +        connection.connect();
    +
    +        try {
    +            if (!method.equals("DELETE")) {
    +                BufferedWriter writer =
    +                        new BufferedWriter(
    +                                new OutputStreamWriter(connection.getOutputStream(), "UTF-8"));
    +                TextFormatWithTimestamp.write004(writer,
    +                                                registry.metricFamilySamples(), timestamp);
    +                writer.flush();
    +                writer.close();
    +            }
    +
    +            int response = connection.getResponseCode();
    +            if (response != HttpURLConnection.HTTP_ACCEPTED) {
    +                throw new IOException("Response code from " + url + " was " + response);
    +            }
    +        } catch (Exception ex) {
    +            logger.error("Sending metrics failed due to: ", ex);
    +        }
    +
    +        finally {
    +            connection.disconnect();
    --- End diff --
    
    `connection` can be null if `new URL(url).openConnection()` at line 272 threw an exception.
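    
    A minimal sketch of the guard this implies, assuming the connection is
    opened inside the protected region (class and method names here are
    hypothetical, for illustration only):
    
        import java.io.IOException;
        import java.net.HttpURLConnection;
        import java.net.URL;
    
        class SafeDisconnectSketch {
            // Open inside the try so that a failed openConnection() cannot
            // leave disconnect() running against an unassigned reference.
            static int headRequest(String url) throws IOException {
                HttpURLConnection connection = null;
                try {
                    connection = (HttpURLConnection) new URL(url).openConnection();
                    connection.setRequestMethod("HEAD");
                    return connection.getResponseCode();
                } finally {
                    if (connection != null) {
                        connection.disconnect();
                    }
                }
            }
        }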


---


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by stoader <gi...@git.apache.org>.
Github user stoader commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r167359067
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java ---
    @@ -0,0 +1,320 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.net.HttpURLConnection;
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.net.URL;
    +import java.net.URLEncoder;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import java.io.BufferedWriter;
    +import java.io.IOException;
    +import java.io.OutputStreamWriter;
    +
    +/**
    + * Export metrics via the Prometheus Pushgateway.
    + * <p>
    + * The Prometheus Pushgateway exists to allow ephemeral and
    + * batch jobs to expose their metrics to Prometheus.
    + * Since these kinds of jobs may not exist long enough to be scraped,
    + * they can instead push their metrics to a Pushgateway.
    + * This class allows pushing the contents of a {@link CollectorRegistry} to
    + * a Pushgateway.
    + * <p>
    + * Example usage:
    + * <pre>
    + * {@code
    + *   void executeBatchJob() throws Exception {
    + *     CollectorRegistry registry = new CollectorRegistry();
    + *     Gauge duration = Gauge.build()
    + *         .name("my_batch_job_duration_seconds")
    + *         .help("Duration of my batch job in seconds.")
    + *         .register(registry);
    + *     Gauge.Timer durationTimer = duration.startTimer();
    + *     try {
    + *       // Your code here.
    + *
    + *       // This is only added to the registry after success,
    + *       // so that a previous success in the Pushgateway isn't overwritten on failure.
    + *       Gauge lastSuccess = Gauge.build()
    + *           .name("my_batch_job_last_success")
    + *           .help("Last time my batch job succeeded, in unixtime.")
    + *           .register(registry);
    + *       lastSuccess.setToCurrentTime();
    + *     } finally {
    + *       durationTimer.setDuration();
    + *       PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091");
    + *       pg.pushAdd(registry, "my_batch_job");
    + *     }
    + *   }
    + * }
    + * </pre>
    + * <p>
    + * See <a href="https://github.com/prometheus/pushgateway">
    + *     https://github.com/prometheus/pushgateway</a>
    + */
    +public class PushGatewayWithTimestamp {
    +
    +    private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
    +    private final String address;
    +    private static final int MILLISECONDS_PER_SECOND = 1000;
    +    /**
    +     * Construct a Pushgateway, with the given address.
    +     * <p>
    +     * @param address  host:port or ip:port of the Pushgateway.
    +     */
    +    public PushGatewayWithTimestamp(String address) {
    +        this.address = address;
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry, String job) throws IOException {
    +        doRequest(registry, job, null, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(registry, job, groupingKey, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the same job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job, groupingKey);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry,
    +                        String job, String timestamp) throws IOException {
    +        doRequest(registry, job, null, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, "");
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry, String job,
    +                        Map<String, String> groupingKey, String timestamp) throws IOException {
    +        doRequest(registry, job, groupingKey, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job,
    +                        Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, groupingKey, null);
    +    }
    +
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with no grouping key and the provided job.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job) throws IOException {
    +        doRequest(null, job, null, "DELETE", null);
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with the provided job and grouping key.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(null, job, groupingKey, "DELETE", null);
    +    }
    +
    +
    +    /**
    +     * Pushes all metrics in a registry, replacing all those with the same job and instance.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(CollectorRegistry, String, Map)}
    +     */
    +    @Deprecated
    +    public void push(CollectorRegistry registry, String job, String instance) throws IOException {
    +        push(registry, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the same job and instance.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(Collector, String, Map)}
    +     */
    +    @Deprecated
    +    public void push(Collector collector, String job, String instance) throws IOException {
    +        push(collector, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     * @deprecated use {@link #pushAdd(Collector, String, Map)}
    +     */
    +    @Deprecated
    +    public void pushAdd(Collector collector, String job, String instance) throws IOException {
    +        pushAdd(collector, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * This uses the DELETE HTTP method.
    +     * @deprecated use {@link #delete(String, Map)}
    +     */
    +    @Deprecated
    +    public void delete(String job, String instance) throws IOException {
    +        delete(job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    void doRequest(CollectorRegistry registry, String job, Map<String,
    +            String> groupingKey, String method, String timestamp) throws IOException {
    +        String url = address + "/metrics/job/" + URLEncoder.encode(job, "UTF-8");
    +        if (groupingKey != null) {
    +            for (Map.Entry<String, String> entry: groupingKey.entrySet()) {
    +                url += "/" + entry.getKey() + "/" + URLEncoder.encode(entry.getValue(), "UTF-8");
    +            }
    +        }
    +
    +        logger.info("Sending metrics data to '{}'", url);
    +
    +        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
    +        connection.setRequestProperty("Content-Type", TextFormatWithTimestamp.CONTENT_TYPE_004);
    +        if (!method.equals("DELETE")) {
    +            connection.setDoOutput(true);
    +        }
    +        connection.setRequestMethod(method);
    +
    +        connection.setConnectTimeout(10 * MILLISECONDS_PER_SECOND);
    +        connection.setReadTimeout(10 * MILLISECONDS_PER_SECOND);
    +        connection.connect();
    +
    +        try {
    +            if (!method.equals("DELETE")) {
    +                BufferedWriter writer =
    +                        new BufferedWriter(
    +                                new OutputStreamWriter(connection.getOutputStream(), "UTF-8"));
    +                TextFormatWithTimestamp.write004(writer,
    +                                                registry.metricFamilySamples(), timestamp);
    +                writer.flush();
    +                writer.close();
    +            }
    +
    +            int response = connection.getResponseCode();
    +            if (response != HttpURLConnection.HTTP_ACCEPTED) {
    +                throw new IOException("Response code from " + url + " was " + response);
    +            }
    +        } catch (Exception ex) {
    +            logger.error("Sending metrics failed due to: ", ex);
    +        }
    +
    --- End diff --
    
    Removed extra line.


---


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by stoader <gi...@git.apache.org>.
Github user stoader commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r167359048
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import java.io.IOException;
    +import java.io.Writer;
    +import java.util.Enumeration;
    +
    +import io.prometheus.client.Collector;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class TextFormatWithTimestamp {
    +    private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class);
    +
    +    /**
    +     * Content-type for text version 0.0.4.
    +     */
    +    public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8";
    +
    +    private static StringBuilder jsonMessageLogBuilder = new StringBuilder();
    --- End diff --
    
    This class has been refactored.
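    
    For context, the flagged field is a static mutable StringBuilder shared by
    every call. The usual shape of such a refactor is a per-call buffer; a
    minimal sketch under that assumption (illustrative only, not the code that
    landed):
    
        import java.io.IOException;
        import java.io.Writer;
        import java.util.Enumeration;
    
        import io.prometheus.client.Collector;
    
        class PerCallBufferSketch {
            // A local StringBuilder per invocation: concurrent callers can no
            // longer interleave or clobber each other's debug output.
            static void write(Writer writer,
                    Enumeration<Collector.MetricFamilySamples> mfs) throws IOException {
                StringBuilder debugLog = new StringBuilder();
                while (mfs.hasMoreElements()) {
                    Collector.MetricFamilySamples family = mfs.nextElement();
                    debugLog.append(family.name).append('\n');
                    writer.write(family.name);
                    writer.write('\n');
                }
                // the real class would pass debugLog.toString() to its logger
            }
        }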


---


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by smurakozi <gi...@git.apache.org>.
Github user smurakozi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r166892802
  
    --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.metrics.sink
    +
    +import java.net.URI
    +import java.util
    +import java.util.Properties
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.JavaConverters._
    +import scala.util.Try
    +
    +import com.codahale.metrics._
    +import io.prometheus.client.CollectorRegistry
    +import io.prometheus.client.dropwizard.DropwizardExports
    +
    +import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config.METRICS_NAMESPACE
    +import org.apache.spark.metrics.MetricsSystem
    +import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp
    +
    +
    +private[spark] class PrometheusSink(
    +                                     val property: Properties,
    +                                     val registry: MetricRegistry,
    +                                     securityMgr: SecurityManager)
    +  extends Sink with Logging {
    +
    +  protected class Reporter(registry: MetricRegistry)
    +    extends ScheduledReporter(
    +      registry,
    +      "prometheus-reporter",
    +      MetricFilter.ALL,
    +      TimeUnit.SECONDS,
    +      TimeUnit.MILLISECONDS) {
    +
    +    val defaultSparkConf: SparkConf = new SparkConf(true)
    +
    +    override def report(
    +                         gauges: util.SortedMap[String, Gauge[_]],
    +                         counters: util.SortedMap[String, Counter],
    +                         histograms: util.SortedMap[String, Histogram],
    +                         meters: util.SortedMap[String, Meter],
    +                         timers: util.SortedMap[String, Timer]): Unit = {
    +
    +      // SparkEnv may become available only after metrics sink creation thus retrieving
    +      // SparkConf from spark env here and not during the creation/initialisation of PrometheusSink.
    +      val sparkConf: SparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(defaultSparkConf)
    +
    +      val metricsNamespace: Option[String] = sparkConf.get(METRICS_NAMESPACE)
    +      val sparkAppId: Option[String] = sparkConf.getOption("spark.app.id")
    +      val executorId: Option[String] = sparkConf.getOption("spark.executor.id")
    +
    +      logInfo(s"metricsNamespace=$metricsNamespace, sparkAppId=$sparkAppId, " +
    +        s"executorId=$executorId")
    +
    +      val role: String = (sparkAppId, executorId) match {
    +        case (Some(_), Some(SparkContext.DRIVER_IDENTIFIER)) => "driver"
    +        case (Some(_), Some(_)) => "executor"
    +        case _ => "shuffle"
    +      }
    +
    +      val job: String = role match {
    +        case "driver" => metricsNamespace.getOrElse(sparkAppId.get)
    +        case "executor" => metricsNamespace.getOrElse(sparkAppId.get)
    +        case _ => metricsNamespace.getOrElse("shuffle")
    +      }
    +      logInfo(s"role=$role, job=$job")
    +
    +      val groupingKey: Map[String, String] = (role, executorId) match {
    +        case ("driver", _) => Map("role" -> role)
    +        case ("executor", Some(id)) => Map ("role" -> role, "number" -> id)
    +        case _ => Map("role" -> role)
    +      }
    +
    +
    --- End diff --
    
    Nit: extra line
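    
    For context on what this hunk produces on the wire: doRequest, quoted in
    the other messages, appends each grouping-key entry as a URL path segment
    after the job, so an executor push would target roughly the following
    (host and app id invented):
    
        http://pushgateway.example:9091/metrics/job/app-20171117-0001/role/executor/number/1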


---


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by erikerlandson <gi...@git.apache.org>.
Github user erikerlandson commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    Although this is not kube-specific, kubernetes deployment is a major prometheus use case. Has it been tested in a kube environment?


---


[GitHub] spark issue #19775: Add support for publishing Spark metrics into Prometheus

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    Can one of the admins verify this patch?


---


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by smurakozi <gi...@git.apache.org>.
Github user smurakozi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r166886527
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import java.io.IOException;
    +import java.io.Writer;
    +import java.util.Enumeration;
    +
    +import io.prometheus.client.Collector;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class TextFormatWithTimestamp {
    +    private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class);
    +
    +    /**
    +     * Content-type for text version 0.0.4.
    +     */
    +    public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8";
    +
    +    private static StringBuilder jsonMessageLogBuilder = new StringBuilder();
    +
    +    public static void write004(Writer writer,
    +                                Enumeration<Collector.MetricFamilySamples> mfs) throws IOException {
    +        write004(writer, mfs, null);
    +    }
    +
    +    /**
    +     * Write out the text version 0.0.4 of the given MetricFamilySamples.
    +     */
    +    public static void write004(Writer writer, Enumeration<Collector.MetricFamilySamples> mfs,
    +                                String timestamp) throws IOException {
    +    /* See http://prometheus.io/docs/instrumenting/exposition_formats/
    +     * for the output format specification. */
    +    while(mfs.hasMoreElements()) {
    +        Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
    --- End diff --
    
    Also, method body is not indented well.
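    
    For reference, text format 0.0.4 allows an optional millisecond timestamp
    after each sample value, which is what the extra parameter appends here;
    pushed output would look something like this (metric name and values
    invented):
    
        # HELP jvm_heap_used Approximate JVM heap usage.
        # TYPE jvm_heap_used gauge
        jvm_heap_used 1.2345678E7 1510926720000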


---


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by stoader <gi...@git.apache.org>.
Github user stoader commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    @erikerlandson we tested this on Kubernetes using https://github.com/prometheus/pushgateway/tree/v0.3.1 and https://github.com/prometheus/pushgateway/tree/v0.4.0
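    
    For anyone reproducing such a test: the sink is enabled through Spark's
    standard metrics.properties mechanism. A minimal sketch, in which the
    pushgateway address and reporting-period keys are assumed names (the
    supported property keys are not visible in the quoted diffs):
    
        # conf/metrics.properties -- property names illustrative
        *.sink.prometheus.class=org.apache.spark.metrics.sink.PrometheusSink
        *.sink.prometheus.pushgateway-address=pushgateway.example:9091
        *.sink.prometheus.period=10
        *.sink.prometheus.unit=seconds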


---


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by smurakozi <gi...@git.apache.org>.
Github user smurakozi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r166883372
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java ---
    @@ -0,0 +1,320 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.net.HttpURLConnection;
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.net.URL;
    +import java.net.URLEncoder;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import java.io.BufferedWriter;
    +import java.io.IOException;
    +import java.io.OutputStreamWriter;
    +
    +/**
    + * Export metrics via the Prometheus Pushgateway.
    + * <p>
    + * The Prometheus Pushgateway exists to allow ephemeral and
    + * batch jobs to expose their metrics to Prometheus.
    + * Since these kinds of jobs may not exist long enough to be scraped,
    + * they can instead push their metrics to a Pushgateway.
    + * This class allows pushing the contents of a {@link CollectorRegistry} to
    + * a Pushgateway.
    + * <p>
    + * Example usage:
    + * <pre>
    + * {@code
    + *   void executeBatchJob() throws Exception {
    + *     CollectorRegistry registry = new CollectorRegistry();
    + *     Gauge duration = Gauge.build()
    + *         .name("my_batch_job_duration_seconds")
    + *         .help("Duration of my batch job in seconds.")
    + *         .register(registry);
    + *     Gauge.Timer durationTimer = duration.startTimer();
    + *     try {
    + *       // Your code here.
    + *
    + *       // This is only added to the registry after success,
    + *       // so that a previous success in the Pushgateway isn't overwritten on failure.
    + *       Gauge lastSuccess = Gauge.build()
    + *           .name("my_batch_job_last_success")
    + *           .help("Last time my batch job succeeded, in unixtime.")
    + *           .register(registry);
    + *       lastSuccess.setToCurrentTime();
    + *     } finally {
    + *       durationTimer.setDuration();
    + *       PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091");
    + *       pg.pushAdd(registry, "my_batch_job");
    + *     }
    + *   }
    + * }
    + * </pre>
    + * <p>
    + * See <a href="https://github.com/prometheus/pushgateway">
    + *     https://github.com/prometheus/pushgateway</a>
    + */
    +public class PushGatewayWithTimestamp {
    +
    +    private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
    +    private final String address;
    +    private static final int MILLISECONDS_PER_SECOND = 1000;
    +    /**
    +     * Construct a Pushgateway, with the given address.
    +     * <p>
    +     * @param address  host:port or ip:port of the Pushgateway.
    +     */
    +    public PushGatewayWithTimestamp(String address) {
    +        this.address = address;
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry, String job) throws IOException {
    +        doRequest(registry, job, null, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(registry, job, groupingKey, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the same job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job, groupingKey);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry,
    +                        String job, String timestamp) throws IOException {
    +        doRequest(registry, job, null, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, "");
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry, String job,
    +                        Map<String, String> groupingKey, String timestamp) throws IOException {
    +        doRequest(registry, job, groupingKey, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job,
    +                        Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, groupingKey, null);
    +    }
    +
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with no grouping key and the provided job.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job) throws IOException {
    +        doRequest(null, job, null, "DELETE", null);
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with the provided job and grouping key.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(null, job, groupingKey, "DELETE", null);
    +    }
    +
    +
    +    /**
    +     * Pushes all metrics in a registry, replacing all those with the same job and instance.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(CollectorRegistry, String, Map)}
    +     */
    +    @Deprecated
    +    public void push(CollectorRegistry registry, String job, String instance) throws IOException {
    +        push(registry, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the same job and instance.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(Collector, String, Map)}
    +     */
    +    @Deprecated
    +    public void push(Collector collector, String job, String instance) throws IOException {
    +        push(collector, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     * @deprecated use {@link #pushAdd(Collector, String, Map)}
    +     */
    +    @Deprecated
    +    public void pushAdd(Collector collector, String job, String instance) throws IOException {
    +        pushAdd(collector, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * This uses the DELETE HTTP method.
    +     * @deprecated use {@link #delete(String, Map)}
    +     */
    +    @Deprecated
    +    public void delete(String job, String instance) throws IOException {
    +        delete(job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    void doRequest(CollectorRegistry registry, String job, Map<String,
    --- End diff --
    
    It should be private.


---


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by stoader <gi...@git.apache.org>.
Github user stoader commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r167359010
  
    --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.metrics.sink
    +
    +import java.net.URI
    +import java.util
    +import java.util.Properties
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.JavaConverters._
    +import scala.util.Try
    +
    +import com.codahale.metrics._
    +import io.prometheus.client.CollectorRegistry
    +import io.prometheus.client.dropwizard.DropwizardExports
    +
    +import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config.METRICS_NAMESPACE
    +import org.apache.spark.metrics.MetricsSystem
    +import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp
    +
    +
    +private[spark] class PrometheusSink(
    +                                     val property: Properties,
    +                                     val registry: MetricRegistry,
    +                                     securityMgr: SecurityManager)
    +  extends Sink with Logging {
    +
    +  protected class Reporter(registry: MetricRegistry)
    +    extends ScheduledReporter(
    +      registry,
    +      "prometheus-reporter",
    +      MetricFilter.ALL,
    +      TimeUnit.SECONDS,
    +      TimeUnit.MILLISECONDS) {
    +
    +    val defaultSparkConf: SparkConf = new SparkConf(true)
    +
    +    override def report(
    +                         gauges: util.SortedMap[String, Gauge[_]],
    +                         counters: util.SortedMap[String, Counter],
    +                         histograms: util.SortedMap[String, Histogram],
    +                         meters: util.SortedMap[String, Meter],
    +                         timers: util.SortedMap[String, Timer]): Unit = {
    +
    +      // SparkEnv may become available only after metrics sink creation thus retrieving
    +      // SparkConf from spark env here and not during the creation/initialisation of PrometheusSink.
    +      val sparkConf: SparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(defaultSparkConf)
    +
    +      val metricsNamespace: Option[String] = sparkConf.get(METRICS_NAMESPACE)
    +      val sparkAppId: Option[String] = sparkConf.getOption("spark.app.id")
    +      val executorId: Option[String] = sparkConf.getOption("spark.executor.id")
    +
    +      logInfo(s"metricsNamespace=$metricsNamespace, sparkAppId=$sparkAppId, " +
    +        s"executorId=$executorId")
    +
    +      val role: String = (sparkAppId, executorId) match {
    +        case (Some(_), Some(SparkContext.DRIVER_IDENTIFIER)) => "driver"
    +        case (Some(_), Some(_)) => "executor"
    +        case _ => "shuffle"
    +      }
    +
    +      val job: String = role match {
    +        case "driver" => metricsNamespace.getOrElse(sparkAppId.get)
    +        case "executor" => metricsNamespace.getOrElse(sparkAppId.get)
    +        case _ => metricsNamespace.getOrElse("shuffle")
    +      }
    +      logInfo(s"role=$role, job=$job")
    +
    +      val groupingKey: Map[String, String] = (role, executorId) match {
    +        case ("driver", _) => Map("role" -> role)
    +        case ("executor", Some(id)) => Map ("role" -> role, "number" -> id)
    +        case _ => Map("role" -> role)
    +      }
    +
    +
    --- End diff --
    
    Empty line removed.


---


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by smurakozi <gi...@git.apache.org>.
Github user smurakozi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r166883368
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import java.io.IOException;
    +import java.io.Writer;
    +import java.util.Enumeration;
    +
    +import io.prometheus.client.Collector;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class TextFormatWithTimestamp {
    --- End diff --
    
    No doc.
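    
    A class-level comment along these lines would cover it; a sketch of
    possible wording, not what eventually landed:
    
        /**
         * Writes metrics in the Prometheus text exposition format 0.0.4,
         * optionally appending a millisecond timestamp to each sample line;
         * the stock client-library text writer has no timestamp support,
         * which is why this variant exists.
         */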


---


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by smurakozi <gi...@git.apache.org>.
Github user smurakozi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r166881873
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java ---
    @@ -0,0 +1,320 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.net.HttpURLConnection;
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.net.URL;
    +import java.net.URLEncoder;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import java.io.BufferedWriter;
    +import java.io.IOException;
    +import java.io.OutputStreamWriter;
    +
    +/**
    + * Export metrics via the Prometheus Pushgateway.
    + * <p>
    + * The Prometheus Pushgateway exists to allow ephemeral and
    + * batch jobs to expose their metrics to Prometheus.
    + * Since these kinds of jobs may not exist long enough to be scraped,
    + * they can instead push their metrics to a Pushgateway.
    + * This class allows pushing the contents of a {@link CollectorRegistry} to
    + * a Pushgateway.
    + * <p>
    + * Example usage:
    + * <pre>
    + * {@code
    + *   void executeBatchJob() throws Exception {
    + *     CollectorRegistry registry = new CollectorRegistry();
    + *     Gauge duration = Gauge.build()
    + *         .name("my_batch_job_duration_seconds")
    + *         .help("Duration of my batch job in seconds.")
    + *         .register(registry);
    + *     Gauge.Timer durationTimer = duration.startTimer();
    + *     try {
    + *       // Your code here.
    + *
    + *       // This is only added to the registry after success,
    + *       // so that a previous success in the Pushgateway isn't overwritten on failure.
    + *       Gauge lastSuccess = Gauge.build()
    + *           .name("my_batch_job_last_success")
    + *           .help("Last time my batch job succeeded, in unixtime.")
    + *           .register(registry);
    + *       lastSuccess.setToCurrentTime();
    + *     } finally {
    + *       durationTimer.setDuration();
    + *       PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091");
    + *       pg.pushAdd(registry, "my_batch_job");
    + *     }
    + *   }
    + * }
    + * </pre>
    + * <p>
    + * See <a href="https://github.com/prometheus/pushgateway">
    + *     https://github.com/prometheus/pushgateway</a>
    + */
    +public class PushGatewayWithTimestamp {
    +
    +    private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
    +    private final String address;
    +    private static final int SECONDS_PER_MILLISECOND = 1000;
    +    /**
    +     * Construct a Pushgateway, with the given address.
    +     * <p>
    +     * @param address  host:port or ip:port of the Pushgateway.
    +     */
    +    public PushGatewayWithTimestamp(String address) {
    +        this.address = address;
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry, String job) throws IOException {
    +        doRequest(registry, job, null, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(registry, job, groupingKey, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the same job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job, groupingKey);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry,
    +                        String job, String timestamp) throws IOException {
    +        doRequest(registry, job, null, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, "");
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry,String job,
    +                        Map<String, String> groupingKey, String timestamp) throws IOException {
    +        doRequest(registry, job, groupingKey, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job,
    +                        Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, groupingKey, null);
    +    }
    +
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with no grouping key and the provided job.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job) throws IOException {
    +        doRequest(null, job, null, "DELETE", null);
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with the provided job and grouping key.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(null, job, groupingKey, "DELETE", null);
    +    }
    +
    +
    +    /**
    +     * Pushes all metrics in a registry, replacing all those with the same job and instance.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(CollectorRegistry, String, Map)}
    +     */
    +    @Deprecated
    --- End diff --
    
    This is a new class; why should we include these deprecated methods?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
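
For context on the API being reviewed above, here is a minimal sketch of driving the timestamp-enriched pushAdd from user code. It assumes the PushGatewayWithTimestamp class from this PR and the Prometheus Java client's Dropwizard bridge are on the classpath; the metric name, job name, and pushgateway address are made up for illustration.

    import com.codahale.metrics.MetricRegistry
    import io.prometheus.client.CollectorRegistry
    import io.prometheus.client.dropwizard.DropwizardExports
    import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp

    object PushExample {
      def main(args: Array[String]): Unit = {
        // Dropwizard registry: the format Spark's metrics system emits natively
        val metrics = new MetricRegistry()
        metrics.counter("my_app_events_total").inc()

        // Bridge the Dropwizard metrics into a Prometheus collector registry
        val pushRegistry = new CollectorRegistry()
        new DropwizardExports(metrics).register(pushRegistry)

        // PrometheusSink in this PR passes a full protocol://host:port string
        val pg = new PushGatewayWithTimestamp("http://127.0.0.1:9091")

        // pushAdd(registry, job, timestamp) is the overload this PR adds;
        // the epoch-millis timestamp is appended to every exposed sample
        pg.pushAdd(pushRegistry, "my_batch_job", s"${System.currentTimeMillis}")
      }
    }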


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    I agree this is useful to have; @jerryshao is probably right, though, that it is likely better to add extensibility to the Metrics system.
    
    @jerryshao I'll review your PR.
    @matyix could you also review #11994 to see whether it suits the need, i.e. whether everything you have here to connect to Prometheus could be built external to Spark and on top of #11994? I think your feedback will be very valuable.
    
    We can then come back to this PR as needed.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by matyix <gi...@git.apache.org>.
Github user matyix closed the pull request at:

    https://github.com/apache/spark/pull/19775


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by andrusha <gi...@git.apache.org>.
Github user andrusha commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    You can also try using https://github.com/andrusha/dropwizard-prometheus, which implements pull instead of push.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
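
As a rough sketch of the pull model @andrusha mentions - using the stock Prometheus Java client's HTTPServer (simpleclient_httpserver) rather than the linked dropwizard-prometheus library, whose API is not reproduced here - Dropwizard metrics can be exposed on an HTTP endpoint for Prometheus to scrape. Metric name and port are arbitrary:

    import com.codahale.metrics.MetricRegistry
    import io.prometheus.client.dropwizard.DropwizardExports
    import io.prometheus.client.exporter.HTTPServer

    object PullExample {
      def main(args: Array[String]): Unit = {
        val metrics = new MetricRegistry()
        metrics.meter("requests").mark()

        // Register the Dropwizard bridge with the default collector registry
        new DropwizardExports(metrics).register()

        // Serve /metrics on port 9108; Prometheus scrapes this endpoint
        new HTTPServer(9108)
      }
    }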


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by smurakozi <gi...@git.apache.org>.
Github user smurakozi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r166886839
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import java.io.IOException;
    +import java.io.Writer;
    +import java.util.Enumeration;
    +
    +import io.prometheus.client.Collector;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class TextFormatWithTimestamp {
    +    private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class);
    +
    +    /**
    +     * Content-type for text version 0.0.4.
    +     */
    +    public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8";
    +
    +    private static StringBuilder jsonMessageLogBuilder = new StringBuilder();
    +
    +    public static void write004(Writer writer,
    +                                Enumeration<Collector.MetricFamilySamples> mfs)throws IOException {
    +        write004(writer, mfs, null);
    +    }
    +
    +    /**
    +     * Write out the text version 0.0.4 of the given MetricFamilySamples.
    +     */
    +    public static void write004(Writer writer,Enumeration<Collector.MetricFamilySamples> mfs,
    +                                String timestamp) throws IOException {
    +    /* See http://prometheus.io/docs/instrumenting/exposition_formats/
    +     * for the output format specification. */
    +    while(mfs.hasMoreElements()) {
    +        Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
    +
    +        logger.debug("Metrics data");
    +        logger.debug(metricFamilySamples.toString());
    +        logger.debug("Logging metrics as a json format:");
    +
    +
    +        writer.write("# HELP ");
    +        appendToJsonMessageLogBuilder("# HELP ");
    +        writer.write(metricFamilySamples.name);
    +        appendToJsonMessageLogBuilder(metricFamilySamples.name);
    +        writer.write(' ');
    +        appendToJsonMessageLogBuilder(' ');
    +        writeEscapedHelp(writer, metricFamilySamples.help);
    +        writer.write('\n');
    +        appendToJsonMessageLogBuilder('\n');
    +
    +        writer.write("# TYPE ");
    +        appendToJsonMessageLogBuilder("# TYPE ");
    +        writer.write(metricFamilySamples.name);
    +        appendToJsonMessageLogBuilder(metricFamilySamples.name);
    +        writer.write(' ');
    +        appendToJsonMessageLogBuilder(' ');
    +        writer.write(typeString(metricFamilySamples.type));
    +        appendToJsonMessageLogBuilder(typeString(metricFamilySamples.type));
    +        writer.write('\n');
    +        appendToJsonMessageLogBuilder('\n');
    +
    +        for (Collector.MetricFamilySamples.Sample sample: metricFamilySamples.samples) {
    +            writer.write(sample.name);
    +            appendToJsonMessageLogBuilder(sample.name);
    +            if (sample.labelNames.size() > 0) {
    +                writer.write('{');
    +                appendToJsonMessageLogBuilder('{');
    +                for (int i = 0; i < sample.labelNames.size(); ++i) {
    +                    writer.write(sample.labelNames.get(i));
    +                    appendToJsonMessageLogBuilder(sample.labelNames.get(i));
    +                    writer.write("=\"");
    +                    appendToJsonMessageLogBuilder("=\"");
    +                    writeEscapedLabelValue(writer, sample.labelValues.get(i));
    +                    writer.write("\",");
    +                    appendToJsonMessageLogBuilder("\",");
    +                }
    +                writer.write('}');
    +                appendToJsonMessageLogBuilder('}');
    +            }
    +            writer.write(' ');
    +            appendToJsonMessageLogBuilder(' ');
    +            writer.write(Collector.doubleToGoString(sample.value));
    +            appendToJsonMessageLogBuilder(Collector.doubleToGoString(sample.value));
    +            if(timestamp != null && !timestamp.isEmpty()) {
    +                writer.write(" " + timestamp);
    +                appendToJsonMessageLogBuilder(" " + timestamp);
    +            }
    +            writer.write('\n');
    +            appendToJsonMessageLogBuilder('\n');
    +        }
    +        logger.debug("JSON: "+ jsonMessageLogBuilder);
    +        }
    +    }
    +
    +    private static void writeEscapedHelp(Writer writer, String s) throws IOException {
    +        for (int i = 0; i < s.length(); i++) {
    +            char c = s.charAt(i);
    +            switch (c) {
    +                case '\\':
    +
    --- End diff --
    
    Nit: empty line


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
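
For readers following the diff above: write004 emits the Prometheus text exposition format 0.0.4, with the optional timestamp (epoch milliseconds) appended after each sample value. An illustrative output fragment (names and values invented; note the trailing comma the label loop produces):

    # HELP my_batch_job_duration_seconds Duration of my batch job in seconds.
    # TYPE my_batch_job_duration_seconds gauge
    my_batch_job_duration_seconds{role="driver",} 42.0 1518000000000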


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by GaalDornick <gi...@git.apache.org>.
Github user GaalDornick commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    So, where did we land on the discussion for this PR? Is this change getting in, or is it out?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by andrusha <gi...@git.apache.org>.
Github user andrusha commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    You can use Kubernetes service discovery in conjunction with Prometheus to find all the available nodes for scraping. Then you just add the host and port in your annotations.
    https://prometheus.io/docs/prometheus/latest/configuration/configuration/#%3Ckubernetes_sd_config%3E is a good place to start.
    I think it deserves a blog post, but I'm not sure I will ever write one.
    
    > On 16 May 2018, at 13:10, Goetz Epperlein <no...@github.com> wrote:
    > 
    > @andrusha Do you have a tutorial on how to set this up? From my understanding, if I have multiple executors pulling, it is harder, as Prometheus has to have all the hostnames. Am I right or wrong?
    > 
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by stoader <gi...@git.apache.org>.
Github user stoader commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r167359043
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import java.io.IOException;
    +import java.io.Writer;
    +import java.util.Enumeration;
    +
    +import io.prometheus.client.Collector;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class TextFormatWithTimestamp {
    +    private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class);
    +
    +    /**
    +     * Content-type for text version 0.0.4.
    +     */
    +    public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8";
    +
    +    private static StringBuilder jsonMessageLogBuilder = new StringBuilder();
    +
    +    public static void write004(Writer writer,
    --- End diff --
    
    Added doc.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by matyix <gi...@git.apache.org>.
Github user matyix commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    Hello @felixcheung @jerryshao
    
    The PR #11994 generally looks good for adding extensibility to the Metrics system. This PR (Prometheus) works with the changes proposed in PR #11994 out of the box, so no modification is needed on our side.
    
    As Prometheus is (becoming) a widely used monitoring system, I personally see a lot of value in including this in the Spark code base, just like the current metrics sinks. I see PR #11994 rather as a solution for those who build custom, specialised metrics sinks.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    My original intention with #11994 is to expose a MetricsSystem-related interface, so that users can leverage it to build their own metrics sinks/sources outside of Spark. Unfortunately I'm stuck on #11994, but I still think it is better to keep this as a package outside of Spark; pulling in too many dependencies for non-core functionality seems not so reasonable (just my thoughts).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by mageru <gi...@git.apache.org>.
Github user mageru commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    Did we really miss out on Prometheus metric functionality because people couldn't just be cool about it? 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by matyix <gi...@git.apache.org>.
Github user matyix commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    Hello @erikerlandson @felixcheung @jerryshao - any feedback on this PR? Shall I close it and not worry about this being merged upstream anymore? We've been using this in production for the last 3 months, and it's a bit awkward that our CI/CD system needs to `patch` the upstream version all the time, but we can live with that (since it's automated). Please advise - I'm happy to help get it merged, or to eventually just close it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    Do we have to put this in Spark? Is it a necessary part of k8s? I think if we pull in that PR (https://github.com/apache/spark/pull/11994), then this can stay out of Spark as a package. Even without #11994, I believe users can still add their own metrics source/sink via the exposed SparkEnv/MetricsSystem. My concern is that this unnecessarily increases the code base of Spark core.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    Regardless of the discussion about whether this should live in Spark, does it need to live in core?
    
    Can it be kept in a separate module like the Ganglia stuff (even though that one is separate for licensing reasons)?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by stoader <gi...@git.apache.org>.
Github user stoader commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r167359056
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import java.io.IOException;
    +import java.io.Writer;
    +import java.util.Enumeration;
    +
    +import io.prometheus.client.Collector;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class TextFormatWithTimestamp {
    --- End diff --
    
    Added doc.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by stoader <gi...@git.apache.org>.
Github user stoader commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    @smurakozi thank you for reviewing. The PR has been updated based on your comments.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by smurakozi <gi...@git.apache.org>.
Github user smurakozi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r166883609
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import java.io.IOException;
    +import java.io.Writer;
    +import java.util.Enumeration;
    +
    +import io.prometheus.client.Collector;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class TextFormatWithTimestamp {
    +    private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class);
    +
    +    /**
    +     * Content-type for text version 0.0.4.
    +     */
    +    public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8";
    +
    +    private static StringBuilder jsonMessageLogBuilder = new StringBuilder();
    +
    +    public static void write004(Writer writer,
    --- End diff --
    
    No doc


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by stoader <gi...@git.apache.org>.
Github user stoader commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r167359023
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import java.io.IOException;
    +import java.io.Writer;
    +import java.util.Enumeration;
    +
    +import io.prometheus.client.Collector;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class TextFormatWithTimestamp {
    +    private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class);
    +
    +    /**
    +     * Content-type for text version 0.0.4.
    +     */
    +    public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8";
    +
    +    private static StringBuilder jsonMessageLogBuilder = new StringBuilder();
    +
    +    public static void write004(Writer writer,
    +                                Enumeration<Collector.MetricFamilySamples> mfs)throws IOException {
    +        write004(writer, mfs, null);
    +    }
    +
    +    /**
    +     * Write out the text version 0.0.4 of the given MetricFamilySamples.
    +     */
    +    public static void write004(Writer writer,Enumeration<Collector.MetricFamilySamples> mfs,
    +                                String timestamp) throws IOException {
    +    /* See http://prometheus.io/docs/instrumenting/exposition_formats/
    +     * for the output format specification. */
    +    while(mfs.hasMoreElements()) {
    +        Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
    +
    +        logger.debug("Metrics data");
    +        logger.debug(metricFamilySamples.toString());
    +        logger.debug("Logging metrics as a json format:");
    +
    +
    +        writer.write("# HELP ");
    +        appendToJsonMessageLogBuilder("# HELP ");
    +        writer.write(metricFamilySamples.name);
    +        appendToJsonMessageLogBuilder(metricFamilySamples.name);
    +        writer.write(' ');
    +        appendToJsonMessageLogBuilder(' ');
    +        writeEscapedHelp(writer, metricFamilySamples.help);
    +        writer.write('\n');
    +        appendToJsonMessageLogBuilder('\n');
    +
    +        writer.write("# TYPE ");
    +        appendToJsonMessageLogBuilder("# TYPE ");
    +        writer.write(metricFamilySamples.name);
    +        appendToJsonMessageLogBuilder(metricFamilySamples.name);
    +        writer.write(' ');
    +        appendToJsonMessageLogBuilder(' ');
    +        writer.write(typeString(metricFamilySamples.type));
    +        appendToJsonMessageLogBuilder(typeString(metricFamilySamples.type));
    +        writer.write('\n');
    +        appendToJsonMessageLogBuilder('\n');
    +
    +        for (Collector.MetricFamilySamples.Sample sample: metricFamilySamples.samples) {
    +            writer.write(sample.name);
    +            appendToJsonMessageLogBuilder(sample.name);
    +            if (sample.labelNames.size() > 0) {
    +                writer.write('{');
    +                appendToJsonMessageLogBuilder('{');
    +                for (int i = 0; i < sample.labelNames.size(); ++i) {
    +                    writer.write(sample.labelNames.get(i));
    +                    appendToJsonMessageLogBuilder(sample.labelNames.get(i));
    +                    writer.write("=\"");
    +                    appendToJsonMessageLogBuilder("=\"");
    +                    writeEscapedLabelValue(writer, sample.labelValues.get(i));
    +                    writer.write("\",");
    +                    appendToJsonMessageLogBuilder("\",");
    +                }
    +                writer.write('}');
    +                appendToJsonMessageLogBuilder('}');
    +            }
    +            writer.write(' ');
    +            appendToJsonMessageLogBuilder(' ');
    +            writer.write(Collector.doubleToGoString(sample.value));
    +            appendToJsonMessageLogBuilder(Collector.doubleToGoString(sample.value));
    +            if(timestamp != null && !timestamp.isEmpty()) {
    +                writer.write(" " + timestamp);
    +                appendToJsonMessageLogBuilder(" " + timestamp);
    +            }
    +            writer.write('\n');
    +            appendToJsonMessageLogBuilder('\n');
    +        }
    +        logger.debug("JSON: "+ jsonMessageLogBuilder);
    +        }
    +    }
    +
    +    private static void writeEscapedHelp(Writer writer, String s) throws IOException {
    +        for (int i = 0; i < s.length(); i++) {
    +            char c = s.charAt(i);
    +            switch (c) {
    +                case '\\':
    +
    --- End diff --
    
    Empty line removed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

Posted by erikerlandson <gi...@git.apache.org>.
Github user erikerlandson commented on the issue:

    https://github.com/apache/spark/pull/19775
  
    I agree w/ @jerryshao that adding new deps to core isn't ideal (also that having #11994 would be really nice).
    New deps in a sub-project seem more palatable, but I'm interested in what other Sparkers think.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by stoader <gi...@git.apache.org>.
Github user stoader commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r167357250
  
    --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.metrics.sink
    +
    +import java.net.URI
    +import java.util
    +import java.util.Properties
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.JavaConverters._
    +import scala.util.Try
    +
    +import com.codahale.metrics._
    +import io.prometheus.client.CollectorRegistry
    +import io.prometheus.client.dropwizard.DropwizardExports
    +
    +import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config.METRICS_NAMESPACE
    +import org.apache.spark.metrics.MetricsSystem
    +import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp
    +
    +
    +private[spark] class PrometheusSink(
    +                                     val property: Properties,
    +                                     val registry: MetricRegistry,
    +                                     securityMgr: SecurityManager)
    --- End diff --
    
    The parameter list for a `Sink` is imposed by `MetricsSystem`, which instantiates the configured sinks (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L199). `PrometheusSink` doesn't need `SecurityManager`, which is why `securityMgr` is not used.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
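
To make that constraint concrete, here is a sketch of the smallest sink shape MetricsSystem can instantiate reflectively, assuming the Sink trait's start/stop/report methods as in current Spark; the class name is hypothetical, and securityMgr must be accepted even when unused:

    package org.apache.spark.metrics.sink

    import java.util.Properties

    import com.codahale.metrics.MetricRegistry

    import org.apache.spark.SecurityManager

    // Hypothetical sink skeleton; MetricsSystem looks the class up by name from
    // metrics.properties and calls exactly this three-argument constructor.
    private[spark] class MyCustomSink(
        val property: Properties,
        val registry: MetricRegistry,
        securityMgr: SecurityManager)
      extends Sink {

      override def start(): Unit = { /* wire up a ScheduledReporter here */ }
      override def stop(): Unit = { /* stop the reporter */ }
      override def report(): Unit = { /* one-shot flush of current metrics */ }
    }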


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by stoader <gi...@git.apache.org>.
Github user stoader commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r167359075
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java ---
    @@ -0,0 +1,320 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.net.HttpURLConnection;
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.net.URL;
    +import java.net.URLEncoder;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import java.io.BufferedWriter;
    +import java.io.IOException;
    +import java.io.OutputStreamWriter;
    +
    +/**
    + * Export metrics via the Prometheus Pushgateway.
    + * <p>
    + * The Prometheus Pushgateway exists to allow ephemeral and
    + * batch jobs to expose their metrics to Prometheus.
    + * Since these kinds of jobs may not exist long enough to be scraped,
    + * they can instead push their metrics to a Pushgateway.
    + * This class allows pushing the contents of a {@link CollectorRegistry} to
    + * a Pushgateway.
    + * <p>
    + * Example usage:
    + * <pre>
    + * {@code
    + *   void executeBatchJob() throws Exception {
    + *     CollectorRegistry registry = new CollectorRegistry();
    + *     Gauge duration = Gauge.build()
    + *         .name("my_batch_job_duration_seconds")
    + *         .help("Duration of my batch job in seconds.")
    + *         .register(registry);
    + *     Gauge.Timer durationTimer = duration.startTimer();
    + *     try {
    + *       // Your code here.
    + *
    + *       // This is only added to the registry after success,
    + *       // so that a previous success in the Pushgateway isn't overwritten on failure.
    + *       Gauge lastSuccess = Gauge.build()
    + *           .name("my_batch_job_last_success")
    + *           .help("Last time my batch job succeeded, in unixtime.")
    + *           .register(registry);
    + *       lastSuccess.setToCurrentTime();
    + *     } finally {
    + *       durationTimer.setDuration();
    + *       PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091");
    + *       pg.pushAdd(registry, "my_batch_job");
    + *     }
    + *   }
    + * }
    + * </pre>
    + * <p>
    + * See <a href="https://github.com/prometheus/pushgateway">
    + *     https://github.com/prometheus/pushgateway</a>
    + */
    +public class PushGatewayWithTimestamp {
    +
    +    private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
    +    private final String address;
    +    private static final int SECONDS_PER_MILLISECOND = 1000;
    +    /**
    +     * Construct a Pushgateway, with the given address.
    +     * <p>
    +     * @param address  host:port or ip:port of the Pushgateway.
    +     */
    +    public PushGatewayWithTimestamp(String address) {
    +        this.address = address;
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry, String job) throws IOException {
    +        doRequest(registry, job, null, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(registry, job, groupingKey, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the same job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector,
    +                     String job, Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job, groupingKey);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry,
    +                        String job, String timestamp) throws IOException {
    +        doRequest(registry, job, null, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name and job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, "");
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry,String job,
    +                        Map<String, String> groupingKey, String timestamp) throws IOException {
    +        doRequest(registry, job, groupingKey, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name, job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job,
    +                        Map<String, String> groupingKey) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, groupingKey, null);
    +    }
    +
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with no grouping key and the provided job.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job) throws IOException {
    +        doRequest(null, job, null, "DELETE", null);
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with the provided job and grouping key.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job, Map<String, String> groupingKey) throws IOException {
    +        doRequest(null, job, groupingKey, "DELETE", null);
    +    }
    +
    +
    +    /**
    +     * Pushes all metrics in a registry, replacing all those with the same job and instance.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(CollectorRegistry, String, Map)}
    +     */
    +    @Deprecated
    +    public void push(CollectorRegistry registry, String job, String instance) throws IOException {
    +        push(registry, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the same job and instance.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(Collector, String, Map)}
    +     */
    +    @Deprecated
    +    public void push(Collector collector, String job, String instance) throws IOException {
    +        push(collector, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     * @deprecated use {@link #pushAdd(Collector, String, Map)}
    +     */
    +    @Deprecated
    +    public void pushAdd(Collector collector, String job, String instance) throws IOException {
    +        pushAdd(collector, job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * This uses the DELETE HTTP method.
    +     * @deprecated use {@link #delete(String, Map)}
    +     */
    +    @Deprecated
    +    public void delete(String job, String instance) throws IOException {
    +        delete(job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    void doRequest(CollectorRegistry registry, String job, Map<String,
    --- End diff --
    
    Changed it to private.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
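
A short sketch of the grouping-key variant of pushAdd shown in this diff, mirroring the role/number grouping key that PrometheusSink's reporter builds; all names, label values, and the address are illustrative:

    import scala.collection.JavaConverters._

    import com.codahale.metrics.MetricRegistry
    import io.prometheus.client.CollectorRegistry
    import io.prometheus.client.dropwizard.DropwizardExports
    import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp

    object GroupedPushExample {
      def main(args: Array[String]): Unit = {
        val pushRegistry = new CollectorRegistry()
        new DropwizardExports(new MetricRegistry()).register(pushRegistry)

        val pg = new PushGatewayWithTimestamp("http://127.0.0.1:9091")

        // Metrics with the same name, job, and grouping key are replaced on
        // each push, so per-executor keys keep executors from clobbering
        // one another
        val groupingKey = Map("role" -> "executor", "number" -> "1")
        pg.pushAdd(pushRegistry, "my_spark_app", groupingKey.asJava,
          s"${System.currentTimeMillis}")
      }
    }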


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by stoader <gi...@git.apache.org>.
Github user stoader commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r167359036
  
    --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import java.io.IOException;
    +import java.io.Writer;
    +import java.util.Enumeration;
    +
    +import io.prometheus.client.Collector;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class TextFormatWithTimestamp {
    +    private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class);
    +
    +    /**
    +     * Content-type for text version 0.0.4.
    +     */
    +    public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8";
    +
    +    private static StringBuilder jsonMessageLogBuilder = new StringBuilder();
    +
    +    public static void write004(Writer writer,
    +                                Enumeration<Collector.MetricFamilySamples> mfs)throws IOException {
    +        write004(writer, mfs, null);
    +    }
    +
    +    /**
    +     * Write out the text version 0.0.4 of the given MetricFamilySamples.
    +     */
    +    public static void write004(Writer writer,Enumeration<Collector.MetricFamilySamples> mfs,
    +                                String timestamp) throws IOException {
    +    /* See http://prometheus.io/docs/instrumenting/exposition_formats/
    +     * for the output format specification. */
    +    while(mfs.hasMoreElements()) {
    +        Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
    --- End diff --
    
    This class has been refactored.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by smurakozi <gi...@git.apache.org>.
Github user smurakozi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r166894977
  
    --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.metrics.sink
    +
    +import java.net.URI
    +import java.util
    +import java.util.Properties
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.JavaConverters._
    +import scala.util.Try
    +
    +import com.codahale.metrics._
    +import io.prometheus.client.CollectorRegistry
    +import io.prometheus.client.dropwizard.DropwizardExports
    +
    +import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config.METRICS_NAMESPACE
    +import org.apache.spark.metrics.MetricsSystem
    +import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp
    +
    +
    +private[spark] class PrometheusSink(
    +                                     val property: Properties,
    +                                     val registry: MetricRegistry,
    +                                     securityMgr: SecurityManager)
    --- End diff --
    
    `securityMgr` is never used


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

Posted by stoader <gi...@git.apache.org>.
Github user stoader commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r167359005
  
    --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.metrics.sink
    +
    +import java.net.URI
    +import java.util
    +import java.util.Properties
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.JavaConverters._
    +import scala.util.Try
    +
    +import com.codahale.metrics._
    +import io.prometheus.client.CollectorRegistry
    +import io.prometheus.client.dropwizard.DropwizardExports
    +
    +import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config.METRICS_NAMESPACE
    +import org.apache.spark.metrics.MetricsSystem
    +import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp
    +
    +
    +private[spark] class PrometheusSink(
    +                                     val property: Properties,
    +                                     val registry: MetricRegistry,
    +                                     securityMgr: SecurityManager)
    +  extends Sink with Logging {
    +
    +  protected class Reporter(registry: MetricRegistry)
    +    extends ScheduledReporter(
    +      registry,
    +      "prometheus-reporter",
    +      MetricFilter.ALL,
    +      TimeUnit.SECONDS,
    +      TimeUnit.MILLISECONDS) {
    +
    +    val defaultSparkConf: SparkConf = new SparkConf(true)
    +
    +    override def report(
    +                         gauges: util.SortedMap[String, Gauge[_]],
    +                         counters: util.SortedMap[String, Counter],
    +                         histograms: util.SortedMap[String, Histogram],
    +                         meters: util.SortedMap[String, Meter],
    +                         timers: util.SortedMap[String, Timer]): Unit = {
    +
    +      // SparkEnv may become available only after metrics sink creation thus retrieving
    +      // SparkConf from spark env here and not during the creation/initialisation of PrometheusSink.
    +      val sparkConf: SparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(defaultSparkConf)
    +
    +      val metricsNamespace: Option[String] = sparkConf.get(METRICS_NAMESPACE)
    +      val sparkAppId: Option[String] = sparkConf.getOption("spark.app.id")
    +      val executorId: Option[String] = sparkConf.getOption("spark.executor.id")
    +
    +      logInfo(s"metricsNamespace=$metricsNamespace, sparkAppId=$sparkAppId, " +
    +        s"executorId=$executorId")
    +
    +      val role: String = (sparkAppId, executorId) match {
    +        case (Some(_), Some(SparkContext.DRIVER_IDENTIFIER)) => "driver"
    +        case (Some(_), Some(_)) => "executor"
    +        case _ => "shuffle"
    +      }
    +
    +      val job: String = role match {
    +        case "driver" => metricsNamespace.getOrElse(sparkAppId.get)
    +        case "executor" => metricsNamespace.getOrElse(sparkAppId.get)
    +        case _ => metricsNamespace.getOrElse("shuffle")
    +      }
    +      logInfo(s"role=$role, job=$job")
    +
    +      val groupingKey: Map[String, String] = (role, executorId) match {
    +        case ("driver", _) => Map("role" -> role)
    +        case ("executor", Some(id)) => Map ("role" -> role, "number" -> id)
    +        case _ => Map("role" -> role)
    +      }
    +
    +
    +      pushGateway.pushAdd(pushRegistry, job, groupingKey.asJava,
    +        s"${System.currentTimeMillis}")
    +
    +    }
    +
    +  }
    +
    +  val DEFAULT_PUSH_PERIOD: Int = 10
    +  val DEFAULT_PUSH_PERIOD_UNIT: TimeUnit = TimeUnit.SECONDS
    +  val DEFAULT_PUSHGATEWAY_ADDRESS: String = "127.0.0.1:9091"
    +  val DEFAULT_PUSHGATEWAY_ADDRESS_PROTOCOL: String = "http"
    +
    +  val KEY_PUSH_PERIOD = "period"
    +  val KEY_PUSH_PERIOD_UNIT = "unit"
    +  val KEY_PUSHGATEWAY_ADDRESS = "pushgateway-address"
    +  val KEY_PUSHGATEWAY_ADDRESS_PROTOCOL = "pushgateway-address-protocol"
    +
    +
    +  val pollPeriod: Int =
    +    Option(property.getProperty(KEY_PUSH_PERIOD))
    +      .map(_.toInt)
    +      .getOrElse(DEFAULT_PUSH_PERIOD)
    +
    +  val pollUnit: TimeUnit =
    +    Option(property.getProperty(KEY_PUSH_PERIOD_UNIT))
    +      .map { s => TimeUnit.valueOf(s.toUpperCase) }
    +      .getOrElse(DEFAULT_PUSH_PERIOD_UNIT)
    +
    +  val pushGatewayAddress =
    +    Option(property.getProperty(KEY_PUSHGATEWAY_ADDRESS))
    +      .getOrElse(DEFAULT_PUSHGATEWAY_ADDRESS)
    +
    +  val pushGatewayAddressProtocol =
    +    Option(property.getProperty(KEY_PUSHGATEWAY_ADDRESS_PROTOCOL))
    +      .getOrElse(DEFAULT_PUSHGATEWAY_ADDRESS_PROTOCOL)
    +
    +  // validate pushgateway host:port
    +  Try(new URI(s"$pushGatewayAddressProtocol://$pushGatewayAddress")).get
    +
    +  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
    +
    +  logInfo("Initializing Prometheus Sink...")
    +  logInfo(s"Metrics polling period -> $pollPeriod $pollUnit")
    +  logInfo(s"$KEY_PUSHGATEWAY_ADDRESS -> $pushGatewayAddress")
    +  logInfo(s"$KEY_PUSHGATEWAY_ADDRESS_PROTOCOL -> $pushGatewayAddressProtocol")
    +
    +  val pushRegistry: CollectorRegistry = new CollectorRegistry()
    +  val sparkMetricExports: DropwizardExports = new DropwizardExports(registry)
    +  val pushGateway: PushGatewayWithTimestamp =
    +    new PushGatewayWithTimestamp(s"$pushGatewayAddressProtocol://$pushGatewayAddress")
    +
    +  val reporter = new Reporter(registry)
    +
    +  override def start(): Unit = {
    +    sparkMetricExports.register(pushRegistry)
    +
    --- End diff --
    
    Empty line removed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
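
For completeness, a sketch of how this sink could be enabled in conf/metrics.properties, using the property keys defined in the diff above; the sink instance name and the pushgateway host are made up:

    *.sink.prometheus.class=org.apache.spark.metrics.sink.PrometheusSink
    *.sink.prometheus.period=10
    *.sink.prometheus.unit=seconds
    *.sink.prometheus.pushgateway-address=pushgateway.example.com:9091
    *.sink.prometheus.pushgateway-address-protocol=http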