Posted to commits@spark.apache.org by db...@apache.org on 2020/05/12 19:59:32 UTC

[spark] branch branch-3.0 updated: [SPARK-31683][CORE] Make Prometheus output consistent with DropWizard 4.1 result

This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e892a01  [SPARK-31683][CORE] Make Prometheus output consistent with DropWizard 4.1 result
e892a01 is described below

commit e892a016699d996b959b4db01242cff934d62f76
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Tue May 12 19:57:48 2020 +0000

    [SPARK-31683][CORE] Make Prometheus output consistent with DropWizard 4.1 result
    
    ### What changes were proposed in this pull request?
    
    This PR aims to update the Prometheus-related output format to be consistent with the DropWizard 4.1 result.
    - Add `Number` metrics for gauge metrics.
    - Add `type` labels.
    
    ### Why are the changes needed?
    
    SPARK-29032 added Prometheus support. After that, SPARK-29674 upgraded DropWizard for JDK9+ support, which changed the output labels and the number of keys for Gauge metrics. The current output is therefore different from Apache Spark 2.4.5. Since we cannot change DropWizard, this PR makes the output consistent within Apache Spark 3.0.0 only.
    
    **DropWizard 3.x**
    ```
    metrics_master_aliveWorkers_Value 1.0
    ```
    
    **DropWizard 4.1**
    ```
    metrics_master_aliveWorkers_Value{type="gauges",} 1.0
    metrics_master_aliveWorkers_Number{type="gauges",} 1.0
    ```
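    For illustration, here is a minimal standalone sketch of the new gauge rendering (the `GaugeFormatSketch` object and its simplified `normalizeKey` are hypothetical stand-ins; the real implementation is `PrometheusServlet.getMetricsSnapshot` in the diff below):
    
    ```
    import com.codahale.metrics.{Gauge, MetricRegistry}
    import scala.collection.JavaConverters._
    
    object GaugeFormatSketch {
      // Simplified stand-in for the servlet's key normalization.
      private def normalizeKey(key: String): String =
        s"metrics_${key.replaceAll("[^a-zA-Z0-9]", "_")}_"
    
      // Emit a `Number` line and a `Value` line per numeric gauge, each
      // tagged with {type="gauges"} to match the DropWizard 4.1 shape.
      def renderGauges(registry: MetricRegistry): String = {
        val gaugesLabel = """{type="gauges"}"""
        val sb = new StringBuilder()
        registry.getGauges.asScala.foreach { case (k, v) =>
          if (!v.getValue.isInstanceOf[String]) {
            sb.append(s"${normalizeKey(k)}Number$gaugesLabel ${v.getValue}\n")
            sb.append(s"${normalizeKey(k)}Value$gaugesLabel ${v.getValue}\n")
          }
        }
        sb.toString()
      }
    }
    ```
    
    Registering `registry.register("master.aliveWorkers", new Gauge[Int] { override def getValue: Int = 1 })` would then produce the two `metrics_master_aliveWorkers_*` lines shown above.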
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but this only affects the Prometheus support that is new in 3.0.0.
    
    ### How was this patch tested?
    
    Manually checked the output as follows.
    
    **JMXExporter Result**
    ```
    $ curl -s http://localhost:8088/ | grep "^metrics_master" | sort
    metrics_master_aliveWorkers_Number{type="gauges",} 1.0
    metrics_master_aliveWorkers_Value{type="gauges",} 1.0
    metrics_master_apps_Number{type="gauges",} 0.0
    metrics_master_apps_Value{type="gauges",} 0.0
    metrics_master_waitingApps_Number{type="gauges",} 0.0
    metrics_master_waitingApps_Value{type="gauges",} 0.0
    metrics_master_workers_Number{type="gauges",} 1.0
    metrics_master_workers_Value{type="gauges",} 1.0
    ```
    
    **This PR**
    ```
    $ curl -s http://localhost:8080/metrics/master/prometheus/ | grep master
    metrics_master_aliveWorkers_Number{type="gauges"} 1
    metrics_master_aliveWorkers_Value{type="gauges"} 1
    metrics_master_apps_Number{type="gauges"} 0
    metrics_master_apps_Value{type="gauges"} 0
    metrics_master_waitingApps_Number{type="gauges"} 0
    metrics_master_waitingApps_Value{type="gauges"} 0
    metrics_master_workers_Number{type="gauges"} 1
    metrics_master_workers_Value{type="gauges"} 1
    ```
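    
    The remaining cosmetic differences from the JMXExporter result (no trailing comma inside the label braces, and integral values printed without a decimal point) are both valid under the Prometheus text exposition format. As a quick sanity check, a hypothetical format assertion (not part of this patch) could look like:
    
    ```
    object ExpositionFormatCheck extends App {
      // Hypothetical sanity check (not part of this patch): every exported
      // line should match `name{label} value`.
      val linePattern = """^[a-zA-Z_:][a-zA-Z0-9_:]*\{type="[a-z]+"\} -?[0-9.Ee+-]+$""".r
      val sample = """metrics_master_aliveWorkers_Value{type="gauges"} 1"""
      assert(linePattern.findFirstIn(sample).isDefined)
      println("format OK")
    }
    ```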
    
    Closes #28510 from dongjoon-hyun/SPARK-31683.
    
    Authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: DB Tsai <d_...@apple.com>
    (cherry picked from commit 07209f3e2deab824f04484fa6b8bab0ec0a635d6)
    Signed-off-by: DB Tsai <d_...@apple.com>
---
 .../spark/metrics/sink/PrometheusServlet.scala     | 73 ++++++++++++----------
 .../spark/status/api/v1/PrometheusResource.scala   | 52 +++++++--------
 2 files changed, 67 insertions(+), 58 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala
index 011c7bc..59b863b 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala
@@ -56,58 +56,65 @@ private[spark] class PrometheusServlet(
   def getMetricsSnapshot(request: HttpServletRequest): String = {
     import scala.collection.JavaConverters._
 
+    val gaugesLabel = """{type="gauges"}"""
+    val countersLabel = """{type="counters"}"""
+    val metersLabel = countersLabel
+    val histogramsLabels = """{type="histograms"}"""
+    val timersLabels = """{type="timers"}"""
+
     val sb = new StringBuilder()
     registry.getGauges.asScala.foreach { case (k, v) =>
       if (!v.getValue.isInstanceOf[String]) {
-        sb.append(s"${normalizeKey(k)}Value ${v.getValue}\n")
+        sb.append(s"${normalizeKey(k)}Number$guagesLabel ${v.getValue}\n")
+        sb.append(s"${normalizeKey(k)}Value$guagesLabel ${v.getValue}\n")
       }
     }
     registry.getCounters.asScala.foreach { case (k, v) =>
-      sb.append(s"${normalizeKey(k)}Count ${v.getCount}\n")
+      sb.append(s"${normalizeKey(k)}Count$countersLabel ${v.getCount}\n")
     }
     registry.getHistograms.asScala.foreach { case (k, h) =>
       val snapshot = h.getSnapshot
       val prefix = normalizeKey(k)
-      sb.append(s"${prefix}Count ${h.getCount}\n")
-      sb.append(s"${prefix}Max ${snapshot.getMax}\n")
-      sb.append(s"${prefix}Mean ${snapshot.getMean}\n")
-      sb.append(s"${prefix}Min ${snapshot.getMin}\n")
-      sb.append(s"${prefix}50thPercentile ${snapshot.getMedian}\n")
-      sb.append(s"${prefix}75thPercentile ${snapshot.get75thPercentile}\n")
-      sb.append(s"${prefix}95thPercentile ${snapshot.get95thPercentile}\n")
-      sb.append(s"${prefix}98thPercentile ${snapshot.get98thPercentile}\n")
-      sb.append(s"${prefix}99thPercentile ${snapshot.get99thPercentile}\n")
-      sb.append(s"${prefix}999thPercentile ${snapshot.get999thPercentile}\n")
-      sb.append(s"${prefix}StdDev ${snapshot.getStdDev}\n")
+      sb.append(s"${prefix}Count$histogramslabels ${h.getCount}\n")
+      sb.append(s"${prefix}Max$histogramslabels ${snapshot.getMax}\n")
+      sb.append(s"${prefix}Mean$histogramslabels ${snapshot.getMean}\n")
+      sb.append(s"${prefix}Min$histogramslabels ${snapshot.getMin}\n")
+      sb.append(s"${prefix}50thPercentile$histogramslabels ${snapshot.getMedian}\n")
+      sb.append(s"${prefix}75thPercentile$histogramslabels ${snapshot.get75thPercentile}\n")
+      sb.append(s"${prefix}95thPercentile$histogramslabels ${snapshot.get95thPercentile}\n")
+      sb.append(s"${prefix}98thPercentile$histogramslabels ${snapshot.get98thPercentile}\n")
+      sb.append(s"${prefix}99thPercentile$histogramslabels ${snapshot.get99thPercentile}\n")
+      sb.append(s"${prefix}999thPercentile$histogramslabels ${snapshot.get999thPercentile}\n")
+      sb.append(s"${prefix}StdDev$histogramslabels ${snapshot.getStdDev}\n")
     }
     registry.getMeters.entrySet.iterator.asScala.foreach { kv =>
       val prefix = normalizeKey(kv.getKey)
       val meter = kv.getValue
-      sb.append(s"${prefix}Count ${meter.getCount}\n")
-      sb.append(s"${prefix}MeanRate ${meter.getMeanRate}\n")
-      sb.append(s"${prefix}OneMinuteRate ${meter.getOneMinuteRate}\n")
-      sb.append(s"${prefix}FiveMinuteRate ${meter.getFiveMinuteRate}\n")
-      sb.append(s"${prefix}FifteenMinuteRate ${meter.getFifteenMinuteRate}\n")
+      sb.append(s"${prefix}Count$metersLabel ${meter.getCount}\n")
+      sb.append(s"${prefix}MeanRate$metersLabel ${meter.getMeanRate}\n")
+      sb.append(s"${prefix}OneMinuteRate$metersLabel ${meter.getOneMinuteRate}\n")
+      sb.append(s"${prefix}FiveMinuteRate$metersLabel ${meter.getFiveMinuteRate}\n")
+      sb.append(s"${prefix}FifteenMinuteRate$metersLabel ${meter.getFifteenMinuteRate}\n")
     }
     registry.getTimers.entrySet.iterator.asScala.foreach { kv =>
       val prefix = normalizeKey(kv.getKey)
       val timer = kv.getValue
       val snapshot = timer.getSnapshot
-      sb.append(s"${prefix}Count ${timer.getCount}\n")
-      sb.append(s"${prefix}Max ${snapshot.getMax}\n")
-      sb.append(s"${prefix}Mean ${snapshot.getMax}\n")
-      sb.append(s"${prefix}Min ${snapshot.getMin}\n")
-      sb.append(s"${prefix}50thPercentile ${snapshot.getMedian}\n")
-      sb.append(s"${prefix}75thPercentile ${snapshot.get75thPercentile}\n")
-      sb.append(s"${prefix}95thPercentile ${snapshot.get95thPercentile}\n")
-      sb.append(s"${prefix}98thPercentile ${snapshot.get98thPercentile}\n")
-      sb.append(s"${prefix}99thPercentile ${snapshot.get99thPercentile}\n")
-      sb.append(s"${prefix}999thPercentile ${snapshot.get999thPercentile}\n")
-      sb.append(s"${prefix}StdDev ${snapshot.getStdDev}\n")
-      sb.append(s"${prefix}FifteenMinuteRate ${timer.getFifteenMinuteRate}\n")
-      sb.append(s"${prefix}FiveMinuteRate ${timer.getFiveMinuteRate}\n")
-      sb.append(s"${prefix}OneMinuteRate ${timer.getOneMinuteRate}\n")
-      sb.append(s"${prefix}MeanRate ${timer.getMeanRate}\n")
+      sb.append(s"${prefix}Count$timersLabels ${timer.getCount}\n")
+      sb.append(s"${prefix}Max$timersLabels ${snapshot.getMax}\n")
+      sb.append(s"${prefix}Mean$timersLabels ${snapshot.getMax}\n")
+      sb.append(s"${prefix}Min$timersLabels ${snapshot.getMin}\n")
+      sb.append(s"${prefix}50thPercentile$timersLabels ${snapshot.getMedian}\n")
+      sb.append(s"${prefix}75thPercentile$timersLabels ${snapshot.get75thPercentile}\n")
+      sb.append(s"${prefix}95thPercentile$timersLabels ${snapshot.get95thPercentile}\n")
+      sb.append(s"${prefix}98thPercentile$timersLabels ${snapshot.get98thPercentile}\n")
+      sb.append(s"${prefix}99thPercentile$timersLabels ${snapshot.get99thPercentile}\n")
+      sb.append(s"${prefix}999thPercentile$timersLabels ${snapshot.get999thPercentile}\n")
+      sb.append(s"${prefix}StdDev$timersLabels ${snapshot.getStdDev}\n")
+      sb.append(s"${prefix}FifteenMinuteRate$timersLabels ${timer.getFifteenMinuteRate}\n")
+      sb.append(s"${prefix}FiveMinuteRate$timersLabels ${timer.getFiveMinuteRate}\n")
+      sb.append(s"${prefix}OneMinuteRate$timersLabels ${timer.getOneMinuteRate}\n")
+      sb.append(s"${prefix}MeanRate$timersLabels ${timer.getMeanRate}\n")
     }
     sb.toString()
   }
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala
index 2a5f151..4ed3d45 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala
@@ -50,27 +50,27 @@ private[v1] class PrometheusResource extends ApiRequestContext {
         "application_name" -> store.applicationInfo.name,
         "executor_id" -> executor.id
       ).map { case (k, v) => s"""$k="$v"""" }.mkString("{", ", ", "}")
-      sb.append(s"${prefix}rddBlocks_Count$labels ${executor.rddBlocks}\n")
-      sb.append(s"${prefix}memoryUsed_Count$labels ${executor.memoryUsed}\n")
-      sb.append(s"${prefix}diskUsed_Count$labels ${executor.diskUsed}\n")
-      sb.append(s"${prefix}totalCores_Count$labels ${executor.totalCores}\n")
-      sb.append(s"${prefix}maxTasks_Count$labels ${executor.maxTasks}\n")
-      sb.append(s"${prefix}activeTasks_Count$labels ${executor.activeTasks}\n")
-      sb.append(s"${prefix}failedTasks_Count$labels ${executor.failedTasks}\n")
-      sb.append(s"${prefix}completedTasks_Count$labels ${executor.completedTasks}\n")
-      sb.append(s"${prefix}totalTasks_Count$labels ${executor.totalTasks}\n")
-      sb.append(s"${prefix}totalDuration_Value$labels ${executor.totalDuration}\n")
-      sb.append(s"${prefix}totalGCTime_Value$labels ${executor.totalGCTime}\n")
-      sb.append(s"${prefix}totalInputBytes_Count$labels ${executor.totalInputBytes}\n")
-      sb.append(s"${prefix}totalShuffleRead_Count$labels ${executor.totalShuffleRead}\n")
-      sb.append(s"${prefix}totalShuffleWrite_Count$labels ${executor.totalShuffleWrite}\n")
-      sb.append(s"${prefix}maxMemory_Count$labels ${executor.maxMemory}\n")
+      sb.append(s"${prefix}rddBlocks$labels ${executor.rddBlocks}\n")
+      sb.append(s"${prefix}memoryUsed_bytes$labels ${executor.memoryUsed}\n")
+      sb.append(s"${prefix}diskUsed_bytes$labels ${executor.diskUsed}\n")
+      sb.append(s"${prefix}totalCores$labels ${executor.totalCores}\n")
+      sb.append(s"${prefix}maxTasks$labels ${executor.maxTasks}\n")
+      sb.append(s"${prefix}activeTasks$labels ${executor.activeTasks}\n")
+      sb.append(s"${prefix}failedTasks_total$labels ${executor.failedTasks}\n")
+      sb.append(s"${prefix}completedTasks_total$labels ${executor.completedTasks}\n")
+      sb.append(s"${prefix}totalTasks_total$labels ${executor.totalTasks}\n")
+      sb.append(s"${prefix}totalDuration_seconds_total$labels ${executor.totalDuration * 0.001}\n")
+      sb.append(s"${prefix}totalGCTime_seconds_total$labels ${executor.totalGCTime * 0.001}\n")
+      sb.append(s"${prefix}totalInputBytes_bytes_total$labels ${executor.totalInputBytes}\n")
+      sb.append(s"${prefix}totalShuffleRead_bytes_total$labels ${executor.totalShuffleRead}\n")
+      sb.append(s"${prefix}totalShuffleWrite_bytes_total$labels ${executor.totalShuffleWrite}\n")
+      sb.append(s"${prefix}maxMemory_bytes$labels ${executor.maxMemory}\n")
       executor.executorLogs.foreach { case (k, v) => }
       executor.memoryMetrics.foreach { m =>
-        sb.append(s"${prefix}usedOnHeapStorageMemory_Count$labels ${m.usedOnHeapStorageMemory}\n")
-        sb.append(s"${prefix}usedOffHeapStorageMemory_Count$labels ${m.usedOffHeapStorageMemory}\n")
-        sb.append(s"${prefix}totalOnHeapStorageMemory_Count$labels ${m.totalOnHeapStorageMemory}\n")
-        sb.append(s"${prefix}totalOffHeapStorageMemory_Count$labels " +
+        sb.append(s"${prefix}usedOnHeapStorageMemory_bytes$labels ${m.usedOnHeapStorageMemory}\n")
+        sb.append(s"${prefix}usedOffHeapStorageMemory_bytes$labels ${m.usedOffHeapStorageMemory}\n")
+        sb.append(s"${prefix}totalOnHeapStorageMemory_bytes$labels ${m.totalOnHeapStorageMemory}\n")
+        sb.append(s"${prefix}totalOffHeapStorageMemory_bytes$labels " +
           s"${m.totalOffHeapStorageMemory}\n")
       }
       executor.peakMemoryMetrics.foreach { m =>
@@ -90,14 +90,16 @@ private[v1] class PrometheusResource extends ApiRequestContext {
           "ProcessTreePythonVMemory",
           "ProcessTreePythonRSSMemory",
           "ProcessTreeOtherVMemory",
-          "ProcessTreeOtherRSSMemory",
-          "MinorGCCount",
-          "MinorGCTime",
-          "MajorGCCount",
-          "MajorGCTime"
+          "ProcessTreeOtherRSSMemory"
         )
         names.foreach { name =>
-          sb.append(s"$prefix${name}_Count$labels ${m.getMetricValue(name)}\n")
+          sb.append(s"$prefix${name}_bytes$labels ${m.getMetricValue(name)}\n")
+        }
+        Seq("MinorGCCount", "MajorGCCount").foreach { name =>
+          sb.append(s"$prefix${name}_total$labels ${m.getMetricValue(name)}\n")
+        }
+        Seq("MinorGCTime", "MajorGCTime").foreach { name =>
+          sb.append(s"$prefix${name}_seconds_total$labels ${m.getMetricValue(name) * 0.001}\n")
         }
       }
     }
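
For reference, `PrometheusResource` above also adopts Prometheus naming conventions: byte-valued gauges gain a `_bytes` suffix, cumulative counters gain `_total`, and millisecond durations are exported as `_seconds_total` via a `* 0.001` conversion. A minimal sketch of those conventions (the `UnitConventionSketch` helpers are hypothetical, for illustration only):

```
object UnitConventionSketch {
  // Hypothetical helpers, not part of the patch: cumulative counters get a
  // `_total` suffix, byte-valued gauges get `_bytes`, and millisecond
  // durations are exported in seconds as `_seconds_total`.
  def bytesGauge(prefix: String, name: String, labels: String, bytes: Long): String =
    s"$prefix${name}_bytes$labels $bytes\n"

  def counterTotal(prefix: String, name: String, labels: String, count: Long): String =
    s"$prefix${name}_total$labels $count\n"

  def millisAsSecondsTotal(prefix: String, name: String, labels: String, millis: Long): String =
    s"$prefix${name}_seconds_total$labels ${millis * 0.001}\n"
}
```

For example, with `prefix = "metrics_executor_"`, `name = "totalGCTime"`, and `millis = 1500`, `millisAsSecondsTotal` produces a `metrics_executor_totalGCTime_seconds_total` line with the value `1.5`.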

