You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/06/26 09:12:49 UTC

spark git commit: [SPARK-8344] Add message processing time metric to DAGScheduler

Repository: spark
Updated Branches:
  refs/heads/master 1a79f0eb8 -> 9fed6abfd


[SPARK-8344] Add message processing time metric to DAGScheduler

This commit adds a new metric, `messageProcessingTime`, to the DAGScheduler metrics source. This metric tracks the time taken to process messages in the scheduler's event processing loop, which is a helpful debugging aid for diagnosing performance issues in the scheduler (such as SPARK-4961).

In order to do this, I moved the creation of the DAGSchedulerSource metrics source into DAGScheduler itself, similar to how MasterSource is created and registered in Master.

Author: Josh Rosen <jo...@databricks.com>

Closes #7002 from JoshRosen/SPARK-8344 and squashes the following commits:

57f914b [Josh Rosen] Fix import ordering
7d6bb83 [Josh Rosen] Add message processing time metrics to DAGScheduler


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fed6abf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fed6abf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fed6abf

Branch: refs/heads/master
Commit: 9fed6abfdcb7afcf92be56e5ccbed6599fe66bc4
Parents: 1a79f0e
Author: Josh Rosen <jo...@databricks.com>
Authored: Fri Jun 26 00:12:05 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Fri Jun 26 00:12:05 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala     |  1 -
 .../org/apache/spark/scheduler/DAGScheduler.scala | 18 ++++++++++++++++--
 .../spark/scheduler/DAGSchedulerSource.scala      |  8 ++++++--
 3 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9fed6abf/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 141276a..c7a7436 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -545,7 +545,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     // Post init
     _taskScheduler.postStartHook()
-    _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
     _executorAllocationManager.foreach { e =>
       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)

http://git-wip-us.apache.org/repos/asf/spark/blob/9fed6abf/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index aea6674..b00a5fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -81,6 +81,8 @@ class DAGScheduler(
 
   def this(sc: SparkContext) = this(sc, sc.taskScheduler)
 
+  private[scheduler] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)
+
   private[scheduler] val nextJobId = new AtomicInteger(0)
   private[scheduler] def numTotalJobs: Int = nextJobId.get()
   private val nextStageId = new AtomicInteger(0)
@@ -1438,17 +1440,29 @@ class DAGScheduler(
     taskScheduler.stop()
   }
 
-  // Start the event thread at the end of the constructor
+  // Start the event thread and register the metrics source at the end of the constructor
+  env.metricsSystem.registerSource(metricsSource)
   eventProcessLoop.start()
 }
 
 private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
   extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
 
+  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer
+
   /**
    * The main event loop of the DAG scheduler.
    */
-  override def onReceive(event: DAGSchedulerEvent): Unit = event match {
+  override def onReceive(event: DAGSchedulerEvent): Unit = {
+    val timerContext = timer.time()
+    try {
+      doOnReceive(event)
+    } finally {
+      timerContext.stop()
+    }
+  }
+
+  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
     case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
       dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
         listener, properties)

http://git-wip-us.apache.org/repos/asf/spark/blob/9fed6abf/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 02c6707..6b667d5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.scheduler
 
-import com.codahale.metrics.{Gauge, MetricRegistry}
+import com.codahale.metrics.{Gauge, MetricRegistry, Timer}
 
 import org.apache.spark.metrics.source.Source
 
-private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
+private[scheduler] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
     extends Source {
   override val metricRegistry = new MetricRegistry()
   override val sourceName = "DAGScheduler"
@@ -45,4 +45,8 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
   metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.activeJobs.size
   })
+
+  /** Timer that tracks the time to process messages in the DAGScheduler's event loop */
+  val messageProcessingTimer: Timer =
+    metricRegistry.timer(MetricRegistry.name("messageProcessingTime"))
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org