You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/07/08 12:41:58 UTC

[kafka] branch trunk updated: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically (#12296)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e85500bbbef KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically (#12296)
e85500bbbef is described below

commit e85500bbbef7c10ee6bd379ab1327e8abb858b6b
Author: Tomonari Yamashita <ty...@redhat.com>
AuthorDate: Fri Jul 8 21:41:47 2022 +0900

    KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically (#12296)
    
    log.cleaner.io.max.bytes.per.second cannot be changed dynamically using bin/kafka-configs.sh. Call updateDesiredRatePerSec() of Throttler with new log.cleaner.io.max.bytes.per.second value in reconfigure() of Log Cleaner to fix the issue.
    
    Reviewers: Tom Bentley <tb...@redhat.com>, Luke Chen <sh...@gmail.com>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     | 13 +++++-
 core/src/main/scala/kafka/utils/Throttler.scala    | 12 ++++--
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 32 ++++++++++++++-
 .../scala/unit/kafka/utils/ThrottlerTest.scala     | 46 ++++++++++++++++++++++
 4 files changed, 95 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index df20a4a36e9..bf55f10a400 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -104,7 +104,7 @@ class LogCleaner(initialConfig: CleanerConfig,
   private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, logDirFailureChannel)
 
   /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
-  private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
+  private[log] val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
                                         checkIntervalMs = 300,
                                         throttleDown = true,
                                         "cleaner-io",
@@ -186,11 +186,20 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-    * Reconfigure log clean config. This simply stops current log cleaners and creates new ones.
+    * Reconfigure log clean config. This will:
+    * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary 
+    * 2. stop current log cleaners and create new ones.
     * That ensures that if any of the cleaners had failed, new cleaners are created to match the new config.
     */
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     config = LogCleaner.cleanerConfig(newConfig)
+
+    val maxIoBytesPerSecond = config.maxIoBytesPerSecond;
+    if (maxIoBytesPerSecond != oldConfig.logCleanerIoMaxBytesPerSecond) {
+      info(s"Updating logCleanerIoMaxBytesPerSecond: $maxIoBytesPerSecond")
+      throttler.updateDesiredRatePerSec(maxIoBytesPerSecond)
+    }
+
     shutdown()
     startup()
   }
diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala
index cce6270cf02..a431db5f006 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -36,7 +36,7 @@ import scala.math._
  * @param time: The time implementation to use
  */
 @threadsafe
-class Throttler(desiredRatePerSec: Double,
+class Throttler(@volatile var desiredRatePerSec: Double,
                 checkIntervalMs: Long = 100L,
                 throttleDown: Boolean = true,
                 metricName: String = "throttler",
@@ -52,6 +52,7 @@ class Throttler(desiredRatePerSec: Double,
   def maybeThrottle(observed: Double): Unit = {
     val msPerSec = TimeUnit.SECONDS.toMillis(1)
     val nsPerSec = TimeUnit.SECONDS.toNanos(1)
+    val currentDesiredRatePerSec = desiredRatePerSec;
 
     meter.mark(observed.toLong)
     lock synchronized {
@@ -62,14 +63,14 @@ class Throttler(desiredRatePerSec: Double,
       // we should take a little nap
       if (elapsedNs > checkIntervalNs && observedSoFar > 0) {
         val rateInSecs = (observedSoFar * nsPerSec) / elapsedNs
-        val needAdjustment = !(throttleDown ^ (rateInSecs > desiredRatePerSec))
+        val needAdjustment = !(throttleDown ^ (rateInSecs > currentDesiredRatePerSec))
         if (needAdjustment) {
           // solve for the amount of time to sleep to make us hit the desired rate
-          val desiredRateMs = desiredRatePerSec / msPerSec.toDouble
+          val desiredRateMs = currentDesiredRatePerSec / msPerSec.toDouble
           val elapsedMs = TimeUnit.NANOSECONDS.toMillis(elapsedNs)
           val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs)
           if (sleepTime > 0) {
-            trace("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, desiredRatePerSec, sleepTime))
+            trace("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, currentDesiredRatePerSec, sleepTime))
             time.sleep(sleepTime)
           }
         }
@@ -79,6 +80,9 @@ class Throttler(desiredRatePerSec: Double,
     }
   }
 
+  def updateDesiredRatePerSec(updatedDesiredRatePerSec: Double): Unit = {
+    desiredRatePerSec = updatedDesiredRatePerSec;
+  }
 }
 
 object Throttler {
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index b3a1a76f0d0..0dd8a2cfb04 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -23,9 +23,8 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.Paths
 import java.util.Properties
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-
 import kafka.common._
-import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
+import kafka.server.{BrokerTopicStats, KafkaConfig, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.CorruptRecordException
@@ -1854,6 +1853,35 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.setProperty(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "10000000")
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time) {
+      // shutdown() and startup() are called in LogCleaner.reconfigure().
+      // Empty startup() and shutdown() to ensure that no unnecessary log cleaner threads remain after this test.
+      override def startup(): Unit = {}
+      override def shutdown(): Unit = {}
+    }
+
+    try {
+      assertEquals(10000000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be initialized from initial `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+
+      val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+      newKafkaProps.setProperty(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "20000000")
+
+      logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))
+
+      assertEquals(20000000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be updated with new `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+    } finally {
+      logCleaner.shutdown()
+    }
+  }
 
   private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
     for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
diff --git a/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala b/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala
index 1591cbad900..80ebde4fcd7 100755
--- a/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala
@@ -58,4 +58,50 @@ class ThrottlerTest {
     val actualCountPerSec = 4 * desiredCountPerInterval * 1000 / elapsedTimeMs
     assertTrue(actualCountPerSec <= desiredCountPerSec)
   }
+
+  @Test
+  def testUpdateThrottleDesiredRate(): Unit = {
+    val throttleCheckIntervalMs = 100
+    val desiredCountPerSec = 1000.0
+    val desiredCountPerInterval = desiredCountPerSec * throttleCheckIntervalMs / 1000.0
+    val updatedDesiredCountPerSec = 1500.0;
+    val updatedDesiredCountPerInterval = updatedDesiredCountPerSec * throttleCheckIntervalMs / 1000.0
+
+    val mockTime = new MockTime()
+    val throttler = new Throttler(desiredRatePerSec = desiredCountPerSec,
+      checkIntervalMs = throttleCheckIntervalMs,
+      time = mockTime)
+
+    // Observe desiredCountPerInterval at t1
+    val t1 = mockTime.milliseconds()
+    throttler.maybeThrottle(desiredCountPerInterval)
+    assertEquals(t1, mockTime.milliseconds())
+
+    // Observe desiredCountPerInterval at t1 + throttleCheckIntervalMs + 1
+    mockTime.sleep(throttleCheckIntervalMs + 1)
+    throttler.maybeThrottle(desiredCountPerInterval)
+    val t2 = mockTime.milliseconds()
+    assertTrue(t2 >= t1 + 2 * throttleCheckIntervalMs)
+
+    val elapsedTimeMs = t2 - t1
+    val actualCountPerSec = 2 * desiredCountPerInterval * 1000 / elapsedTimeMs
+    assertTrue(actualCountPerSec <= desiredCountPerSec)
+
+    // Update ThrottleDesiredRate
+    throttler.updateDesiredRatePerSec(updatedDesiredCountPerSec);
+
+    // Observe updatedDesiredCountPerInterval at t2
+    throttler.maybeThrottle(updatedDesiredCountPerInterval)
+    assertEquals(t2, mockTime.milliseconds())
+
+    // Observe updatedDesiredCountPerInterval at t2 + throttleCheckIntervalMs + 1
+    mockTime.sleep(throttleCheckIntervalMs + 1)
+    throttler.maybeThrottle(updatedDesiredCountPerInterval)
+    val t3 = mockTime.milliseconds()
+    assertTrue(t3 >= t2 + 2 * throttleCheckIntervalMs)
+
+    val updatedElapsedTimeMs = t3 - t2
+    val updatedActualCountPerSec = 2 * updatedDesiredCountPerInterval * 1000 / updatedElapsedTimeMs
+    assertTrue(updatedActualCountPerSec <= updatedDesiredCountPerSec)
+  }
 }