You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/13 05:20:49 UTC
[kafka] branch trunk updated: KAFKA-6624; Prevent concurrent log flush and log deletion (#4663)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1ea07b9 KAFKA-6624; Prevent concurrent log flush and log deletion (#4663)
1ea07b9 is described below
commit 1ea07b993d75ed68f4c04282eb177bf84156e0b2
Author: Dong Lin <li...@users.noreply.github.com>
AuthorDate: Mon Mar 12 22:20:44 2018 -0700
KAFKA-6624; Prevent concurrent log flush and log deletion (#4663)
KAFKA-6624; Prevent concurrent log flush and log deletion
Reviewers: Ted Yu <yu...@gmail.com>, Jun Rao <ju...@gmail.com>
---
core/src/main/scala/kafka/log/LogManager.scala | 18 +++++++++++++-----
1 file changed, 13 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 9ae93aa..7aa5bcd 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -75,7 +75,8 @@ class LogManager(logDirs: Seq[File],
// from one log directory to another log directory on the same broker. The directory of the future log will be renamed
// to replace the current log of the partition after the future log catches up with the current log
private val futureLogs = new Pool[TopicPartition, Log]()
- private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
+ // Each element in the queue contains the log object to be deleted and the time it is scheduled for deletion.
+ private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()
private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
@volatile var currentDefaultConfig = initialDefaultConfig
@@ -240,6 +241,10 @@ class LogManager(logDirs: Seq[File],
}
}
+ private def addLogToBeDeleted(log: Log): Unit = {
+ this.logsToBeDeleted.add((log, time.milliseconds()))
+ }
+
private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
debug("Loading log '" + logDir.getName + "'")
val topicPartition = Log.parseTopicPartitionName(logDir)
@@ -260,7 +265,7 @@ class LogManager(logDirs: Seq[File],
logDirFailureChannel = logDirFailureChannel)
if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
- this.logsToBeDeleted.add(log)
+ addLogToBeDeleted(log)
} else {
val previous = {
if (log.isFuture)
@@ -704,9 +709,12 @@ class LogManager(logDirs: Seq[File],
private def deleteLogs(): Unit = {
try {
while (!logsToBeDeleted.isEmpty) {
- val removedLog = logsToBeDeleted.take()
+ val (removedLog, scheduleTimeMs) = logsToBeDeleted.take()
if (removedLog != null) {
try {
+ val waitingTimeMs = scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
+ if (waitingTimeMs > 0)
+ Thread.sleep(waitingTimeMs)
removedLog.delete()
info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
} catch {
@@ -767,7 +775,7 @@ class LogManager(logDirs: Seq[File],
sourceLog.close()
checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile)
checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile)
- logsToBeDeleted.add(sourceLog)
+ addLogToBeDeleted(sourceLog)
} catch {
case e: KafkaStorageException =>
// If sourceLog's log directory is offline, we need close its handlers here.
@@ -805,7 +813,7 @@ class LogManager(logDirs: Seq[File],
removedLog.renameDir(Log.logDeleteDirName(topicPartition))
checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
- logsToBeDeleted.add(removedLog)
+ addLogToBeDeleted(removedLog)
info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
} else if (offlineLogDirs.nonEmpty) {
throw new KafkaStorageException("Failed to delete log for " + topicPartition + " because it may be in one of the offline directories " + offlineLogDirs.mkString(","))
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.