You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/15 18:41:38 UTC

[GitHub] [spark] mridulm commented on a change in pull request #31267: [SPARK-21195][CORE] MetricSystem should pick up dynamically registered metrics in sources

mridulm commented on a change in pull request #31267:
URL: https://github.com/apache/spark/pull/31267#discussion_r614299378



##########
File path: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
##########
@@ -108,6 +110,7 @@ private[spark] class MetricsSystem private (
     if (running) {
       sinks.foreach(_.stop)
       registry.removeMatching((_: String, _: Metric) => true)
+      sourcesWithListeners.keySet.foreach(removeSource)

Review comment:
       synchronize on `sourcesWithListeners`

##########
File path: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
##########
@@ -152,28 +155,25 @@ private[spark] class MetricsSystem private (
     } else { defaultName }
   }
 
-  def getSourcesByName(sourceName: String): Seq[Source] = sources.synchronized {
-    sources.filter(_.sourceName == sourceName).toSeq
+  def getSourcesByName(sourceName: String): Seq[Source] = sourcesWithListeners.synchronized {
+    sourcesWithListeners.keySet.filter(_.sourceName == sourceName).toSeq
   }
 
-  def registerSource(source: Source): Unit = {
-    sources.synchronized {
-      sources += source
-    }
+  def registerSource(source: Source): Unit = sourcesWithListeners.synchronized {
     try {
-      val regName = buildRegistryName(source)
-      registry.register(regName, source.metricRegistry)
+      val listener = new MetricsSystemListener(buildRegistryName(source))
+      source.metricRegistry.addListener(listener)
+      sourcesWithListeners += source -> listener
     } catch {

Review comment:
       Let us keep the scope of the lock narrow to match what was there earlier, unless there is a good reason to expand the critical section ?

##########
File path: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
##########
@@ -152,28 +155,25 @@ private[spark] class MetricsSystem private (
     } else { defaultName }
   }
 
-  def getSourcesByName(sourceName: String): Seq[Source] = sources.synchronized {
-    sources.filter(_.sourceName == sourceName).toSeq
+  def getSourcesByName(sourceName: String): Seq[Source] = sourcesWithListeners.synchronized {
+    sourcesWithListeners.keySet.filter(_.sourceName == sourceName).toSeq
   }
 
-  def registerSource(source: Source): Unit = {
-    sources.synchronized {
-      sources += source
-    }
+  def registerSource(source: Source): Unit = sourcesWithListeners.synchronized {
     try {
-      val regName = buildRegistryName(source)
-      registry.register(regName, source.metricRegistry)
+      val listener = new MetricsSystemListener(buildRegistryName(source))
+      source.metricRegistry.addListener(listener)
+      sourcesWithListeners += source -> listener
     } catch {
       case e: IllegalArgumentException => logInfo("Metrics already registered", e)
     }
   }
 
-  def removeSource(source: Source): Unit = {
-    sources.synchronized {
-      sources -= source
-    }
+  def removeSource(source: Source): Unit = sourcesWithListeners.synchronized {
     val regName = buildRegistryName(source)
     registry.removeMatching((name: String, _: Metric) => name.startsWith(regName))
+    sourcesWithListeners.get(source).foreach(source.metricRegistry.removeListener)
+    sourcesWithListeners.remove(source)

Review comment:
       ```suggestion
   sourcesWithListeners.remove(source).foreach(source.metricRegistry.removeListener)
   ```

##########
File path: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
##########
@@ -242,7 +277,14 @@ private[spark] object MetricsSystem {
   }
 
   def createMetricsSystem(instance: String, conf: SparkConf): MetricsSystem = {
-    new MetricsSystem(instance, conf)
+    new MetricsSystem(instance, conf, new MetricRegistry)
+  }
+
+  def createMetricsSystem(
+     instance: String,
+     conf: SparkConf,
+     registry: MetricRegistry): MetricsSystem = {
+    new MetricsSystem(instance, conf, registry)

Review comment:
       Rely on default registry param if unspecified ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org