You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/01 06:48:03 UTC

[GitHub] [kafka] showuon commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

showuon commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r911661767


##########
core/src/main/scala/kafka/utils/Throttler.scala:
##########
@@ -36,7 +36,7 @@ import scala.math._
  * @param time: The time implementation to use
  */
 @threadsafe
-class Throttler(desiredRatePerSec: Double,
+class Throttler(var desiredRatePerSec: Double,

Review Comment:
   If `desiredRatePerSec` can be changed by other threads, should we mark it `@volatile`?



##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -191,6 +191,13 @@ class LogCleaner(initialConfig: CleanerConfig,
     */
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     config = LogCleaner.cleanerConfig(newConfig)
+
+    val maxIoBytesPerSecond = config.maxIoBytesPerSecond;
+    if (maxIoBytesPerSecond != oldConfig.logCleanerIoMaxBytesPerSecond) {
+      info(s"Updating logCleanerIoMaxBytesPerSecond: $maxIoBytesPerSecond")
+      throttler.updateDesiredRatePerSec(maxIoBytesPerSecond)
+    }

Review Comment:
   Could we also add a test for `LogCleaner` to make sure that, after `reconfigure`, the throttler picks up the updated config?



##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -191,6 +191,13 @@ class LogCleaner(initialConfig: CleanerConfig,
     */
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {

Review Comment:
   The Javadoc should also be updated to mention that the throttler is updated.



##########
core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala:
##########
@@ -58,4 +58,50 @@ class ThrottlerTest {
     val actualCountPerSec = 4 * desiredCountPerInterval * 1000 / elapsedTimeMs
     assertTrue(actualCountPerSec <= desiredCountPerSec)
   }
+
+  @Test
+  def testUpdateThrottleDesiredRate(): Unit = {
+    val throttleCheckIntervalMs = 100
+    val desiredCountPerSec = 1000.0
+    val desiredCountPerInterval = desiredCountPerSec * throttleCheckIntervalMs / 1000.0
+    val updatedDesiredCountPerSec = 1500.0;
+    val updatedDesiredCountPerInterval = updatedDesiredCountPerSec * throttleCheckIntervalMs / 1000.0
+
+    val mockTime = new MockTime()
+    val throttler = new Throttler(desiredRatePerSec = desiredCountPerSec,
+      checkIntervalMs = throttleCheckIntervalMs,
+      time = mockTime)
+
+    // Observe desiredCountPerInterval at t1
+    val t1 = mockTime.milliseconds()
+    throttler.maybeThrottle(desiredCountPerInterval)
+    assertEquals(t1, mockTime.milliseconds())
+
+    // Observe desiredCountPerInterval at t1 + throttleCheckIntervalMs + 1,
+    mockTime.sleep(throttleCheckIntervalMs + 1)
+    throttler.maybeThrottle(desiredCountPerInterval)
+    val t2 = mockTime.milliseconds()
+    assertTrue(t2 >= t1 + 2 * throttleCheckIntervalMs)
+
+    val elapsedTimeMs = t2 - t1
+    val actualCountPerSec = 2 * desiredCountPerInterval * 1000 / elapsedTimeMs
+    assertTrue(actualCountPerSec <= desiredCountPerSec)
+
+    // Update ThrottleDesiredRate
+    throttler.updateDesiredRatePerSec(updatedDesiredCountPerSec);
+
+    // Observe desiredCountPerInterval at t2
+    throttler.maybeThrottle(updatedDesiredCountPerInterval)

Review Comment:
   The code comment above should refer to `updatedDesiredCountPerInterval` rather than `desiredCountPerInterval`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org