Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/15 14:13:56 UTC

[GitHub] [kafka] tyamashi-oss opened a new pull request, #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

tyamashi-oss opened a new pull request, #12296:
URL: https://github.com/apache/kafka/pull/12296

   - Implementation:
     - Add updateDesiredRatePerSec() on Throttler
     - Call updateDesiredRatePerSec() of Throttler with new log.cleaner.io.max.bytes.per.second value in reconfigure() of Log Cleaner
     - I implemented the feature to be similar to [reconfigure() of SocketServer](https://github.com/apache/kafka/blob/fa59be4e770627cd34cef85986b58ad7f606928d/core/src/main/scala/kafka/network/SocketServer.scala#L336-L357)
   
   - Alternative implementation considered:
     - re-instantiate Throttler with new log.cleaner.io.max.bytes.per.second value in reconfigure() of Log Cleaner 
       - However, since many parameter specifications are required to instantiate Throttler, I chose to be similar to SocketServer and update only log.cleaner.io.max.bytes.per.second
   
   - Test:
     - Added a unit test for updating desiredRatePerSec of Throttler
     - I confirmed by manual testing that log.cleaner.io.max.bytes.per.second can be changed using bin/kafka-configs.sh:
         >   [2022-06-15 22:44:03,089] INFO [kafka-log-cleaner-thread-0]:
           Log cleaner thread 0 cleaned log my-topic-0 (dirty section = [57585, 86901])
           2,799.3 MB of log processed in 596.0 seconds (4.7 MB/sec).
           Indexed 2,799.2 MB in 298.1 seconds (9.4 Mb/sec, 50.0% of total time)
           Buffer utilization: 0.0%
           Cleaned 2,799.3 MB in 298.0 seconds (9.4 Mb/sec, 50.0% of total time)
           Start size: 2,799.3 MB (29,317 messages)
           End size: 0.1 MB (1 messages)
           100.0% size reduction (100.0% fewer messages)
    (kafka.log.LogCleaner)
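   The update-in-place approach described above can be sketched roughly as follows. This is a minimal illustration, not the actual patch: `CleanerSettings`, `RateLimiter`, and `Cleaner` are hypothetical stand-ins for CleanerConfig, Throttler, and LogCleaner, and only the rate is modeled.

   ```scala
   // Simplified stand-ins for illustration only; these are not Kafka's real classes.
   final case class CleanerSettings(maxIoBytesPerSecond: Double)

   class RateLimiter(@volatile private var rate: Double) {
     def desiredRatePerSec: Double = rate

     def updateDesiredRatePerSec(newRate: Double): Unit = {
       rate = newRate
     }
   }

   class Cleaner(initial: CleanerSettings) {
     // Created once; reconfigure() updates it instead of replacing it.
     val throttler = new RateLimiter(initial.maxIoBytesPerSecond)

     def reconfigure(oldSettings: CleanerSettings, newSettings: CleanerSettings): Unit = {
       // Only touch the throttler when the configured rate actually changed.
       if (newSettings.maxIoBytesPerSecond != oldSettings.maxIoBytesPerSecond)
         throttler.updateDesiredRatePerSec(newSettings.maxIoBytesPerSecond)
     }
   }

   object ReconfigureSketch {
     def main(args: Array[String]): Unit = {
       val before = CleanerSettings(10000000d)
       val after = CleanerSettings(20000000d)
       val cleaner = new Cleaner(before)
       cleaner.reconfigure(before, after)
       println(cleaner.throttler.desiredRatePerSec)
     }
   }
   ```

   Keeping the same throttler instance means none of its other constructor parameters have to be supplied again, which is the trade-off mentioned under the alternative implementation.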
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] tyamashi-oss commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
tyamashi-oss commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r914719266


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -186,11 +186,18 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-    * Reconfigure log clean config. This simply stops current log cleaners and creates new ones.
+    * Reconfigure log clean config. This updates desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond and stops current log cleaners and creates new ones.

Review Comment:
   Thank you. I’ve updated the comment.
   https://github.com/apache/kafka/pull/12296/commits/f9d27f6181c3fbc1ef02d56904d58793962dedb2



##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 10000000, "Throttler.desiredRatePerSec should be initialized with KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")
+
+    val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 20000000)
+
+    logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))

Review Comment:
   Your concern is correct. Thank you.
   LogCleaner.shutdown() should be called at the end of the test because LogCleaner.startup(), which runs at the end of LogCleaner.reconfigure(), creates kafka-log-cleaner-thread-x threads, and those threads would otherwise stay alive.
   I appended LogCleaner.shutdown() to the end of the test and also used a LogCleaner with empty startup() and shutdown() implementations.
   This makes the test somewhat white-box, since it depends on how LogCleaner.reconfigure() is implemented, but I couldn't think of another way. Please let me know if you have one.
   https://github.com/apache/kafka/pull/12296/commits/e05707d5613d096df06c4e96085deb481b00a7e7
   





[GitHub] [kafka] showuon commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r913815025


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -186,11 +186,18 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-    * Reconfigure log clean config. This simply stops current log cleaners and creates new ones.
+    * Reconfigure log clean config. This updates desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond and stops current log cleaners and creates new ones.

Review Comment:
   There are 2 `and` in the sentence now. Maybe we can put it like this to make it clear:
   ```
   It will (1) update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond if necessary (2) stop current log cleaners and create new ones.
   ```
   WDYT?



##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 10000000, "Throttler.desiredRatePerSec should be initialized with KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")
+
+    val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 20000000)
+
+    logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 20000000, "Throttler.desiredRatePerSec should be updated with new KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")

Review Comment:
   The error message could be updated to:
   ```
   Throttler.desiredRatePerSec should be updated with new `cleaner.io.max.bytes.per.second` config.
   ```



##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 10000000, "Throttler.desiredRatePerSec should be initialized with KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")

Review Comment:
   nit: the 1st parameter of `assertEquals` should be the expected value, and the 2nd the actual value. The same comment applies to the assertion below.



##########
core/src/main/scala/kafka/utils/Throttler.scala:
##########
@@ -36,7 +36,7 @@ import scala.math._
  * @param time: The time implementation to use
  */
 @threadsafe
-class Throttler(desiredRatePerSec: Double,
+class Throttler(var desiredRatePerSec: Double,

Review Comment:
   Thanks for pointing it out. Yes, it sleeps inside the lock. At first I thought it was a bug, but after further thought I think it is intended, because multiple threads might call the `maybeThrottle` method and we'd like to calculate the throttle time sequentially (I guess). This code has been there since 2011, so I think we should just keep it as is.
   In that case, I agree `volatile` is the better solution. Otherwise, the reconfigured value might not take effect immediately.
   
   @tombentley , WDYT?



##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 10000000, "Throttler.desiredRatePerSec should be initialized with KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")

Review Comment:
   Also, the error message could be updated to:
   ```
   Throttler.desiredRatePerSec should be initialized from initial `cleaner.io.max.bytes.per.second` config.
   ```



##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -191,6 +191,13 @@ class LogCleaner(initialConfig: CleanerConfig,
     */
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     config = LogCleaner.cleanerConfig(newConfig)
+
+    val maxIoBytesPerSecond = config.maxIoBytesPerSecond;
+    if (maxIoBytesPerSecond != oldConfig.logCleanerIoMaxBytesPerSecond) {
+      info(s"Updating logCleanerIoMaxBytesPerSecond: $maxIoBytesPerSecond")
+      throttler.updateDesiredRatePerSec(maxIoBytesPerSecond)
+    }

Review Comment:
   Test is good. Thanks.





[GitHub] [kafka] tyamashi-oss commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
tyamashi-oss commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r913695502


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -191,6 +191,13 @@ class LogCleaner(initialConfig: CleanerConfig,
     */
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {

Review Comment:
   Thank you. The Javadoc has been updated to mention the update of throttler.desiredRatePerSec. https://github.com/apache/kafka/pull/12296/commits/04264960fe7300e02dd48d9f7aee66390754a417



##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -191,6 +191,13 @@ class LogCleaner(initialConfig: CleanerConfig,
     */
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     config = LogCleaner.cleanerConfig(newConfig)
+
+    val maxIoBytesPerSecond = config.maxIoBytesPerSecond;
+    if (maxIoBytesPerSecond != oldConfig.logCleanerIoMaxBytesPerSecond) {
+      info(s"Updating logCleanerIoMaxBytesPerSecond: $maxIoBytesPerSecond")
+      throttler.updateDesiredRatePerSec(maxIoBytesPerSecond)
+    }

Review Comment:
   I’ve added a test for that. I could not find an existing reconfigure test to use as a reference, so I wrote this test in my own way.
   https://github.com/apache/kafka/pull/12296/commits/1a574d67ad1685ce776bb45896fe6dd72992dbef





[GitHub] [kafka] tyamashi-oss commented on pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
tyamashi-oss commented on PR #12296:
URL: https://github.com/apache/kafka/pull/12296#issuecomment-1178986386

   @showuon @tombentley @Kvicii , thank you for your guidance. It was very helpful.




[GitHub] [kafka] showuon commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r911774115


##########
core/src/main/scala/kafka/utils/Throttler.scala:
##########
@@ -36,7 +36,7 @@ import scala.math._
  * @param time: The time implementation to use
  */
 @threadsafe
-class Throttler(desiredRatePerSec: Double,
+class Throttler(var desiredRatePerSec: Double,

Review Comment:
    @tombentley Good point! +1. Thanks.





[GitHub] [kafka] tyamashi-oss commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
tyamashi-oss commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r915753889


##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,33 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time) {
+      // shutdown() and startup() are called in LogCleaner.reconfigure().
+      // Empty startup() and shutdown() to ensure that no unnecessary log cleaner threads remain after this test.
+      override def startup(): Unit = {}
+      override def shutdown(): Unit = {}
+    }
+
+    try {
+      assertEquals(10000000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be initialized from initial `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+
+      val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+      newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 20000000)
+
+      logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))
+
+      assertEquals(20000000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be updated with new `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+    } finally logCleaner.shutdown();

Review Comment:
   I've fixed it. Thank you for pointing it out. https://github.com/apache/kafka/pull/12296/commits/5c727409cc03a8b459f6d30b8df888489f52d743
   With this fix, the java8/scala2.12 build now succeeds.
   
   However, the `JDK 11 / Scala 2.13` build, which used to succeed, now fails.
   I believe this new build failure is unrelated to my changes. I ran the same command once in my local environment and the build succeeded.
   ```
   [2022-07-07T06:22:53.589Z] > Task :streams:integrationTest FAILED
   
   [2022-07-07T06:22:53.589Z] 
   
   [2022-07-07T06:22:53.589Z] FAILURE: Build failed with an exception.
   ```
   https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12296/7/pipeline/





[GitHub] [kafka] tyamashi-oss commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
tyamashi-oss commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r913698340


##########
core/src/main/scala/kafka/utils/Throttler.scala:
##########
@@ -36,7 +36,7 @@ import scala.math._
  * @param time: The time implementation to use
  */
 @threadsafe
-class Throttler(desiredRatePerSec: Double,
+class Throttler(var desiredRatePerSec: Double,

Review Comment:
   Thanks for pointing this out, @showuon and @tombentley.
   Indeed, it needs volatile or synchronized.
   
   I would like your opinion: should it use volatile or synchronized?
   For now I have changed the implementation to use volatile, so if you prefer synchronized, please let me know. https://github.com/apache/kafka/pull/12296/commits/9b2477dc8a38db7e41281a6c5aba9c19348e8d5d
   
   If we use synchronized, should we apply a synchronized block on this object, as in the following code?
   I don't think the `lock` object can be used for synchronization in updateDesiredRatePerSec() the way the `lock synchronized {…}` block in maybeThrottle() uses it: depending on Throttler.desiredRatePerSec, that block may hold the lock for more than a few minutes because of frequent sleeps, which could also block the reconfigure() thread in updateDesiredRatePerSec().
   
   ```
     def updateDesiredRatePerSec(updatedDesiredRatePerSec: Double): Unit = {
       this synchronized {
         desiredRatePerSec = updatedDesiredRatePerSec;
       }
     }
   
   ```
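   For comparison, here is a minimal sketch of the volatile variant discussed in this thread. `SketchThrottler` and `sleepUnderLock` are hypothetical simplifications (the real Throttler takes more parameters, and `sleepUnderLock` only imitates the fact that maybeThrottle() sleeps while holding `lock`). It illustrates that a volatile write never waits for that lock.

   ```scala
   class SketchThrottler(@volatile var desiredRatePerSec: Double) {
     private val lock = new Object

     // Stand-in for maybeThrottle(): holds `lock` for the whole sleep.
     def sleepUnderLock(millis: Long): Unit = lock synchronized {
       Thread.sleep(millis)
     }

     // Volatile write: does not acquire `lock`, so a sleeping caller cannot block it,
     // and later reads of desiredRatePerSec by other threads see the new value.
     def updateDesiredRatePerSec(updatedDesiredRatePerSec: Double): Unit = {
       desiredRatePerSec = updatedDesiredRatePerSec
     }
   }

   object VolatileSketch {
     def main(args: Array[String]): Unit = {
       val throttler = new SketchThrottler(10000000d)
       val sleeper = new Thread(() => throttler.sleepUnderLock(500L))
       sleeper.start()
       // Completes without waiting for the sleeper thread to release `lock`.
       throttler.updateDesiredRatePerSec(20000000d)
       println(throttler.desiredRatePerSec)
       sleeper.join()
     }
   }
   ```

   A single volatile field is enough here only because the update is one independent write; if several fields had to change together, a lock would be the safer choice.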





[GitHub] [kafka] tyamashi-oss commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
tyamashi-oss commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r916537625


##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1856,7 +1856,7 @@ class LogCleanerTest {
   @Test
   def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
     val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
-    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000: java.lang.Double)

Review Comment:
   Thank you for noticing.
   After checking other tests, I replaced Properties.put(Object, Object) with Properties.setProperty(String, String) and used a string instead of a number, because it is appropriate to use strings as keys and values for Properties, and string values are also what is used when a KafkaConfig is actually created. I also confirmed that the previous compilation error reappeared when using 10000000L: "the result type of an implicit conversion must be more specific than Object"
   https://github.com/apache/kafka/pull/12296/commits/ba640f532ff30bd3f801613eadd2d82af02e6a8a
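   A small standalone sketch of the difference between the two java.util.Properties calls, using only the JDK. The object name `PropertiesSketch` is made up for illustration, and the snippet does not involve KafkaConfig at all.

   ```scala
   import java.util.Properties

   object PropertiesSketch {
     val RateProp = "log.cleaner.io.max.bytes.per.second"

     def main(args: Array[String]): Unit = {
       val viaPut = new Properties()
       // put(Object, Object) accepts a boxed number, but getProperty only returns String values.
       viaPut.put(RateProp, java.lang.Integer.valueOf(10000000))
       println(viaPut.getProperty(RateProp)) // prints null

       val viaSet = new Properties()
       // setProperty(String, String) keeps both key and value as strings.
       viaSet.setProperty(RateProp, "10000000")
       println(viaSet.getProperty(RateProp).toDouble)
     }
   }
   ```

   Passing an explicitly boxed value also sidesteps the implicit-conversion error quoted above, but string values are the form Properties is designed around.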





[GitHub] [kafka] showuon commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r913806265


##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 10000000, "Throttler.desiredRatePerSec should be initialized with KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")

Review Comment:
   nit: the 1st parameter of `assertEquals` should be the expected value, and the 2nd the actual value. The same comment applies to the assertion below. That is:
   `assertEquals(10000000, logCleaner.throttler.desiredRatePerSec)`
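   To illustrate why the argument order matters, here is a tiny sketch with a hypothetical helper, `describeMismatch`, loosely modeled on the (expected, actual) convention. It is not JUnit's implementation, and the message format is only an approximation.

   ```scala
   object AssertOrderSketch {
     // Hypothetical helper: returns a failure description, or None when the values match.
     def describeMismatch(expected: Double, actual: Double): Option[String] =
       if (expected == actual) None
       else Some(s"expected: <$expected> but was: <$actual>")

     def main(args: Array[String]): Unit = {
       val configured = 10000000d // value the test put into the config
       val observed = 20000000d   // value read back from the object under test

       // Correct order: the report names the configured value as "expected".
       println(describeMismatch(configured, observed))
       // Swapped order: the same mismatch is reported backwards, which misleads whoever reads the failure.
       println(describeMismatch(observed, configured))
     }
   }
   ```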





[GitHub] [kafka] showuon commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r913826308


##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 10000000, "Throttler.desiredRatePerSec should be initialized with KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")
+
+    val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 20000000)
+
+    logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))

Review Comment:
   I think the `logCleaner` should not call `shutdown` in the end since we never `startup` it, am I correct?





[GitHub] [kafka] tyamashi-oss commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
tyamashi-oss commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r914719051


##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 10000000, "Throttler.desiredRatePerSec should be initialized with KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")

Review Comment:
   Thank you. I’ve updated the assert method parameters and the error message. 
   https://github.com/apache/kafka/pull/12296/commits/e05707d5613d096df06c4e96085deb481b00a7e7



##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 10000000, "Throttler.desiredRatePerSec should be initialized with KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")
+
+    val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 20000000)
+
+    logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 20000000, "Throttler.desiredRatePerSec should be updated with new KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")

Review Comment:
   Thank you. I’ve updated the error message.
   https://github.com/apache/kafka/pull/12296/commits/e05707d5613d096df06c4e96085deb481b00a7e7





[GitHub] [kafka] showuon commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r915850442


##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1856,7 +1856,7 @@ class LogCleanerTest {
   @Test
   def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
     val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
-    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000: java.lang.Double)

Review Comment:
   nit: I think we can just put `10000000L` here.



##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1856,7 +1856,7 @@ class LogCleanerTest {
   @Test
   def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
     val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
-    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000: java.lang.Double)

Review Comment:
   nit: I think we can just put `10000000L` here for long value.





[GitHub] [kafka] tombentley commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
tombentley commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r916026938


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -186,11 +186,18 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-    * Reconfigure log clean config. This simply stops current log cleaners and creates new ones.
+    * Reconfigure log clean config. It will (1) update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond if necessary (2) stop current log cleaners and create new ones.

Review Comment:
   ```suggestion
       * Reconfigure log clean config. This will:
       * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary 
       * 2. stop current log cleaners and create new ones.
   ```



##########
core/src/main/scala/kafka/utils/Throttler.scala:
##########
@@ -36,7 +36,7 @@ import scala.math._
  * @param time: The time implementation to use
  */
 @threadsafe
-class Throttler(desiredRatePerSec: Double,
+class Throttler(var desiredRatePerSec: Double,

Review Comment:
   That seems reasonable.





[GitHub] [kafka] showuon commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r916557022


##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1856,7 +1856,7 @@ class LogCleanerTest {
   @Test
   def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
     val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
-    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000: java.lang.Double)

Review Comment:
   Thanks for the update!





[GitHub] [kafka] showuon commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r911661767


##########
core/src/main/scala/kafka/utils/Throttler.scala:
##########
@@ -36,7 +36,7 @@ import scala.math._
  * @param time: The time implementation to use
  */
 @threadsafe
-class Throttler(desiredRatePerSec: Double,
+class Throttler(var desiredRatePerSec: Double,

Review Comment:
   If `desiredRatePerSec` can be changed by other threads, should we mark it `@volatile`?
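
For illustration only, a minimal sketch of the `@volatile` option raised here. `VolatileRateThrottler` and `currentRate` are hypothetical names, not part of Kafka; only the rate field is modelled and the throttling logic itself is left out.

```scala
// Hypothetical stand-in for kafka.utils.Throttler; not the real class.
class VolatileRateThrottler(initialRatePerSec: Double) {
  // @volatile makes a write from one thread visible to later reads on other threads.
  @volatile private var desiredRatePerSec: Double = initialRatePerSec

  // Would be called by the thread that applies a dynamic config change.
  def updateDesiredRatePerSec(updated: Double): Unit = {
    desiredRatePerSec = updated
  }

  // Would be called by a cleaner thread when it checks the current limit.
  def currentRate: Double = desiredRatePerSec
}
```

A volatile field only guarantees visibility of the single write. It does not make a read-compute-sleep sequence atomic, which may or may not matter depending on how the rate is used.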



##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -191,6 +191,13 @@ class LogCleaner(initialConfig: CleanerConfig,
     */
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     config = LogCleaner.cleanerConfig(newConfig)
+
+    val maxIoBytesPerSecond = config.maxIoBytesPerSecond;
+    if (maxIoBytesPerSecond != oldConfig.logCleanerIoMaxBytesPerSecond) {
+      info(s"Updating logCleanerIoMaxBytesPerSecond: $maxIoBytesPerSecond")
+      throttler.updateDesiredRatePerSec(maxIoBytesPerSecond)
+    }

Review Comment:
   Could we also add a test for `LogCleaner` to make sure after `reconfigure`, the throttler will get the updated config?



##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -191,6 +191,13 @@ class LogCleaner(initialConfig: CleanerConfig,
     */
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {

Review Comment:
   The Javadoc should also be updated to mention that the throttler is updated.



##########
core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala:
##########
@@ -58,4 +58,50 @@ class ThrottlerTest {
     val actualCountPerSec = 4 * desiredCountPerInterval * 1000 / elapsedTimeMs
     assertTrue(actualCountPerSec <= desiredCountPerSec)
   }
+
+  @Test
+  def testUpdateThrottleDesiredRate(): Unit = {
+    val throttleCheckIntervalMs = 100
+    val desiredCountPerSec = 1000.0
+    val desiredCountPerInterval = desiredCountPerSec * throttleCheckIntervalMs / 1000.0
+    val updatedDesiredCountPerSec = 1500.0;
+    val updatedDesiredCountPerInterval = updatedDesiredCountPerSec * throttleCheckIntervalMs / 1000.0
+
+    val mockTime = new MockTime()
+    val throttler = new Throttler(desiredRatePerSec = desiredCountPerSec,
+      checkIntervalMs = throttleCheckIntervalMs,
+      time = mockTime)
+
+    // Observe desiredCountPerInterval at t1
+    val t1 = mockTime.milliseconds()
+    throttler.maybeThrottle(desiredCountPerInterval)
+    assertEquals(t1, mockTime.milliseconds())
+
+    // Observe desiredCountPerInterval at t1 + throttleCheckIntervalMs + 1,
+    mockTime.sleep(throttleCheckIntervalMs + 1)
+    throttler.maybeThrottle(desiredCountPerInterval)
+    val t2 = mockTime.milliseconds()
+    assertTrue(t2 >= t1 + 2 * throttleCheckIntervalMs)
+
+    val elapsedTimeMs = t2 - t1
+    val actualCountPerSec = 2 * desiredCountPerInterval * 1000 / elapsedTimeMs
+    assertTrue(actualCountPerSec <= desiredCountPerSec)
+
+    // Update ThrottleDesiredRate
+    throttler.updateDesiredRatePerSec(updatedDesiredCountPerSec);
+
+    // Observe desiredCountPerInterval at t2
+    throttler.maybeThrottle(updatedDesiredCountPerInterval)

Review Comment:
   The comment should say `updatedDesiredCountPerInterval`, right?
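
As a quick check of the numbers in this test, here is the per-interval arithmetic on its own. `ThrottleIntervalArithmetic` and `countPerInterval` are hypothetical helper names introduced only for this sketch; the formula is the one used for `desiredCountPerInterval` and `updatedDesiredCountPerInterval` in the diff above.

```scala
// Hypothetical helper, not part of the PR: units allowed per check interval.
object ThrottleIntervalArithmetic {
  def countPerInterval(ratePerSec: Double, checkIntervalMs: Long): Double =
    ratePerSec * checkIntervalMs / 1000.0
}
```

With a 100 ms check interval, 1000.0 per second gives 100.0 per interval and the updated 1500.0 per second gives 150.0, so the two values differ and the code comment should name the one actually passed to `maybeThrottle`.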





[GitHub] [kafka] showuon commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r915386298


##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,33 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time) {
+      // shutdown() and startup() are called in LogCleaner.reconfigure().
+      // Empty startup() and shutdown() to ensure that no unnecessary log cleaner threads remain after this test.
+      override def startup(): Unit = {}
+      override def shutdown(): Unit = {}
+    }
+
+    try {
+      assertEquals(10000000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be initialized from initial `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+
+      val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+      newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 20000000)
+
+      logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))
+
+      assertEquals(20000000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be updated with new `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+    } finally logCleaner.shutdown();

Review Comment:
   nit: (1) the semicolon is not needed; (2) the usual format in Kafka is:
   ```
   finally {
     logCleaner.shutdown()
   }
   ```





[GitHub] [kafka] tyamashi-oss commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
tyamashi-oss commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r913695123


##########
core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala:
##########
@@ -58,4 +58,50 @@ class ThrottlerTest {
     val actualCountPerSec = 4 * desiredCountPerInterval * 1000 / elapsedTimeMs
     assertTrue(actualCountPerSec <= desiredCountPerSec)
   }
+
+  @Test
+  def testUpdateThrottleDesiredRate(): Unit = {
+    val throttleCheckIntervalMs = 100
+    val desiredCountPerSec = 1000.0
+    val desiredCountPerInterval = desiredCountPerSec * throttleCheckIntervalMs / 1000.0
+    val updatedDesiredCountPerSec = 1500.0;
+    val updatedDesiredCountPerInterval = updatedDesiredCountPerSec * throttleCheckIntervalMs / 1000.0
+
+    val mockTime = new MockTime()
+    val throttler = new Throttler(desiredRatePerSec = desiredCountPerSec,
+      checkIntervalMs = throttleCheckIntervalMs,
+      time = mockTime)
+
+    // Observe desiredCountPerInterval at t1
+    val t1 = mockTime.milliseconds()
+    throttler.maybeThrottle(desiredCountPerInterval)
+    assertEquals(t1, mockTime.milliseconds())
+
+    // Observe desiredCountPerInterval at t1 + throttleCheckIntervalMs + 1,
+    mockTime.sleep(throttleCheckIntervalMs + 1)
+    throttler.maybeThrottle(desiredCountPerInterval)
+    val t2 = mockTime.milliseconds()
+    assertTrue(t2 >= t1 + 2 * throttleCheckIntervalMs)
+
+    val elapsedTimeMs = t2 - t1
+    val actualCountPerSec = 2 * desiredCountPerInterval * 1000 / elapsedTimeMs
+    assertTrue(actualCountPerSec <= desiredCountPerSec)
+
+    // Update ThrottleDesiredRate
+    throttler.updateDesiredRatePerSec(updatedDesiredCountPerSec);
+
+    // Observe desiredCountPerInterval at t2
+    throttler.maybeThrottle(updatedDesiredCountPerInterval)

Review Comment:
   Thank you. The comments have been changed. https://github.com/apache/kafka/pull/12296/commits/ec4c24ee6e315657f364d0953ddfee7d61712692





[GitHub] [kafka] showuon commented on pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
showuon commented on PR #12296:
URL: https://github.com/apache/kafka/pull/12296#issuecomment-1178945720

   @tyamashi-oss, thanks for catching the issue and for the fix!




[GitHub] [kafka] showuon merged pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
showuon merged PR #12296:
URL: https://github.com/apache/kafka/pull/12296




[GitHub] [kafka] showuon commented on pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
showuon commented on PR #12296:
URL: https://github.com/apache/kafka/pull/12296#issuecomment-1178942935

   Failed tests are unrelated:
   ```
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets
       Build / JDK 8 and Scala 2.12 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
   ```




[GitHub] [kafka] tombentley commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

Posted by GitBox <gi...@apache.org>.
tombentley commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r911769480


##########
core/src/main/scala/kafka/utils/Throttler.scala:
##########
@@ -36,7 +36,7 @@ import scala.math._
  * @param time: The time implementation to use
  */
 @threadsafe
-class Throttler(desiredRatePerSec: Double,
+class Throttler(var desiredRatePerSec: Double,

Review Comment:
   @showuon given that we're synchronizing on `lock` anyway at the point of use, maybe synchronizing in `updateDesiredRatePerSec()` would actually be better?
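
A rough sketch of the alternative suggested here, assuming a simplified throttler in which every read of the rate already happens under `lock`. `LockedRateThrottler` and `sleepNeededMs` are hypothetical, and the sleep calculation is a simplification for illustration, not the code in `Throttler.scala`.

```scala
// Hypothetical, simplified stand-in for kafka.utils.Throttler.
class LockedRateThrottler(initialRatePerSec: Double) {
  private val lock = new Object
  // Plain var: every access below goes through lock.synchronized.
  private var desiredRatePerSec: Double = initialRatePerSec

  // Writer takes the same lock the reader uses, so the new value is visible
  // to the next reader without a separate @volatile field.
  def updateDesiredRatePerSec(updated: Double): Unit = lock.synchronized {
    desiredRatePerSec = updated
  }

  // Milliseconds a caller would still need to wait so that `observed` units
  // over `elapsedMs` do not exceed the desired rate (0 if already within it).
  def sleepNeededMs(observed: Double, elapsedMs: Long): Long = lock.synchronized {
    val desiredRatePerMs = desiredRatePerSec / 1000.0
    val minElapsedMs = observed / desiredRatePerMs
    math.max(0L, math.round(minElapsedMs - elapsedMs))
  }
}
```

For example, at 1000.0 per second, 200 units observed in 100 ms would need a further 100 ms; after `updateDesiredRatePerSec(2000.0)` the same observation needs no wait.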


