You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/03/24 21:05:51 UTC
[kafka] branch 1.1 updated: KAFKA-6710: Remove Thread.sleep from
LogManager.deleteLogs (#4771)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 2a76d54 KAFKA-6710: Remove Thread.sleep from LogManager.deleteLogs (#4771)
2a76d54 is described below
commit 2a76d543c206b033ae9e7c6c40e518cc0778c775
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Sat Mar 24 20:54:09 2018 +0000
KAFKA-6710: Remove Thread.sleep from LogManager.deleteLogs (#4771)
`Thread.sleep` in `LogManager.deleteLogs` potentially blocks a scheduler thread for up to `log.segment.delete.delay.ms` with a default value of a minute. To avoid this, `deleteLogs` now deletes the logs for which `currentDefaultConfig.fileDeleteDelayMs` has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed are considered for deletion in the next iteration of `deleteLogs`, which is scheduled sooner if required.
Reviewers: Jun Rao <ju...@gmail.com>, Dong Lin <li...@gmail.com>, Ted Yu <yu...@gmail.com>
---
core/src/main/scala/kafka/log/LogManager.scala | 33 ++++++++++++++++------
.../test/scala/unit/kafka/log/LogManagerTest.scala | 5 ++++
2 files changed, 29 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 7aa5bcd..f26a84c 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -79,13 +79,15 @@ class LogManager(logDirs: Seq[File],
private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()
private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
- @volatile var currentDefaultConfig = initialDefaultConfig
+ @volatile private var _currentDefaultConfig = initialDefaultConfig
@volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir
def reconfigureDefaultLogConfig(logConfig: LogConfig): Unit = {
- this.currentDefaultConfig = logConfig
+ this._currentDefaultConfig = logConfig
}
+ def currentDefaultConfig: LogConfig = _currentDefaultConfig
+
def liveLogDirs: Seq[File] = {
if (_liveLogDirs.size == logDirs.size)
logDirs
@@ -245,6 +247,9 @@ class LogManager(logDirs: Seq[File],
this.logsToBeDeleted.add((log, time.milliseconds()))
}
+ // Only for testing
+ private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty
+
private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
debug("Loading log '" + logDir.getName + "'")
val topicPartition = Log.parseTopicPartitionName(logDir)
@@ -704,17 +709,27 @@ class LogManager(logDirs: Seq[File],
}
/**
- * Delete logs marked for deletion.
+ * Delete logs marked for deletion. Delete all logs for which `currentDefaultConfig.fileDeleteDelayMs`
+ * has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed will be
+ * considered for deletion in the next iteration of `deleteLogs`. The next iteration will be executed
+ * after the remaining time for the first log that is not deleted. If there are no more `logsToBeDeleted`,
+ * `deleteLogs` will be executed after `currentDefaultConfig.fileDeleteDelayMs`.
*/
private def deleteLogs(): Unit = {
+ var nextDelayMs = 0L
try {
- while (!logsToBeDeleted.isEmpty) {
- val (removedLog, scheduleTimeMs) = logsToBeDeleted.take()
+ def nextDeleteDelayMs: Long = {
+ if (!logsToBeDeleted.isEmpty) {
+ val (_, scheduleTimeMs) = logsToBeDeleted.peek()
+ scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
+ } else
+ currentDefaultConfig.fileDeleteDelayMs
+ }
+
+ while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
+ val (removedLog, _) = logsToBeDeleted.take()
if (removedLog != null) {
try {
- val waitingTimeMs = scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
- if (waitingTimeMs > 0)
- Thread.sleep(waitingTimeMs)
removedLog.delete()
info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
} catch {
@@ -730,7 +745,7 @@ class LogManager(logDirs: Seq[File],
try {
scheduler.schedule("kafka-delete-logs",
deleteLogs _,
- delay = currentDefaultConfig.fileDeleteDelayMs,
+ delay = nextDelayMs,
unit = TimeUnit.MILLISECONDS)
} catch {
case e: Throwable =>
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 8a04914..20324a9 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -332,5 +332,10 @@ class LogManagerTest {
assertNotEquals("File reference was not updated in index", fileBeforeDelete.getAbsolutePath,
fileInIndex.get.getAbsolutePath)
}
+
+ time.sleep(logManager.InitialTaskDelayMs)
+ assertTrue("Logs deleted too early", logManager.hasLogsToBeDeleted)
+ time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.InitialTaskDelayMs)
+ assertFalse("Logs not deleted", logManager.hasLogsToBeDeleted)
}
}
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.