Posted to jira@kafka.apache.org by "divijvaidya (via GitHub)" <gi...@apache.org> on 2023/04/21 13:01:26 UTC

[GitHub] [kafka] divijvaidya opened a new pull request, #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

divijvaidya opened a new pull request, #13623:
URL: https://github.com/apache/kafka/pull/13623

   # Motivation
   When the log cleaner is shut down, it doesn't remove the metrics that it registered with `KafkaYammerMetrics.defaultRegistry()`, which has one instance per server. The log cleaner's lifecycle is tied to the lifecycle of `LogManager`, so there is no scenario in which the log cleaner is shut down but the broker isn't. Broker shutdown closes the `jmxReporter`, so there is currently no metric leak here.
   The motivation for this code change is to "do the right thing" from a code hygiene perspective.
   
   # Test
   Added a unit test that fails before the change and passes after the change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1183654795


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -466,6 +472,17 @@ object LogCleaner {
       config.logCleanerEnable)
 
   }
+
+  private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
+  private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
+  private val MaxCleanTimeMetricName = "max-clean-time-secs"
+  private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
+  private val DeadThreadCountMetricName = "DeadThreadCount"
+  private[log] val MetricNames = Set(MaxBufferUtilizationPercentMetricName,

Review Comment:
   Fixed in the latest commit.





[GitHub] [kafka] divijvaidya commented on pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13623:
URL: https://github.com/apache/kafka/pull/13623#issuecomment-1531493129

   @mimaison when you get a chance, please take a look.




[GitHub] [kafka] divijvaidya commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1182721302


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig,
    */
   def shutdown(): Unit = {
     info("Shutting down the log cleaner.")
-    cleaners.foreach(_.shutdown())
-    cleaners.clear()
+    try {
+      cleaners.foreach(_.shutdown())
+      cleaners.clear()
+    } finally {
+      remoteMetrics()
+    }
+  }
+
+  def remoteMetrics(): Unit = {
+    metricsGroup.removeMetric("max-buffer-utilization-percent")
+    metricsGroup.removeMetric("cleaner-recopy-percent")
+    metricsGroup.removeMetric("max-clean-time-secs")
+    metricsGroup.removeMetric("max-compaction-delay-secs")
+    metricsGroup.removeMetric("DeadThreadCount")

Review Comment:
   Yes, please feel free to create a JIRA.





[GitHub] [kafka] machi1990 commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1173855980


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig,
    */
   def shutdown(): Unit = {
     info("Shutting down the log cleaner.")
-    cleaners.foreach(_.shutdown())
-    cleaners.clear()
+    try {
+      cleaners.foreach(_.shutdown())
+      cleaners.clear()
+    } finally {
+      remoteMetrics()
+    }
+  }
+
+  def remoteMetrics(): Unit = {
+    metricsGroup.removeMetric("max-buffer-utilization-percent")
+    metricsGroup.removeMetric("cleaner-recopy-percent")
+    metricsGroup.removeMetric("max-clean-time-secs")
+    metricsGroup.removeMetric("max-compaction-delay-secs")
+    metricsGroup.removeMetric("DeadThreadCount")

Review Comment:
   Hi @divijvaidya 
   
   Just for my own understanding since I am a newbie: what happens when a metric is removed, concretely, what happens to the already registered value?
   
   I also have a nit suggestion: extract the metric names into constants, since they are repeated here and in the metric declarations.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1182552980


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig,
    */
   def shutdown(): Unit = {
     info("Shutting down the log cleaner.")
-    cleaners.foreach(_.shutdown())
-    cleaners.clear()
+    try {
+      cleaners.foreach(_.shutdown())
+      cleaners.clear()
+    } finally {
+      remoteMetrics()
+    }
+  }
+
+  def remoteMetrics(): Unit = {
+    metricsGroup.removeMetric("max-buffer-utilization-percent")
+    metricsGroup.removeMetric("cleaner-recopy-percent")
+    metricsGroup.removeMetric("max-clean-time-secs")
+    metricsGroup.removeMetric("max-compaction-delay-secs")
+    metricsGroup.removeMetric("DeadThreadCount")

Review Comment:
   Both are good suggestions. I have fixed this in a recent commit.
   
   > Just for my own understanding since I am a newbie: what happens when a metric is removed, concretely, what happens to the already registered value?
   
   @machi1990, it's a longer conversation because the metrics story in Kafka is a bit complicated, so perhaps for another time. The summary is that we use `Metrics-Core` (or rather, an ancient version of it). It has a central registry that stores all metric names, and the registry is initialised once per server. Every time we "add" or "remove" a metric, we add it to or remove it from that registry. In addition, Kafka supports exposing metrics through JMX MBeans, so when we add or remove a metric, we also add or remove the corresponding bean on the JMX server.
   
   You can find the metrics-core concepts such as registry explained here: https://metrics.dropwizard.io/4.2.0/manual/core.html
   
   Other places where you can read about our metrics story:
   1. https://cwiki.apache.org/confluence/display/KAFKA/KIP-510%3A+Metrics+library+upgrade
   2. Ticket to move core to new metrics library: https://issues.apache.org/jira/browse/KAFKA-1930
   3. Ticket to upgrade metrics-core to dropwizard: https://issues.apache.org/jira/browse/KAFKA-960
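
The add/remove behaviour described above can be sketched in a few lines of Scala. This is a minimal illustration under stated assumptions, not the metrics-core or Kafka API: `SketchRegistry`, its methods, and the string-keyed bean set are hypothetical stand-ins for the per-server registry and the JMX server. In this sketch a gauge is a function that is evaluated when read, so removing the metric leaves no stored value behind.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a per-server metrics registry plus its JMX mirror.
// None of these names come from metrics-core or Kafka.
final class SketchRegistry {
  private val metrics = mutable.Map.empty[String, () => Int]
  private val mbeans = mutable.Set.empty[String]

  // Registering a gauge stores it under its name and exposes a matching bean.
  def newGauge(name: String, read: () => Int): Unit = {
    metrics.put(name, read)
    mbeans.add(name)
  }

  // Removing a metric drops both the registry entry and the bean.
  def removeMetric(name: String): Unit = {
    metrics.remove(name)
    mbeans.remove(name)
  }

  // The gauge function is only evaluated here, at read time.
  def read(name: String): Option[Int] = metrics.get(name).map(_.apply())
  def registeredNames: Set[String] = metrics.keySet.toSet
  def exposedBeans: Set[String] = mbeans.toSet
}

object RegistrySketch {
  def main(args: Array[String]): Unit = {
    val registry = new SketchRegistry
    var deadThreads = 0
    registry.newGauge("DeadThreadCount", () => deadThreads)
    deadThreads = 2
    println(registry.read("DeadThreadCount")) // Some(2)
    registry.removeMetric("DeadThreadCount")
    println(registry.read("DeadThreadCount")) // None
    println(registry.exposedBeans.isEmpty)    // true
  }
}
```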
   
   





[GitHub] [kafka] dajac commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1183264038


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -466,6 +472,17 @@ object LogCleaner {
       config.logCleanerEnable)
 
   }
+
+  private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
+  private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
+  private val MaxCleanTimeMetricName = "max-clean-time-secs"
+  private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
+  private val DeadThreadCountMetricName = "DeadThreadCount"
+  private[log] val MetricNames = Set(MaxBufferUtilizationPercentMetricName,

Review Comment:
   nit 1: I suppose that we need the package private to access this from tests. We usually add a comment such as `// package private for testing` in this case.
   nit 2: Could we format the `Set` like `ReconfigurableConfigs` at L448?



##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -62,6 +65,39 @@ class LogCleanerTest {
     Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    try {
+      val logCleaner = new LogCleaner(new CleanerConfig(true),
+        logDirs = Array(TestUtils.tempDir()),
+        logs = new Pool[TopicPartition, UnifiedLog](),
+        logDirFailureChannel = new LogDirFailureChannel(1),
+        time = time)
+
+      // shutdown logCleaner so that metrics are removed
+      logCleaner.shutdown()
+
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+      val numMetricsRegistered = 5
+      verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())

Review Comment:
   nit: I wonder if we should also verify the expected names here like we did for `removeMetric`. What do you think?



##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -62,6 +65,39 @@ class LogCleanerTest {
     Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    try {
+      val logCleaner = new LogCleaner(new CleanerConfig(true),
+        logDirs = Array(TestUtils.tempDir()),
+        logs = new Pool[TopicPartition, UnifiedLog](),
+        logDirFailureChannel = new LogDirFailureChannel(1),
+        time = time)
+
+      // shutdown logCleaner so that metrics are removed
+      logCleaner.shutdown()
+
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+      val numMetricsRegistered = 5
+      verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())
+
+      // verify that all metrics are added to the list of metric name
+      assertEquals(LogCleaner.MetricNames.size, numMetricsRegistered,
+        "All metrics are not part of MetricNames collections")
+
+      // verify that each metric is removed
+      LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
+
+      // assert that we have verified all invocations on
+      verifyNoMoreInteractions(mockMetricsGroup)
+    } finally {
+      if (mockMetricsGroupCtor != null) {

Review Comment:
   Is this check needed? If we get to the try..finally, `mockMetricsGroupCtor` should be non-null.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1183654517


##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -62,6 +65,39 @@ class LogCleanerTest {
     Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    try {
+      val logCleaner = new LogCleaner(new CleanerConfig(true),
+        logDirs = Array(TestUtils.tempDir()),
+        logs = new Pool[TopicPartition, UnifiedLog](),
+        logDirFailureChannel = new LogDirFailureChannel(1),
+        time = time)
+
+      // shutdown logCleaner so that metrics are removed
+      logCleaner.shutdown()
+
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+      val numMetricsRegistered = 5
+      verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())
+
+      // verify that all metrics are added to the list of metric name
+      assertEquals(LogCleaner.MetricNames.size, numMetricsRegistered,
+        "All metrics are not part of MetricNames collections")
+
+      // verify that each metric is removed
+      LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
+
+      // assert that we have verified all invocations on
+      verifyNoMoreInteractions(mockMetricsGroup)
+    } finally {
+      if (mockMetricsGroupCtor != null) {

Review Comment:
   Removed in the latest commit.





[GitHub] [kafka] dajac merged pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac merged PR #13623:
URL: https://github.com/apache/kafka/pull/13623




[GitHub] [kafka] dajac commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1184723565


##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -62,6 +65,39 @@ class LogCleanerTest {
     Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    try {
+      val logCleaner = new LogCleaner(new CleanerConfig(true),
+        logDirs = Array(TestUtils.tempDir()),
+        logs = new Pool[TopicPartition, UnifiedLog](),
+        logDirFailureChannel = new LogDirFailureChannel(1),
+        time = time)
+
+      // shutdown logCleaner so that metrics are removed
+      logCleaner.shutdown()
+
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+      val numMetricsRegistered = 5
+      verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())

Review Comment:
   That makes sense. Thanks for the clarification. I wonder if we could use `LogCleaner.MetricNames.size` instead of hardcoding `5`. I suppose that it would have similar semantics, wouldn't it?





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1183658413


##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -62,6 +65,39 @@ class LogCleanerTest {
     Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    try {
+      val logCleaner = new LogCleaner(new CleanerConfig(true),
+        logDirs = Array(TestUtils.tempDir()),
+        logs = new Pool[TopicPartition, UnifiedLog](),
+        logDirFailureChannel = new LogDirFailureChannel(1),
+        time = time)
+
+      // shutdown logCleaner so that metrics are removed
+      logCleaner.shutdown()
+
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+      val numMetricsRegistered = 5
+      verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())

Review Comment:
   It is intentionally left generic so that it captures every `gauge` in this class. If someone adds a gauge and doesn't update this test, this line will fail. Hence, matching any string makes this test stricter.
   
   We don't need to do something similar for remove because we validate in the next statement that the number of "added" metrics is equal to the number of metric names. This ensures that all added metrics are in `MetricNames`, and later we validate that every name in `MetricNames` is removed.
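
The three-step reasoning above (count every gauge registration, compare that count with the size of the name set, then check each name was removed) can be sketched without Mockito. This is only an illustration: `RecordingMetricsGroup`, `SketchCleaner`, and `VerificationSketch` are hypothetical names, and the recording fake stands in for the mocked `KafkaMetricsGroup` used in the real test.

```scala
import scala.collection.mutable

// Hypothetical recording fake; it only remembers which names were added and removed.
final class RecordingMetricsGroup {
  val gaugesAdded = mutable.ListBuffer.empty[String]
  val metricsRemoved = mutable.ListBuffer.empty[String]
  def newGauge(name: String): Unit = gaugesAdded += name
  def removeMetric(name: String): Unit = metricsRemoved += name
}

// Hypothetical component under test, reduced to two of the metric names from the diff.
object SketchCleaner {
  val MetricNames: Set[String] = Set("max-clean-time-secs", "DeadThreadCount")
}

final class SketchCleaner(group: RecordingMetricsGroup) {
  group.newGauge("max-clean-time-secs")
  group.newGauge("DeadThreadCount")
  def shutdown(): Unit = SketchCleaner.MetricNames.foreach(group.removeMetric)
}

object VerificationSketch {
  // Returns true only when all three checks hold.
  def check(group: RecordingMetricsGroup, expectedGauges: Int): Boolean = {
    // 1. Every gauge registration is counted, whatever its name.
    val allGaugesCounted = group.gaugesAdded.size == expectedGauges
    // 2. The name set has the same size, so no registered gauge is missing from it.
    val namesComplete = SketchCleaner.MetricNames.size == expectedGauges
    // 3. Each listed name was removed exactly once.
    val allRemoved = SketchCleaner.MetricNames.forall { name =>
      group.metricsRemoved.count(_ == name) == 1
    }
    allGaugesCounted && namesComplete && allRemoved
  }

  def main(args: Array[String]): Unit = {
    val group = new RecordingMetricsGroup
    new SketchCleaner(group).shutdown()
    println(check(group, expectedGauges = 2)) // true
  }
}
```

If a third gauge were registered in `SketchCleaner` without updating the expected count or the name set, the first check would fail, which is the safety net the comment describes.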





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1182715489


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -124,29 +124,37 @@ class LogCleaner(initialConfig: CleanerConfig,
   private def maxOverCleanerThreads(f: CleanerThread => Double): Int =
     cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread))).toInt
 
+  private val maxBufferUtilizationPercentMetricName =  "max-buffer-utilization-percent"
+  private val cleanerRecopyPercentMetricName =  "cleaner-recopy-percent"
+  private val maxCleanTimeMetricName =  "max-clean-time-secs"
+  private val maxCompactionDelayMetricsName = "max-compaction-delay-secs"
+  private val deadThreadCountMetricName = "DeadThreadCount"
+  private val metricNames = Set.apply(maxBufferUtilizationPercentMetricName,

Review Comment:
   Thank you. This is an excellent recommendation. As you can see, I am not very familiar with Scala :)
   
   The latest revision contains the changes that you have suggested.



##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -62,6 +65,33 @@ class LogCleanerTest {
     Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    try {
+      val logCleaner = new LogCleaner(new CleanerConfig(true),
+        logDirs = Array(TestUtils.tempDir()),
+        logs = new Pool[TopicPartition, UnifiedLog](),
+        logDirFailureChannel = new LogDirFailureChannel(1),
+        time = time)
+
+      // shutdown logCleaner so that metrics are removed
+      logCleaner.shutdown()
+
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+      val numMetricsRegistered = 5
+      verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())
+      verify(mockMetricsGroup, times(numMetricsRegistered)).removeMetric(anyString())

Review Comment:
   The latest revision contains the changes that you have suggested.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1184786130


##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -62,6 +65,39 @@ class LogCleanerTest {
     Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    try {
+      val logCleaner = new LogCleaner(new CleanerConfig(true),
+        logDirs = Array(TestUtils.tempDir()),
+        logs = new Pool[TopicPartition, UnifiedLog](),
+        logDirFailureChannel = new LogDirFailureChannel(1),
+        time = time)
+
+      // shutdown logCleaner so that metrics are removed
+      logCleaner.shutdown()
+
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+      val numMetricsRegistered = 5
+      verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())

Review Comment:
   Your suggestion will work (for now) as long as all metrics in this class are of type `gauge`, but if someone later introduces a metric of another type and adds it to `MetricNames`, the test will fail.
   
   Nevertheless, I have made the change as you suggested. The test could be changed later when someone adds a new metric.
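
The trade-off mentioned here can be made concrete with a small sketch. It assumes a hypothetical `Registration` record and a hypothetical meter named `hypothetical-clean-rate`; neither exists in the PR. The point is only that a gauge-only count stops matching the size of the name set once a non-gauge metric is listed there.

```scala
object CountAssertionSketch {
  // Hypothetical record of one registration call: the factory kind and the metric name.
  final case class Registration(kind: String, name: String)

  // Plays the role of verifying the number of newGauge calls: only gauges are counted.
  def gaugeCountMatches(calls: Seq[Registration], expected: Int): Boolean =
    calls.count(_.kind == "gauge") == expected

  def main(args: Array[String]): Unit = {
    val gaugesOnly = Seq(
      Registration("gauge", "max-clean-time-secs"),
      Registration("gauge", "DeadThreadCount")
    )
    val names = gaugesOnly.map(_.name).toSet
    // All metrics are gauges, so the size of the name set is a valid expected count.
    println(gaugeCountMatches(gaugesOnly, names.size)) // true

    // A hypothetical meter is added later and listed in the name set as well.
    val withMeter = gaugesOnly :+ Registration("meter", "hypothetical-clean-rate")
    val namesWithMeter = withMeter.map(_.name).toSet
    // Two gauges were registered but the name set now has three entries.
    println(gaugeCountMatches(withMeter, namesWithMeter.size)) // false
  }
}
```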





[GitHub] [kafka] dajac commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1182595446


##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -62,6 +65,33 @@ class LogCleanerTest {
     Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    try {
+      val logCleaner = new LogCleaner(new CleanerConfig(true),
+        logDirs = Array(TestUtils.tempDir()),
+        logs = new Pool[TopicPartition, UnifiedLog](),
+        logDirFailureChannel = new LogDirFailureChannel(1),
+        time = time)
+
+      // shutdown logCleaner so that metrics are removed
+      logCleaner.shutdown()
+
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+      val numMetricsRegistered = 5
+      verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())
+      verify(mockMetricsGroup, times(numMetricsRegistered)).removeMetric(anyString())

Review Comment:
   nit: Should we actually verify that `removeMetric` was called for the metric name that we expected?



##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -124,29 +124,37 @@ class LogCleaner(initialConfig: CleanerConfig,
   private def maxOverCleanerThreads(f: CleanerThread => Double): Int =
     cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread))).toInt
 
+  private val maxBufferUtilizationPercentMetricName =  "max-buffer-utilization-percent"
+  private val cleanerRecopyPercentMetricName =  "cleaner-recopy-percent"
+  private val maxCleanTimeMetricName =  "max-clean-time-secs"
+  private val maxCompactionDelayMetricsName = "max-compaction-delay-secs"
+  private val deadThreadCountMetricName = "DeadThreadCount"
+  private val metricNames = Set.apply(maxBufferUtilizationPercentMetricName,

Review Comment:
   Could we move those to the companion object? Moreover, as they are constants, they should start with a capital letter. There is also an extra space after `=` for some of them. You can also use `Set(..)` instead of `Set.apply()`.
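
A rough sketch of the shape this suggestion leads to, assuming a simplified class: `CleanerSketch` and its `registered` parameter are hypothetical, and only two of the metric names from the diff are shown. The constants live in the companion object, start with a capital letter, and are collected with `Set(...)` so that registration and removal share one list.

```scala
import scala.collection.mutable

// Illustrative class only; the real LogCleaner takes different parameters.
final class CleanerSketch(registered: mutable.Set[String]) {
  import CleanerSketch._

  // Registration goes through the shared constants.
  MetricNames.foreach(registered.add)

  // Removal iterates the same set, so a newly listed metric is removed as well.
  def removeMetrics(): Unit = MetricNames.foreach(registered.remove)
}

object CleanerSketch {
  private val MaxCleanTimeMetricName = "max-clean-time-secs"
  private val DeadThreadCountMetricName = "DeadThreadCount"

  // In the PR this is private[log] so tests can read it; left public here
  // because the sketch is not inside a package.
  val MetricNames: Set[String] = Set(
    MaxCleanTimeMetricName,
    DeadThreadCountMetricName
  )
}
```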





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1182536794


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig,
    */
   def shutdown(): Unit = {
     info("Shutting down the log cleaner.")
-    cleaners.foreach(_.shutdown())
-    cleaners.clear()
+    try {
+      cleaners.foreach(_.shutdown())
+      cleaners.clear()
+    } finally {
+      remoteMetrics()
+    }
+  }
+
+  def remoteMetrics(): Unit = {

Review Comment:
   Thanks for catching this. It is fixed.





[GitHub] [kafka] machi1990 commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1182592921


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig,
    */
   def shutdown(): Unit = {
     info("Shutting down the log cleaner.")
-    cleaners.foreach(_.shutdown())
-    cleaners.clear()
+    try {
+      cleaners.foreach(_.shutdown())
+      cleaners.clear()
+    } finally {
+      remoteMetrics()
+    }
+  }
+
+  def remoteMetrics(): Unit = {
+    metricsGroup.removeMetric("max-buffer-utilization-percent")
+    metricsGroup.removeMetric("cleaner-recopy-percent")
+    metricsGroup.removeMetric("max-clean-time-secs")
+    metricsGroup.removeMetric("max-compaction-delay-secs")
+    metricsGroup.removeMetric("DeadThreadCount")

Review Comment:
   Thanks @divijvaidya the change looks good to me. 
   Also thanks for the reply regarding my other question. 
   I noticed the same pattern in other classes that do not remove their metrics during shutdown, e.g. `ClientQuotaManager.scala` and some of its child classes. Is this something that you've observed as well? I could open a JIRA so that we can track it and work on it in a separate PR.





[GitHub] [kafka] machi1990 commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1182849704


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig,
    */
   def shutdown(): Unit = {
     info("Shutting down the log cleaner.")
-    cleaners.foreach(_.shutdown())
-    cleaners.clear()
+    try {
+      cleaners.foreach(_.shutdown())
+      cleaners.clear()
+    } finally {
+      remoteMetrics()
+    }
+  }
+
+  def remoteMetrics(): Unit = {
+    metricsGroup.removeMetric("max-buffer-utilization-percent")
+    metricsGroup.removeMetric("cleaner-recopy-percent")
+    metricsGroup.removeMetric("max-clean-time-secs")
+    metricsGroup.removeMetric("max-compaction-delay-secs")
+    metricsGroup.removeMetric("DeadThreadCount")

Review Comment:
   I've created https://issues.apache.org/jira/browse/KAFKA-14959. I'll assign it to myself and take a stab at it at the end of this week or early next week.





[GitHub] [kafka] kirktrue commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1180722737


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig,
    */
   def shutdown(): Unit = {
     info("Shutting down the log cleaner.")
-    cleaners.foreach(_.shutdown())
-    cleaners.clear()
+    try {
+      cleaners.foreach(_.shutdown())
+      cleaners.clear()
+    } finally {
+      remoteMetrics()
+    }
+  }
+
+  def remoteMetrics(): Unit = {

Review Comment:
   Should this method name be `removeMetrics` (a `v` instead of a `t`)?



##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig,
    */
   def shutdown(): Unit = {
     info("Shutting down the log cleaner.")
-    cleaners.foreach(_.shutdown())
-    cleaners.clear()
+    try {
+      cleaners.foreach(_.shutdown())
+      cleaners.clear()
+    } finally {
+      remoteMetrics()
+    }
+  }
+
+  def remoteMetrics(): Unit = {
+    metricsGroup.removeMetric("max-buffer-utilization-percent")
+    metricsGroup.removeMetric("cleaner-recopy-percent")
+    metricsGroup.removeMetric("max-clean-time-secs")
+    metricsGroup.removeMetric("max-compaction-delay-secs")
+    metricsGroup.removeMetric("DeadThreadCount")

Review Comment:
   I had a similar thought. Calling them out together as constants could also remind future contributors who add a metric that they need to remove it too.





[GitHub] [kafka] divijvaidya commented on pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13623:
URL: https://github.com/apache/kafka/pull/13623#issuecomment-1536155781

   Thank you @dajac. Appreciate your time and patience on this PR. 🙏

