Posted to commits@spark.apache.org by pw...@apache.org on 2013/09/10 21:32:25 UTC

[20/50] git commit: Adding sc name in metrics source

Adding sc name in metrics source


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b4e382c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b4e382c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b4e382c2

Branch: refs/remotes/origin/branch-0.8
Commit: b4e382c210b4987da78421f5de11199e4d74f0e7
Parents: 8026537
Author: Patrick Wendell <pw...@gmail.com>
Authored: Sun Sep 8 16:06:49 2013 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Sep 8 16:06:49 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkContext.scala        | 4 ++--
 core/src/main/scala/org/apache/spark/executor/Executor.scala   | 2 +-
 .../main/scala/org/apache/spark/executor/ExecutorSource.scala  | 5 +++--
 .../scala/org/apache/spark/scheduler/DAGSchedulerSource.scala  | 6 ++++--
 .../scala/org/apache/spark/storage/BlockManagerSource.scala    | 6 ++++--
 5 files changed, 14 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
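
The net effect of this commit is that each metrics source name now carries
an identifier for the owning application or executor. Assuming an
application named "MyApp" and an executor with ID "0" (both names are
illustrative, not from this commit), the registered source names change
roughly as follows:

    DAGScheduler  ->  MyApp.DAGScheduler
    BlockManager  ->  MyApp.BlockManager
    executor      ->  executor.0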


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4e382c2/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8931871..4f711a5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -282,8 +282,8 @@ class SparkContext(
   // Post init
   taskScheduler.postStartHook()
 
-  val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
-  val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
+  val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
+  val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
 
   def initDriverMetrics() {
     SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
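
For context, the driver now threads the live SparkContext into each source
so the source can read sc.appName when building its name. A minimal sketch
of a custom source following the same pattern (MySource and its gauge are
illustrative, not part of this commit):

    import com.codahale.metrics.{Gauge, MetricRegistry}
    import org.apache.spark.SparkContext
    import org.apache.spark.metrics.source.Source

    class MySource(sc: SparkContext) extends Source {
      val metricRegistry = new MetricRegistry()
      // Prefix with the application name, mirroring DAGSchedulerSource
      val sourceName = "%s.MySource".format(sc.appName)

      metricRegistry.register(MetricRegistry.name("example", "value"),
        new Gauge[Int] {
          override def getValue: Int = 42 // illustrative constant
        })
    }

Such a source could then be registered the same way as the driver sources
above, via SparkEnv.get.metricsSystem.registerSource(new MySource(sc)).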

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4e382c2/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d365804..ceae3b8 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -98,7 +98,7 @@ private[spark] class Executor(
     }
   )
 
-  val executorSource = new ExecutorSource(this)
+  val executorSource = new ExecutorSource(this, executorId)
 
   // Initialize Spark environment (using system properties read above)
   val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
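
Note that executorId is a constructor parameter of Executor, so it is
available before SparkEnv (and with it the application-level metadata) is
created; the application name is not, which is what the TODO in
ExecutorSource below alludes to. The resulting source name is simply the
formatted ID:

    // With executorId = "5" (illustrative):
    "executor.%s".format("5") // == "executor.5"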

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4e382c2/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index bf8fb4f..18c9dc1 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConversions._
 
 import org.apache.spark.metrics.source.Source
 
-class ExecutorSource(val executor: Executor) extends Source {
+class ExecutorSource(val executor: Executor, executorId: String) extends Source {
   private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
     FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption
 
@@ -39,7 +39,8 @@ class ExecutorSource(val executor: Executor) extends Source {
   }
 
   val metricRegistry = new MetricRegistry()
-  val sourceName = "executor"
+  // TODO: It would be nice to pass the application name here
+  val sourceName = "executor.%s".format(executorId)
 
   // Gauge for executor thread pool's actively executing task counts
   metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
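
For readers unfamiliar with the Codahale API: MetricRegistry.name joins its
arguments with dots, so the gauge above is registered locally as
"threadpool.activeTask.count". Assuming the metrics system prefixes each
metric with its source's sourceName at registration time, the fully
qualified key for an executor with ID "5" (illustrative) would be:

    // "executor.5" + "." + "threadpool.activeTask.count"
    //   => "executor.5.threadpool.activeTask.count"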

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4e382c2/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 22e3723..446d490 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -20,10 +20,12 @@ package org.apache.spark.scheduler
 import com.codahale.metrics.{Gauge,MetricRegistry}
 
 import org.apache.spark.metrics.source.Source
+import org.apache.spark.SparkContext
 
-private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
+    extends Source {
   val metricRegistry = new MetricRegistry()
-  val sourceName = "DAGScheduler"
+  val sourceName = "%s.DAGScheduler".format(sc.appName)
 
   metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.failed.size
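
One caveat worth noting: the application name is interpolated into the
source name unescaped, and many sinks (Graphite, JMX) treat dots as
hierarchy separators, so an appName containing dots adds levels to the
metric tree:

    "%s.DAGScheduler".format("my.app") // == "my.app.DAGScheduler"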

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4e382c2/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 3d709cf..acc3951 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -20,11 +20,13 @@ package org.apache.spark.storage
 import com.codahale.metrics.{Gauge,MetricRegistry}
 
 import org.apache.spark.metrics.source.Source
+import org.apache.spark.SparkContext
 
 
-private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
+private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
+    extends Source {
   val metricRegistry = new MetricRegistry()
-  val sourceName = "BlockManager"
+  val sourceName = "%s.BlockManager".format(sc.appName)
 
   metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
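
As a closing sketch, the gauges a renamed source exposes can be listed from
its local registry. This assumes a live SparkContext named sc and the
two-argument constructor introduced in this commit; the printed output is
illustrative:

    import scala.collection.JavaConversions._
    import org.apache.spark.SparkEnv
    import org.apache.spark.storage.BlockManagerSource

    val source = new BlockManagerSource(SparkEnv.get.blockManager, sc)
    println(source.sourceName)  // e.g. "MyApp.BlockManager"
    source.metricRegistry.getGauges.keySet.foreach(println)
    // e.g. "memory.maxMem.MBytes", among others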