You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/05/05 11:55:25 UTC

[kafka] branch trunk updated: KAFKA-14926: Remove metrics on Log Cleaner shutdown (#13623)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e34f8840315 KAFKA-14926: Remove metrics on Log Cleaner shutdown (#13623)
e34f8840315 is described below

commit e34f88403159cc8381da23dafdf7e3d7403114a2
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Fri May 5 13:55:17 2023 +0200

    KAFKA-14926: Remove metrics on Log Cleaner shutdown (#13623)
    
    When the log cleaner is shut down, it doesn't remove the metrics that were registered to `KafkaYammerMetrics.defaultRegistry()`, which has one instance per server. The log cleaner's lifecycle is tied to the lifecycle of `LogManager`, and hence there is no scenario in which the log cleaner is shut down but the broker is not. Broker shutdown closes the `jmxReporter`, and hence there is no metric leak at present. The motivation for this code change is to "do the right thing" from a code hygi [...]
    
    Reviewers: Manyanda Chitimbo <ma...@gmail.com>, Kirk True <ki...@mustardgrain.com>, David Jacot <dj...@confluent.io>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     | 43 ++++++++++++++++------
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 30 +++++++++++++++
 2 files changed, 61 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 25511976e31..b8b991aeabe 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -22,6 +22,7 @@ import java.nio._
 import java.util.Date
 import java.util.concurrent.TimeUnit
 import kafka.common._
+import kafka.log.LogCleaner.{CleanerRecopyPercentMetricName, DeadThreadCountMetricName, MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
 import kafka.server.{BrokerReconfigurable, KafkaConfig}
 import kafka.utils._
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -124,29 +125,26 @@ class LogCleaner(initialConfig: CleanerConfig,
   private def maxOverCleanerThreads(f: CleanerThread => Double): Int =
     cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread))).toInt
 
-
   /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
-  metricsGroup.newGauge("max-buffer-utilization-percent",
+  metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
     () => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100)
 
   /* a metric to track the recopy rate of each thread's last cleaning */
-  metricsGroup.newGauge("cleaner-recopy-percent", () => {
+  metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => {
     val stats = cleaners.map(_.lastStats)
     val recopyRate = stats.iterator.map(_.bytesWritten).sum.toDouble / math.max(stats.iterator.map(_.bytesRead).sum, 1)
     (100 * recopyRate).toInt
   })
 
   /* a metric to track the maximum cleaning time for the last cleaning from each thread */
-  metricsGroup.newGauge("max-clean-time-secs",
-    () => maxOverCleanerThreads(_.lastStats.elapsedSecs))
-
+  metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs))
 
   // a metric to track delay between the time when a log is required to be compacted
   // as determined by max compaction lag and the time of last cleaner run.
-  metricsGroup.newGauge("max-compaction-delay-secs",
+  metricsGroup.newGauge(MaxCompactionDelayMetricsName,
     () => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000)
 
-  metricsGroup.newGauge("DeadThreadCount", () => deadThreadCount)
+  metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount)
 
   private[log] def deadThreadCount: Int = cleaners.count(_.isThreadFailed)
 
@@ -167,8 +165,16 @@ class LogCleaner(initialConfig: CleanerConfig,
    */
   def shutdown(): Unit = {
     info("Shutting down the log cleaner.")
-    cleaners.foreach(_.shutdown())
-    cleaners.clear()
+    try {
+      cleaners.foreach(_.shutdown())
+      cleaners.clear()
+    } finally {
+      removeMetrics()
+    }
+  }
+
+  def removeMetrics(): Unit = {
+    LogCleaner.MetricNames.foreach(metricsGroup.removeMetric)
   }
 
   override def reconfigurableConfigs: Set[String] = {
@@ -189,14 +195,14 @@ class LogCleaner(initialConfig: CleanerConfig,
 
   /**
     * Reconfigure log clean config. The will:
-    * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary 
+    * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary
     * 2. stop current log cleaners and create new ones.
     * That ensures that if any of the cleaners had failed, new cleaners are created to match the new config.
     */
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     config = LogCleaner.cleanerConfig(newConfig)
 
-    val maxIoBytesPerSecond = config.maxIoBytesPerSecond;
+    val maxIoBytesPerSecond = config.maxIoBytesPerSecond
     if (maxIoBytesPerSecond != oldConfig.logCleanerIoMaxBytesPerSecond) {
       info(s"Updating logCleanerIoMaxBytesPerSecond: $maxIoBytesPerSecond")
       throttler.updateDesiredRatePerSec(maxIoBytesPerSecond)
@@ -466,6 +472,19 @@ object LogCleaner {
       config.logCleanerEnable)
 
   }
+
+  private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
+  private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
+  private val MaxCleanTimeMetricName = "max-clean-time-secs"
+  private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
+  private val DeadThreadCountMetricName = "DeadThreadCount"
+  // package private for testing
+  private[log] val MetricNames = Set(
+    MaxBufferUtilizationPercentMetricName,
+    CleanerRecopyPercentMetricName,
+    MaxCleanTimeMetricName,
+    MaxCompactionDelayMetricsName,
+    DeadThreadCountMetricName)
 }
 
 /**
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index fdfa61c10e8..6a1f0f1b911 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -25,9 +25,12 @@ import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
+import org.mockito.ArgumentMatchers.{any, anyString}
+import org.mockito.Mockito.{mockConstruction, times, verify, verifyNoMoreInteractions}
 
 import java.io.{File, RandomAccessFile}
 import java.nio._
@@ -62,6 +65,33 @@ class LogCleanerTest {
     Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    try {
+      val logCleaner = new LogCleaner(new CleanerConfig(true),
+        logDirs = Array(TestUtils.tempDir()),
+        logs = new Pool[TopicPartition, UnifiedLog](),
+        logDirFailureChannel = new LogDirFailureChannel(1),
+        time = time)
+
+      // shutdown logCleaner so that metrics are removed
+      logCleaner.shutdown()
+
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+      val numMetricsRegistered = LogCleaner.MetricNames.size
+      verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())
+      
+      // verify that each metric is removed
+      LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
+
+      // assert that we have verified all invocations on
+      verifyNoMoreInteractions(mockMetricsGroup)
+    } finally {
+      mockMetricsGroupCtor.close()
+    }
+  }
+
   /**
    * Test simple log cleaning
    */