You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/05/05 11:55:25 UTC
[kafka] branch trunk updated: KAFKA-14926: Remove metrics on Log Cleaner shutdown (#13623)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e34f8840315 KAFKA-14926: Remove metrics on Log Cleaner shutdown (#13623)
e34f8840315 is described below
commit e34f88403159cc8381da23dafdf7e3d7403114a2
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Fri May 5 13:55:17 2023 +0200
KAFKA-14926: Remove metrics on Log Cleaner shutdown (#13623)
When the log cleaner is shut down, it doesn't remove the metrics that were registered to `KafkaYammerMetrics.defaultRegistry()`, which has one instance per server. The log cleaner's lifecycle is tied to the lifecycle of `LogManager`, and hence there is no scenario in which the log cleaner is shut down but the broker is not. Broker shutdown closes the `jmxReporter`, and hence there is currently no metric leak here. The motivation for this code change is to "do the right thing" from a code hygi [...]
Reviewers: Manyanda Chitimbo <ma...@gmail.com>, Kirk True <ki...@mustardgrain.com>, David Jacot <dj...@confluent.io>
---
core/src/main/scala/kafka/log/LogCleaner.scala | 43 ++++++++++++++++------
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 30 +++++++++++++++
2 files changed, 61 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 25511976e31..b8b991aeabe 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -22,6 +22,7 @@ import java.nio._
import java.util.Date
import java.util.concurrent.TimeUnit
import kafka.common._
+import kafka.log.LogCleaner.{CleanerRecopyPercentMetricName, DeadThreadCountMetricName, MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
import kafka.server.{BrokerReconfigurable, KafkaConfig}
import kafka.utils._
import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -124,29 +125,26 @@ class LogCleaner(initialConfig: CleanerConfig,
private def maxOverCleanerThreads(f: CleanerThread => Double): Int =
cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread))).toInt
-
/* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
- metricsGroup.newGauge("max-buffer-utilization-percent",
+ metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
() => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100)
/* a metric to track the recopy rate of each thread's last cleaning */
- metricsGroup.newGauge("cleaner-recopy-percent", () => {
+ metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => {
val stats = cleaners.map(_.lastStats)
val recopyRate = stats.iterator.map(_.bytesWritten).sum.toDouble / math.max(stats.iterator.map(_.bytesRead).sum, 1)
(100 * recopyRate).toInt
})
/* a metric to track the maximum cleaning time for the last cleaning from each thread */
- metricsGroup.newGauge("max-clean-time-secs",
- () => maxOverCleanerThreads(_.lastStats.elapsedSecs))
-
+ metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs))
// a metric to track delay between the time when a log is required to be compacted
// as determined by max compaction lag and the time of last cleaner run.
- metricsGroup.newGauge("max-compaction-delay-secs",
+ metricsGroup.newGauge(MaxCompactionDelayMetricsName,
() => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000)
- metricsGroup.newGauge("DeadThreadCount", () => deadThreadCount)
+ metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount)
private[log] def deadThreadCount: Int = cleaners.count(_.isThreadFailed)
@@ -167,8 +165,16 @@ class LogCleaner(initialConfig: CleanerConfig,
*/
def shutdown(): Unit = {
info("Shutting down the log cleaner.")
- cleaners.foreach(_.shutdown())
- cleaners.clear()
+ try {
+ cleaners.foreach(_.shutdown())
+ cleaners.clear()
+ } finally {
+ removeMetrics()
+ }
+ }
+
+ def removeMetrics(): Unit = {
+ LogCleaner.MetricNames.foreach(metricsGroup.removeMetric)
}
override def reconfigurableConfigs: Set[String] = {
@@ -189,14 +195,14 @@ class LogCleaner(initialConfig: CleanerConfig,
/**
* Reconfigure log clean config. This will:
- * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary
+ * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary
* 2. stop current log cleaners and create new ones.
* That ensures that if any of the cleaners had failed, new cleaners are created to match the new config.
*/
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
config = LogCleaner.cleanerConfig(newConfig)
- val maxIoBytesPerSecond = config.maxIoBytesPerSecond;
+ val maxIoBytesPerSecond = config.maxIoBytesPerSecond
if (maxIoBytesPerSecond != oldConfig.logCleanerIoMaxBytesPerSecond) {
info(s"Updating logCleanerIoMaxBytesPerSecond: $maxIoBytesPerSecond")
throttler.updateDesiredRatePerSec(maxIoBytesPerSecond)
@@ -466,6 +472,19 @@ object LogCleaner {
config.logCleanerEnable)
}
+
+ private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
+ private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
+ private val MaxCleanTimeMetricName = "max-clean-time-secs"
+ private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
+ private val DeadThreadCountMetricName = "DeadThreadCount"
+ // package private for testing
+ private[log] val MetricNames = Set(
+ MaxBufferUtilizationPercentMetricName,
+ CleanerRecopyPercentMetricName,
+ MaxCleanTimeMetricName,
+ MaxCompactionDelayMetricsName,
+ DeadThreadCountMetricName)
}
/**
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index fdfa61c10e8..6a1f0f1b911 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -25,9 +25,12 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
+import org.mockito.ArgumentMatchers.{any, anyString}
+import org.mockito.Mockito.{mockConstruction, times, verify, verifyNoMoreInteractions}
import java.io.{File, RandomAccessFile}
import java.nio._
@@ -62,6 +65,33 @@ class LogCleanerTest {
Utils.delete(tmpdir)
}
+ @Test
+ def testRemoveMetricsOnClose(): Unit = {
+ val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+ try {
+ val logCleaner = new LogCleaner(new CleanerConfig(true),
+ logDirs = Array(TestUtils.tempDir()),
+ logs = new Pool[TopicPartition, UnifiedLog](),
+ logDirFailureChannel = new LogDirFailureChannel(1),
+ time = time)
+
+ // shutdown logCleaner so that metrics are removed
+ logCleaner.shutdown()
+
+ val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+ val numMetricsRegistered = LogCleaner.MetricNames.size
+ verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())
+
+ // verify that each metric is removed
+ LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
+
+ // assert that we have verified all invocations on the mock metrics group
+ verifyNoMoreInteractions(mockMetricsGroup)
+ } finally {
+ mockMetricsGroupCtor.close()
+ }
+ }
+
/**
* Test simple log cleaning
*/