Posted to commits@spark.apache.org by va...@apache.org on 2019/03/05 18:48:01 UTC

[spark] branch master updated: [SPARK-26928][CORE] Add driver CPU Time to the metrics system

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 25d2850  [SPARK-26928][CORE] Add driver CPU Time to the metrics system
25d2850 is described below

commit 25d28506652625c86b569dffa99e732a191ee626
Author: Luca Canali <lu...@cern.ch>
AuthorDate: Tue Mar 5 10:47:39 2019 -0800

    [SPARK-26928][CORE] Add driver CPU Time to the metrics system
    
    ## What changes were proposed in this pull request?
    
    This proposes to add instrumentation for the driver's JVM CPU time via the Spark Dropwizard/Codahale metrics system. It follows directly from the previous work in SPARK-25228 and shares the same motivation: it is intended as an improvement for Spark performance dashboards and monitoring tools/instrumentation.
    
    Implementation details: this PR takes the code introduced in SPARK-25228 and moves it into a new, separate source, JVMCPUSource, which is then used to register the jvmCpuTime gauge metric for both the executor and the driver.
    The registration of the jvmCpuTime metric for the driver is made conditional on a new configuration parameter, `spark.metrics.cpu.time.driver.enabled` (proposed default: false), introduced for this purpose.
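    
    As a stand-alone illustration (not part of this patch) of the value the gauge exposes, the sketch below reads the same MBean attribute directly, e.g. from a Scala REPL. It reports the JVM process CPU time in nanoseconds, or -1 on JVMs that do not expose the (HotSpot/IBM-specific) `ProcessCpuTime` attribute:
    
    ```scala
    // Read the ProcessCpuTime attribute that the jvmCpuTime gauge wraps.
    // The value is in nanoseconds; -1 means the attribute is unavailable.
    import java.lang.management.ManagementFactory
    import javax.management.ObjectName
    import scala.util.control.NonFatal
    
    val mBeanServer = ManagementFactory.getPlatformMBeanServer
    val osBean = new ObjectName("java.lang", "type", "OperatingSystem")
    val jvmCpuTimeNanos: Long =
      try mBeanServer.getAttribute(osBean, "ProcessCpuTime").asInstanceOf[Long]
      catch { case NonFatal(_) => -1L }
    println(s"jvmCpuTime = $jvmCpuTimeNanos ns")
    ```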
    
    ## How was this patch tested?
    
    Manually tested in local mode and on YARN.
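    
    As a rough manual check in local mode (illustrative only, assuming the default MetricsServlet sink and the default driver UI port 4040), the driver should now expose a gauge under the `JVMCPU` namespace; through other sinks it appears fully qualified, e.g. `<app-id>.driver.JVMCPU.jvmCpuTime`:
    
    ```scala
    // From spark-shell: fetch the driver's default metrics servlet output and
    // check for the new gauge. Path and port are the defaults; adjust if overridden.
    import scala.io.Source
    val metricsJson = Source.fromURL("http://localhost:4040/metrics/json").mkString
    println(metricsJson.contains("JVMCPU.jvmCpuTime"))  // expected: true
    ```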
    
    Closes #23838 from LucaCanali/addCPUTimeMetricDriver.
    
    Authored-by: Luca Canali <lu...@cern.ch>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../main/scala/org/apache/spark/SparkContext.scala |  2 +
 .../scala/org/apache/spark/executor/Executor.scala |  2 +
 .../org/apache/spark/executor/ExecutorSource.scala | 18 --------
 .../org/apache/spark/metrics/source/JVMCPU.scala   | 48 ++++++++++++++++++++++
 docs/monitoring.md                                 |  7 +++-
 5 files changed, 58 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index dc0ea24..01d9792 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -48,6 +48,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.metrics.source.JVMCPUSource
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.rpc.RpcEndpointRef
@@ -568,6 +569,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _taskScheduler.postStartHook()
     _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
+    _env.metricsSystem.registerSource(new JVMCPUSource())
     _executorAllocationManager.foreach { e =>
       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
     }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 6b23b26..740080e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -38,6 +38,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
+import org.apache.spark.metrics.source.JVMCPUSource
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.scheduler._
 import org.apache.spark.shuffle.FetchFailedException
@@ -117,6 +118,7 @@ private[spark] class Executor(
   if (!isLocal) {
     env.blockManager.initialize(conf.getAppId)
     env.metricsSystem.registerSource(executorSource)
+    env.metricsSystem.registerSource(new JVMCPUSource())
     env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index a826402..e852376 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -76,24 +76,6 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends
     registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
   }
 
-  // Dropwizard metrics gauge measuring the executor's process CPU time.
-  // This Gauge will try to get and return the JVM Process CPU time or return -1 otherwise.
-  // The CPU time value is returned in nanoseconds.
-  // It will use proprietary extensions such as com.sun.management.OperatingSystemMXBean or
-  // com.ibm.lang.management.OperatingSystemMXBean, if available.
-  metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] {
-    val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
-    val name = new ObjectName("java.lang", "type", "OperatingSystem")
-    override def getValue: Long = {
-      try {
-        // return JVM process CPU time if the ProcessCpuTime method is available
-        mBean.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long]
-      } catch {
-        case NonFatal(_) => -1L
-      }
-    }
-  })
-
   // Expose executor task metrics using the Dropwizard metrics system.
   // The list is taken from TaskMetrics.scala
   val METRIC_CPU_TIME = metricRegistry.counter(MetricRegistry.name("cpuTime"))
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/JVMCPU.scala b/core/src/main/scala/org/apache/spark/metrics/source/JVMCPU.scala
new file mode 100644
index 0000000..6ea86b8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/source/JVMCPU.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.source
+
+import java.lang.management.ManagementFactory
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+import javax.management.{MBeanServer, ObjectName}
+import scala.util.control.NonFatal
+
+private[spark] class JVMCPUSource extends Source {
+
+  override val metricRegistry = new MetricRegistry()
+  override val sourceName = "JVMCPU"
+
+  // Dropwizard/Codahale metrics gauge measuring the JVM process CPU time.
+  // This Gauge will try to get and return the JVM Process CPU time or return -1 otherwise.
+  // The CPU time value is returned in nanoseconds.
+  // It will use proprietary extensions such as com.sun.management.OperatingSystemMXBean or
+  // com.ibm.lang.management.OperatingSystemMXBean, if available.
+  metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] {
+    val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
+    val name = new ObjectName("java.lang", "type", "OperatingSystem")
+    override def getValue: Long = {
+      try {
+        // return JVM process CPU time if the ProcessCpuTime method is available
+        mBean.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long]
+      } catch {
+        case NonFatal(_) => -1L
+      }
+    }
+  })
+}
diff --git a/docs/monitoring.md b/docs/monitoring.md
index a92dd6f..72e4f47 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -813,6 +813,9 @@ This is the component with the largest amount of instrumented metrics
   - states-rowsTotal
   - states-usedBytes
 
+- namespace=JVMCPU
+  - jvmCpuTime
+
 ### Component instance = Executor
 These metrics are exposed by Spark executors. Note, currently they are not available
 when running in local mode.
@@ -834,7 +837,6 @@ when running in local mode.
   - filesystem.hdfs.read_ops
   - filesystem.hdfs.write_bytes
   - filesystem.hdfs.write_ops
-  - jvmCpuTime
   - jvmGCTime.count
   - memoryBytesSpilled.count
   - recordsRead.count
@@ -858,6 +860,9 @@ when running in local mode.
   - threadpool.currentPool_size
   - threadpool.maxPool_size
 
+- namespace=JVMCPU
+  - jvmCpuTime
+
 - namespace=NettyBlockTransfer
   - shuffle-client.usedDirectMemory
   - shuffle-client.usedHeapMemory

