You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/07/08 12:41:58 UTC
[kafka] branch trunk updated: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically (#12296)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e85500bbbef KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically (#12296)
e85500bbbef is described below
commit e85500bbbef7c10ee6bd379ab1327e8abb858b6b
Author: Tomonari Yamashita <ty...@redhat.com>
AuthorDate: Fri Jul 8 21:41:47 2022 +0900
KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically (#12296)
log.cleaner.io.max.bytes.per.second cannot be changed dynamically using bin/kafka-configs.sh. Call updateDesiredRatePerSec() of Throttler with new log.cleaner.io.max.bytes.per.second value in reconfigure() of Log Cleaner to fix the issue.
Reviewers: Tom Bentley <tb...@redhat.com>, Luke Chen <sh...@gmail.com>
---
core/src/main/scala/kafka/log/LogCleaner.scala | 13 +++++-
core/src/main/scala/kafka/utils/Throttler.scala | 12 ++++--
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 32 ++++++++++++++-
.../scala/unit/kafka/utils/ThrottlerTest.scala | 46 ++++++++++++++++++++++
4 files changed, 95 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index df20a4a36e9..bf55f10a400 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -104,7 +104,7 @@ class LogCleaner(initialConfig: CleanerConfig,
private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, logDirFailureChannel)
/* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
- private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
+ private[log] val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
checkIntervalMs = 300,
throttleDown = true,
"cleaner-io",
@@ -186,11 +186,20 @@ class LogCleaner(initialConfig: CleanerConfig,
}
/**
- * Reconfigure log clean config. This simply stops current log cleaners and creates new ones.
+ * Reconfigure log clean config. The will:
+ * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary
+ * 2. stop current log cleaners and create new ones.
* That ensures that if any of the cleaners had failed, new cleaners are created to match the new config.
*/
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
config = LogCleaner.cleanerConfig(newConfig)
+
+ val maxIoBytesPerSecond = config.maxIoBytesPerSecond;
+ if (maxIoBytesPerSecond != oldConfig.logCleanerIoMaxBytesPerSecond) {
+ info(s"Updating logCleanerIoMaxBytesPerSecond: $maxIoBytesPerSecond")
+ throttler.updateDesiredRatePerSec(maxIoBytesPerSecond)
+ }
+
shutdown()
startup()
}
diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala
index cce6270cf02..a431db5f006 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -36,7 +36,7 @@ import scala.math._
* @param time: The time implementation to use
*/
@threadsafe
-class Throttler(desiredRatePerSec: Double,
+class Throttler(@volatile var desiredRatePerSec: Double,
checkIntervalMs: Long = 100L,
throttleDown: Boolean = true,
metricName: String = "throttler",
@@ -52,6 +52,7 @@ class Throttler(desiredRatePerSec: Double,
def maybeThrottle(observed: Double): Unit = {
val msPerSec = TimeUnit.SECONDS.toMillis(1)
val nsPerSec = TimeUnit.SECONDS.toNanos(1)
+ val currentDesiredRatePerSec = desiredRatePerSec;
meter.mark(observed.toLong)
lock synchronized {
@@ -62,14 +63,14 @@ class Throttler(desiredRatePerSec: Double,
// we should take a little nap
if (elapsedNs > checkIntervalNs && observedSoFar > 0) {
val rateInSecs = (observedSoFar * nsPerSec) / elapsedNs
- val needAdjustment = !(throttleDown ^ (rateInSecs > desiredRatePerSec))
+ val needAdjustment = !(throttleDown ^ (rateInSecs > currentDesiredRatePerSec))
if (needAdjustment) {
// solve for the amount of time to sleep to make us hit the desired rate
- val desiredRateMs = desiredRatePerSec / msPerSec.toDouble
+ val desiredRateMs = currentDesiredRatePerSec / msPerSec.toDouble
val elapsedMs = TimeUnit.NANOSECONDS.toMillis(elapsedNs)
val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs)
if (sleepTime > 0) {
- trace("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, desiredRatePerSec, sleepTime))
+ trace("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, currentDesiredRatePerSec, sleepTime))
time.sleep(sleepTime)
}
}
@@ -79,6 +80,9 @@ class Throttler(desiredRatePerSec: Double,
}
}
+ def updateDesiredRatePerSec(updatedDesiredRatePerSec: Double): Unit = {
+ desiredRatePerSec = updatedDesiredRatePerSec;
+ }
}
object Throttler {
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index b3a1a76f0d0..0dd8a2cfb04 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -23,9 +23,8 @@ import java.nio.charset.StandardCharsets
import java.nio.file.Paths
import java.util.Properties
import java.util.concurrent.{CountDownLatch, TimeUnit}
-
import kafka.common._
-import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
+import kafka.server.{BrokerTopicStats, KafkaConfig, LogDirFailureChannel}
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.CorruptRecordException
@@ -1854,6 +1853,35 @@ class LogCleanerTest {
} finally logCleaner.shutdown()
}
+ @Test
+ def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+ val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+ oldKafkaProps.setProperty(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "10000000")
+
+ val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+ logDirs = Array(TestUtils.tempDir()),
+ logs = new Pool[TopicPartition, UnifiedLog](),
+ logDirFailureChannel = new LogDirFailureChannel(1),
+ time = time) {
+ // shutdown() and startup() are called in LogCleaner.reconfigure().
+ // Empty startup() and shutdown() to ensure that no unnecessary log cleaner threads remain after this test.
+ override def startup(): Unit = {}
+ override def shutdown(): Unit = {}
+ }
+
+ try {
+ assertEquals(10000000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be initialized from initial `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+
+ val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+ newKafkaProps.setProperty(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "20000000")
+
+ logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))
+
+ assertEquals(20000000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be updated with new `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+ } finally {
+ logCleaner.shutdown()
+ }
+ }
private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
diff --git a/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala b/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala
index 1591cbad900..80ebde4fcd7 100755
--- a/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala
@@ -58,4 +58,50 @@ class ThrottlerTest {
val actualCountPerSec = 4 * desiredCountPerInterval * 1000 / elapsedTimeMs
assertTrue(actualCountPerSec <= desiredCountPerSec)
}
+
+ @Test
+ def testUpdateThrottleDesiredRate(): Unit = {
+ val throttleCheckIntervalMs = 100
+ val desiredCountPerSec = 1000.0
+ val desiredCountPerInterval = desiredCountPerSec * throttleCheckIntervalMs / 1000.0
+ val updatedDesiredCountPerSec = 1500.0;
+ val updatedDesiredCountPerInterval = updatedDesiredCountPerSec * throttleCheckIntervalMs / 1000.0
+
+ val mockTime = new MockTime()
+ val throttler = new Throttler(desiredRatePerSec = desiredCountPerSec,
+ checkIntervalMs = throttleCheckIntervalMs,
+ time = mockTime)
+
+ // Observe desiredCountPerInterval at t1
+ val t1 = mockTime.milliseconds()
+ throttler.maybeThrottle(desiredCountPerInterval)
+ assertEquals(t1, mockTime.milliseconds())
+
+ // Observe desiredCountPerInterval at t1 + throttleCheckIntervalMs + 1,
+ mockTime.sleep(throttleCheckIntervalMs + 1)
+ throttler.maybeThrottle(desiredCountPerInterval)
+ val t2 = mockTime.milliseconds()
+ assertTrue(t2 >= t1 + 2 * throttleCheckIntervalMs)
+
+ val elapsedTimeMs = t2 - t1
+ val actualCountPerSec = 2 * desiredCountPerInterval * 1000 / elapsedTimeMs
+ assertTrue(actualCountPerSec <= desiredCountPerSec)
+
+ // Update ThrottleDesiredRate
+ throttler.updateDesiredRatePerSec(updatedDesiredCountPerSec);
+
+ // Observe updatedDesiredCountPerInterval at t2
+ throttler.maybeThrottle(updatedDesiredCountPerInterval)
+ assertEquals(t2, mockTime.milliseconds())
+
+ // Observe updatedDesiredCountPerInterval at t2 + throttleCheckIntervalMs + 1
+ mockTime.sleep(throttleCheckIntervalMs + 1)
+ throttler.maybeThrottle(updatedDesiredCountPerInterval)
+ val t3 = mockTime.milliseconds()
+ assertTrue(t3 >= t2 + 2 * throttleCheckIntervalMs)
+
+ val updatedElapsedTimeMs = t3 - t2
+ val updatedActualCountPerSec = 2 * updatedDesiredCountPerInterval * 1000 / updatedElapsedTimeMs
+ assertTrue(updatedActualCountPerSec <= updatedDesiredCountPerSec)
+ }
}