You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/03/24 21:05:51 UTC

[kafka] branch 1.1 updated: KAFKA-6710: Remove Thread.sleep from LogManager.deleteLogs (#4771)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 2a76d54  KAFKA-6710: Remove Thread.sleep from LogManager.deleteLogs (#4771)
2a76d54 is described below

commit 2a76d543c206b033ae9e7c6c40e518cc0778c775
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Sat Mar 24 20:54:09 2018 +0000

    KAFKA-6710: Remove Thread.sleep from LogManager.deleteLogs (#4771)
    
    `Thread.sleep` in `LogManager.deleteLogs` could block a scheduler thread for up to `log.segment.delete.delay.ms`, which defaults to one minute. To avoid this, `deleteLogs` now deletes only the logs for which `currentDefaultConfig.fileDeleteDelayMs` has elapsed since the delete was scheduled. Logs for which this interval has not yet elapsed are considered for deletion in the next iteration of `deleteLogs`, which is scheduled sooner than the full `fileDeleteDelayMs` interval if a pending log becomes eligible earlier.
    
    Reviewers: Jun Rao <ju...@gmail.com>, Dong Lin <li...@gmail.com>, Ted Yu <yu...@gmail.com>
---
 core/src/main/scala/kafka/log/LogManager.scala     | 33 ++++++++++++++++------
 .../test/scala/unit/kafka/log/LogManagerTest.scala |  5 ++++
 2 files changed, 29 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 7aa5bcd..f26a84c 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -79,13 +79,15 @@ class LogManager(logDirs: Seq[File],
   private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()
 
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
-  @volatile var currentDefaultConfig = initialDefaultConfig
+  @volatile private var _currentDefaultConfig = initialDefaultConfig
   @volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir
 
   def reconfigureDefaultLogConfig(logConfig: LogConfig): Unit = {
-    this.currentDefaultConfig = logConfig
+    this._currentDefaultConfig = logConfig
   }
 
+  def currentDefaultConfig: LogConfig = _currentDefaultConfig
+
   def liveLogDirs: Seq[File] = {
     if (_liveLogDirs.size == logDirs.size)
       logDirs
@@ -245,6 +247,9 @@ class LogManager(logDirs: Seq[File],
     this.logsToBeDeleted.add((log, time.milliseconds()))
   }
 
+  // Only for testing
+  private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty
+
   private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
     debug("Loading log '" + logDir.getName + "'")
     val topicPartition = Log.parseTopicPartitionName(logDir)
@@ -704,17 +709,27 @@ class LogManager(logDirs: Seq[File],
   }
 
   /**
-   *  Delete logs marked for deletion.
+   *  Delete logs marked for deletion. Delete all logs for which `currentDefaultConfig.fileDeleteDelayMs`
+   *  has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed will be
+   *  considered for deletion in the next iteration of `deleteLogs`. The next iteration will be executed
+   *  after the remaining time for the first log that is not deleted. If there are no more `logsToBeDeleted`,
+   *  `deleteLogs` will be executed after `currentDefaultConfig.fileDeleteDelayMs`.
    */
   private def deleteLogs(): Unit = {
+    var nextDelayMs = 0L
     try {
-      while (!logsToBeDeleted.isEmpty) {
-        val (removedLog, scheduleTimeMs) = logsToBeDeleted.take()
+      def nextDeleteDelayMs: Long = {
+        if (!logsToBeDeleted.isEmpty) {
+          val (_, scheduleTimeMs) = logsToBeDeleted.peek()
+          scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
+        } else
+          currentDefaultConfig.fileDeleteDelayMs
+      }
+
+      while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
+        val (removedLog, _) = logsToBeDeleted.take()
         if (removedLog != null) {
           try {
-            val waitingTimeMs = scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
-            if (waitingTimeMs > 0)
-              Thread.sleep(waitingTimeMs)
             removedLog.delete()
             info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
           } catch {
@@ -730,7 +745,7 @@ class LogManager(logDirs: Seq[File],
       try {
         scheduler.schedule("kafka-delete-logs",
           deleteLogs _,
-          delay = currentDefaultConfig.fileDeleteDelayMs,
+          delay = nextDelayMs,
           unit = TimeUnit.MILLISECONDS)
       } catch {
         case e: Throwable =>
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 8a04914..20324a9 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -332,5 +332,10 @@ class LogManagerTest {
       assertNotEquals("File reference was not updated in index", fileBeforeDelete.getAbsolutePath,
         fileInIndex.get.getAbsolutePath)
     }
+
+    time.sleep(logManager.InitialTaskDelayMs)
+    assertTrue("Logs deleted too early", logManager.hasLogsToBeDeleted)
+    time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.InitialTaskDelayMs)
+    assertFalse("Logs not deleted", logManager.hasLogsToBeDeleted)
   }
 }

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.