You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/06/26 09:12:49 UTC
spark git commit: [SPARK-8344] Add message processing time metric to
DAGScheduler
Repository: spark
Updated Branches:
refs/heads/master 1a79f0eb8 -> 9fed6abfd
[SPARK-8344] Add message processing time metric to DAGScheduler
This commit adds a new metric, `messageProcessingTime`, to the DAGScheduler metrics source. This metric tracks the time taken to process messages in the scheduler's event processing loop, which is a helpful debugging aid for diagnosing performance issues in the scheduler (such as SPARK-4961).
In order to do this, I moved the creation of the DAGSchedulerSource metrics source into DAGScheduler itself, similar to how MasterSource is created and registered in Master.
Author: Josh Rosen <jo...@databricks.com>
Closes #7002 from JoshRosen/SPARK-8344 and squashes the following commits:
57f914b [Josh Rosen] Fix import ordering
7d6bb83 [Josh Rosen] Add message processing time metrics to DAGScheduler
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fed6abf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fed6abf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fed6abf
Branch: refs/heads/master
Commit: 9fed6abfdcb7afcf92be56e5ccbed6599fe66bc4
Parents: 1a79f0e
Author: Josh Rosen <jo...@databricks.com>
Authored: Fri Jun 26 00:12:05 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Fri Jun 26 00:12:05 2015 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/SparkContext.scala | 1 -
.../org/apache/spark/scheduler/DAGScheduler.scala | 18 ++++++++++++++++--
.../spark/scheduler/DAGSchedulerSource.scala | 8 ++++++--
3 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9fed6abf/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 141276a..c7a7436 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -545,7 +545,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Post init
_taskScheduler.postStartHook()
- _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
http://git-wip-us.apache.org/repos/asf/spark/blob/9fed6abf/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index aea6674..b00a5fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -81,6 +81,8 @@ class DAGScheduler(
def this(sc: SparkContext) = this(sc, sc.taskScheduler)
+ private[scheduler] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)
+
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)
@@ -1438,17 +1440,29 @@ class DAGScheduler(
taskScheduler.stop()
}
- // Start the event thread at the end of the constructor
+ // Start the event thread and register the metrics source at the end of the constructor
+ env.metricsSystem.registerSource(metricsSource)
eventProcessLoop.start()
}
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
+ private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer
+
/**
* The main event loop of the DAG scheduler.
*/
- override def onReceive(event: DAGSchedulerEvent): Unit = event match {
+ override def onReceive(event: DAGSchedulerEvent): Unit = {
+ val timerContext = timer.time()
+ try {
+ doOnReceive(event)
+ } finally {
+ timerContext.stop()
+ }
+ }
+
+ private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
listener, properties)
http://git-wip-us.apache.org/repos/asf/spark/blob/9fed6abf/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 02c6707..6b667d5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -17,11 +17,11 @@
package org.apache.spark.scheduler
-import com.codahale.metrics.{Gauge, MetricRegistry}
+import com.codahale.metrics.{Gauge, MetricRegistry, Timer}
import org.apache.spark.metrics.source.Source
-private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
+private[scheduler] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
extends Source {
override val metricRegistry = new MetricRegistry()
override val sourceName = "DAGScheduler"
@@ -45,4 +45,8 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
override def getValue: Int = dagScheduler.activeJobs.size
})
+
+ /** Timer that tracks the time to process messages in the DAGScheduler's event loop */
+ val messageProcessingTimer: Timer =
+ metricRegistry.timer(MetricRegistry.name("messageProcessingTime"))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org