You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/13 05:21:49 UTC

[kafka] branch 1.1 updated: KAFKA-6624; Prevent concurrent log flush and log deletion (#4663)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 91a14d3  KAFKA-6624; Prevent concurrent log flush and log deletion (#4663)
91a14d3 is described below

commit 91a14d3c23bbc3da6f33c9f0bdfda37d0864e726
Author: Dong Lin <li...@users.noreply.github.com>
AuthorDate: Mon Mar 12 22:20:44 2018 -0700

    KAFKA-6624; Prevent concurrent log flush and log deletion (#4663)
    
    KAFKA-6624; Prevent concurrent log flush and log deletion
    
    Reviewers: Ted Yu <yu...@gmail.com>, Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/LogManager.scala | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 9ae93aa..7aa5bcd 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -75,7 +75,8 @@ class LogManager(logDirs: Seq[File],
   // from one log directory to another log directory on the same broker. The directory of the future log will be renamed
   // to replace the current log of the partition after the future log catches up with the current log
   private val futureLogs = new Pool[TopicPartition, Log]()
-  private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
+  // Each element in the queue contains the log object to be deleted and the time it is scheduled for deletion.
+  private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()
 
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
   @volatile var currentDefaultConfig = initialDefaultConfig
@@ -240,6 +241,10 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  private def addLogToBeDeleted(log: Log): Unit = {
+    this.logsToBeDeleted.add((log, time.milliseconds()))
+  }
+
   private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
     debug("Loading log '" + logDir.getName + "'")
     val topicPartition = Log.parseTopicPartitionName(logDir)
@@ -260,7 +265,7 @@ class LogManager(logDirs: Seq[File],
       logDirFailureChannel = logDirFailureChannel)
 
     if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
-      this.logsToBeDeleted.add(log)
+      addLogToBeDeleted(log)
     } else {
       val previous = {
         if (log.isFuture)
@@ -704,9 +709,12 @@ class LogManager(logDirs: Seq[File],
   private def deleteLogs(): Unit = {
     try {
       while (!logsToBeDeleted.isEmpty) {
-        val removedLog = logsToBeDeleted.take()
+        val (removedLog, scheduleTimeMs) = logsToBeDeleted.take()
         if (removedLog != null) {
           try {
+            val waitingTimeMs = scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
+            if (waitingTimeMs > 0)
+              Thread.sleep(waitingTimeMs)
             removedLog.delete()
             info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
           } catch {
@@ -767,7 +775,7 @@ class LogManager(logDirs: Seq[File],
         sourceLog.close()
         checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile)
         checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile)
-        logsToBeDeleted.add(sourceLog)
+        addLogToBeDeleted(sourceLog)
       } catch {
         case e: KafkaStorageException =>
           // If sourceLog's log directory is offline, we need close its handlers here.
@@ -805,7 +813,7 @@ class LogManager(logDirs: Seq[File],
       removedLog.renameDir(Log.logDeleteDirName(topicPartition))
       checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
       checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
-      logsToBeDeleted.add(removedLog)
+      addLogToBeDeleted(removedLog)
       info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
     } else if (offlineLogDirs.nonEmpty) {
       throw new KafkaStorageException("Failed to delete log for " + topicPartition + " because it may be in one of the offline directories " + offlineLogDirs.mkString(","))

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.