Posted to commits@spark.apache.org by va...@apache.org on 2019/03/05 18:48:01 UTC
[spark] branch master updated: [SPARK-26928][CORE] Add driver CPU Time to the metrics system
This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 25d2850 [SPARK-26928][CORE] Add driver CPU Time to the metrics system
25d2850 is described below
commit 25d28506652625c86b569dffa99e732a191ee626
Author: Luca Canali <lu...@cern.ch>
AuthorDate: Tue Mar 5 10:47:39 2019 -0800
[SPARK-26928][CORE] Add driver CPU Time to the metrics system
## What changes were proposed in this pull request?
This proposes to add instrumentation for the driver's JVM CPU time via the Spark Dropwizard/Codahale metrics system. It follows directly from the previous work in SPARK-25228 and shares similar motivations: it is intended as an improvement for use in Spark performance dashboards and monitoring tools/instrumentation.
Implementation details: this PR takes the code introduced in SPARK-25228 and moves it into a new, separate Source, JVMCPUSource, which is then used to register the jvmCpuTime gauge metric for both the executor and the driver.
The registration of the jvmCpuTime metric for the driver is conditional: a new configuration parameter `spark.metrics.cpu.time.driver.enabled` (proposed default: false) is introduced for this purpose.
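For illustration, here is a minimal sketch of how a user could opt in on the driver and surface the new gauge through the built-in console sink. The sink entries use Spark's standard `spark.metrics.conf.*` pass-through for metrics.properties settings; the enable flag is the one proposed above (the diff below registers the source unconditionally, so read the flag as proposed rather than final):

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative local-mode setup; not part of this patch.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("jvm-cpu-metric-demo")
      // Proposed opt-in flag from the description above.
      .set("spark.metrics.cpu.time.driver.enabled", "true")
      // Report driver metrics to stdout every 10 seconds.
      .set("spark.metrics.conf.driver.sink.console.class",
        "org.apache.spark.metrics.sink.ConsoleSink")
      .set("spark.metrics.conf.driver.sink.console.period", "10")
    val sc = new SparkContext(conf)
    // The console report should now include a JVMCPU.jvmCpuTime gauge.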
## How was this patch tested?
Manually tested, both in local mode and on YARN.
Closes #23838 from LucaCanali/addCPUTimeMetricDriver.
Authored-by: Luca Canali <lu...@cern.ch>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
.../main/scala/org/apache/spark/SparkContext.scala | 2 +
.../scala/org/apache/spark/executor/Executor.scala | 2 +
.../org/apache/spark/executor/ExecutorSource.scala | 18 --------
.../org/apache/spark/metrics/source/JVMCPU.scala | 48 ++++++++++++++++++++++
docs/monitoring.md | 7 +++-
5 files changed, 58 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index dc0ea24..01d9792 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -48,6 +48,7 @@ import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests._
import org.apache.spark.internal.config.UI._
import org.apache.spark.io.CompressionCodec
+import org.apache.spark.metrics.source.JVMCPUSource
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.rpc.RpcEndpointRef
@@ -568,6 +569,7 @@ class SparkContext(config: SparkConf) extends Logging {
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
+ _env.metricsSystem.registerSource(new JVMCPUSource())
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 6b23b26..740080e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -38,6 +38,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
+import org.apache.spark.metrics.source.JVMCPUSource
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
@@ -117,6 +118,7 @@ private[spark] class Executor(
if (!isLocal) {
env.blockManager.initialize(conf.getAppId)
env.metricsSystem.registerSource(executorSource)
+ env.metricsSystem.registerSource(new JVMCPUSource())
env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
}
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index a826402..e852376 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -76,24 +76,6 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends
registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
}
- // Dropwizard metrics gauge measuring the executor's process CPU time.
- // This Gauge will try to get and return the JVM Process CPU time or return -1 otherwise.
- // The CPU time value is returned in nanoseconds.
- // It will use proprietary extensions such as com.sun.management.OperatingSystemMXBean or
- // com.ibm.lang.management.OperatingSystemMXBean, if available.
- metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] {
- val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
- val name = new ObjectName("java.lang", "type", "OperatingSystem")
- override def getValue: Long = {
- try {
- // return JVM process CPU time if the ProcessCpuTime method is available
- mBean.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long]
- } catch {
- case NonFatal(_) => -1L
- }
- }
- })
-
// Expose executor task metrics using the Dropwizard metrics system.
// The list is taken from TaskMetrics.scala
val METRIC_CPU_TIME = metricRegistry.counter(MetricRegistry.name("cpuTime"))
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/JVMCPU.scala b/core/src/main/scala/org/apache/spark/metrics/source/JVMCPU.scala
new file mode 100644
index 0000000..6ea86b8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/source/JVMCPU.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.source
+
+import java.lang.management.ManagementFactory
+
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+import javax.management.{MBeanServer, ObjectName}
+import scala.util.control.NonFatal
+
+private[spark] class JVMCPUSource extends Source {
+
+ override val metricRegistry = new MetricRegistry()
+ override val sourceName = "JVMCPU"
+
+ // Dropwizard/Codahale metrics gauge measuring the JVM process CPU time.
+ // This Gauge will try to get and return the JVM Process CPU time or return -1 otherwise.
+ // The CPU time value is returned in nanoseconds.
+ // It will use proprietary extensions such as com.sun.management.OperatingSystemMXBean or
+ // com.ibm.lang.management.OperatingSystemMXBean, if available.
+ metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] {
+ val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
+ val name = new ObjectName("java.lang", "type", "OperatingSystem")
+ override def getValue: Long = {
+ try {
+ // return JVM process CPU time if the ProcessCpuTime method is available
+ mBean.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long]
+ } catch {
+ case NonFatal(_) => -1L
+ }
+ }
+ })
+}
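Since jvmCpuTime is a cumulative count of process CPU nanoseconds since JVM start, monitoring tools typically sample it twice and derive a rate. A minimal sketch using only standard JDK MBeans (the readJvmCpuTime helper below replicates what the gauge reads and is not part of this patch):

    import java.lang.management.ManagementFactory
    import javax.management.ObjectName
    import scala.util.control.NonFatal

    // Read the same attribute the jvmCpuTime gauge exposes; -1 if unavailable.
    def readJvmCpuTime(): Long = {
      val mbs  = ManagementFactory.getPlatformMBeanServer
      val name = new ObjectName("java.lang", "type", "OperatingSystem")
      try mbs.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long]
      catch { case NonFatal(_) => -1L }
    }

    val cores = Runtime.getRuntime.availableProcessors()
    val (cpu0, t0) = (readJvmCpuTime(), System.nanoTime())
    Thread.sleep(1000)
    val (cpu1, t1) = (readJvmCpuTime(), System.nanoTime())
    // CPU nanoseconds consumed per wall-clock nanosecond, normalized by core count.
    val utilization = (cpu1 - cpu0).toDouble / (t1 - t0) / cores
    println(f"JVM CPU utilization over the interval: ${utilization * 100}%.1f%%")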
diff --git a/docs/monitoring.md b/docs/monitoring.md
index a92dd6f..72e4f47 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -813,6 +813,9 @@ This is the component with the largest amount of instrumented metrics
- states-rowsTotal
- states-usedBytes
+- namespace=JVMCPU
+ - jvmCpuTime
+
### Component instance = Executor
These metrics are exposed by Spark executors. Note, currently they are not available
when running in local mode.
@@ -834,7 +837,6 @@ when running in local mode.
- filesystem.hdfs.read_ops
- filesystem.hdfs.write_bytes
- filesystem.hdfs.write_ops
- - jvmCpuTime
- jvmGCTime.count
- memoryBytesSpilled.count
- recordsRead.count
@@ -858,6 +860,9 @@ when running in local mode.
- threadpool.currentPool_size
- threadpool.maxPool_size
+- namespace=JVMCPU
+ - jvmCpuTime
+
- namespace=NettyBlockTransfer
- shuffle-client.usedDirectMemory
- shuffle-client.usedHeapMemory
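For reference, metric names reported to a sink are prefixed with the metrics namespace (the application ID by default) and the instance name, so the new gauge should appear along these lines (the application and executor IDs below are placeholders):

    local-1551234567890.driver.JVMCPU.jvmCpuTime
    local-1551234567890.1.JVMCPU.jvmCpuTime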