Posted to commits@kafka.apache.org by jk...@apache.org on 2011/09/17 05:21:06 UTC

svn commit: r1171886 - in /incubator/kafka/trunk/core/src: main/scala/kafka/log/LogManager.scala main/scala/kafka/server/KafkaConfig.scala test/scala/unit/kafka/log/LogManagerTest.scala

Author: jkreps
Date: Sat Sep 17 03:21:06 2011
New Revision: 1171886

URL: http://svn.apache.org/viewvc?rev=1171886&view=rev
Log:
KAFKA-70 Patch from Prashanth Menon to add space-based retention setting.
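
For context, a minimal sketch (not part of this commit) of how the new setting
might be enabled. It builds a broker config with TestUtils.createBrokerConfig the
same way the tests below do (the TestUtils import path is assumed from the test
sources); the 1 GB figure is purely illustrative, and the default of -1 leaves
size-based retention disabled:

    import kafka.server.KafkaConfig
    import kafka.utils.TestUtils

    // Start from a complete broker config, as the tests below do, then layer the
    // new size-based retention on top of the existing time-based retention.
    val props = TestUtils.createBrokerConfig(0, -1)
    props.put("log.retention.hours", "168")         // keep segments for up to 7 days
    props.put("log.retention.size", "1073741824")   // new: also cap each partition's log at ~1 GB
    val config = new KafkaConfig(props)
    // config.logRetentionSize is now 1073741824; left at the default of -1,
    // the size-based cleanup in LogManager below is a no-op.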


Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1171886&r1=1171885&r2=1171886&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Sat Sep 17 03:21:06 2011
@@ -51,6 +51,7 @@ private[kafka] class LogManager(val conf
   private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
   private val logFlushIntervalMap = config.flushIntervalMap
   private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
+  private val logRetentionSize = config.logRetentionSize
 
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()
@@ -193,6 +194,51 @@ private[kafka] class LogManager(val conf
     log
   }
   
+  /* Attempts to delete all provided segments from a log and returns the number it was able to delete */
+  private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int = {
+    var total = 0
+    for(segment <- segments) {
+      logger.info("Deleting log segment " + segment.file.getName() + " from " + log.name)
+      Utils.swallow(logger.warn, segment.messageSet.close())
+      if(!segment.file.delete()) {
+        logger.warn("Delete failed.")
+      } else {
+        total += 1
+      }
+    }
+    total
+  }
+
+  /* Runs through the log removing segments older than a certain age */
+  private def cleanupExpiredSegments(log: Log): Int = {
+    val startMs = time.milliseconds
+    val topic = Utils.getTopicPartition(log.dir.getName)._1
+    val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
+    val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
+    val total = deleteSegments(log, toBeDeleted)
+    total
+  }
+
+  /**
+   *  Runs through the log, removing whole segments (oldest first) while the
+   *  remaining log would still be at least logRetentionSize bytes in size
+   */
+  private def cleanupSegmentsToMaintainSize(log: Log): Int = {
+    if(logRetentionSize < 0 || log.size < logRetentionSize) return 0
+    var diff = log.size - logRetentionSize
+    def shouldDelete(segment: LogSegment) = {
+      if(diff - segment.size >= 0) {
+        diff -= segment.size
+        true
+      } else {
+        false
+      }
+    }
+    val toBeDeleted = log.markDeletedWhile( shouldDelete )
+    val total = deleteSegments(log, toBeDeleted)
+    total
+  }
+
   /**
    * Delete any eligible logs. Return the number of segments deleted.
    */
@@ -204,19 +250,7 @@ private[kafka] class LogManager(val conf
     while(iter.hasNext) {
       val log = iter.next
       logger.debug("Garbage collecting '" + log.name + "'")
-      var logCleanupThresholdMS = this.logCleanupDefaultAgeMs
-      val topic = Utils.getTopicPartition(log.dir.getName)._1
-      if (logRetentionMSMap.contains(topic))
-        logCleanupThresholdMS = logRetentionMSMap(topic)
-      val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
-      for(segment <- toBeDeleted) {
-        logger.info("Deleting log segment " + segment.file.getName() + " from " + log.name)
-        Utils.swallow(logger.warn, segment.messageSet.close())
-        if(!segment.file.delete())
-          logger.warn("Delete failed.")
-        else
-          total += 1
-      }
+      total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
     }
     logger.debug("Log cleanup completed. " + total + " files deleted in " + 
                  (time.milliseconds - startMs) / 1000 + " seconds")
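
To make the selection rule in cleanupSegmentsToMaintainSize above concrete, here is
a small standalone sketch (illustrative only, not part of the patch; the helper name
and the byte figures are made up) that applies the same running-difference test to a
plain list of segment sizes:

    // Mirrors shouldDelete above: walk the segments oldest-first and delete whole
    // segments for as long as the log left behind is still at least
    // logRetentionSize bytes; a negative retention size disables the check.
    def segmentsToDelete(segmentSizes: Seq[Long], logRetentionSize: Long): Seq[Long] = {
      val logSize = segmentSizes.sum
      if (logRetentionSize < 0 || logSize < logRetentionSize) return Seq.empty
      var diff = logSize - logRetentionSize
      segmentSizes.takeWhile { size =>
        if (diff - size >= 0) { diff -= size; true } else false
      }
    }

    // Example: five 100-byte segments with a 250-byte retention size.
    // segmentsToDelete(Seq(100L, 100L, 100L, 100L, 100L), 250) == Seq(100L, 100L)
    // The two oldest segments go; deleting a third would drop the log below 250
    // bytes, so the prefix scan stops there, mirroring how markDeletedWhile marks
    // a prefix of the log's segments.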

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1171886&r1=1171885&r2=1171886&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala Sat Sep 17 03:21:06 2011
@@ -64,6 +64,9 @@ class KafkaConfig(props: Properties) ext
   /* the number of hours to keep a log file before deleting it */
   val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
   
+  /* the maximum size the log can grow to before old segments are deleted; -1 disables size-based retention */
+  val logRetentionSize = Utils.getInt(props, "log.retention.size", -1)
+
   /* the number of hours to keep a log file before deleting it for some specific topic*/
   val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
 

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1171886&r1=1171885&r2=1171886&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Sat Sep 17 03:21:06 2011
@@ -59,7 +59,7 @@ class LogManagerTest extends JUnitSuite 
 
 
   @Test
-  def testCleanup() {
+  def testCleanupExpiredSegments() {
     val log = logManager.getOrCreateLog("cleanup", 0)
     var offset = 0L
     for(i <- 0 until 1000) {
@@ -87,6 +87,54 @@ class LogManagerTest extends JUnitSuite 
   }
 
   @Test
+  def testCleanupSegmentsToMaintainSize() {
+    val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
+    val retentionHours = 1
+    val retentionMs = 1000 * 60 * 60 * retentionHours
+    val props = TestUtils.createBrokerConfig(0, -1)
+    logManager.close
+    Thread.sleep(100)
+    config = new KafkaConfig(props) {
+      override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
+      override val enableZookeeper = false
+      override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Int] // keep exactly 6 segments + 1 roll over
+      override val logRetentionHours = retentionHours
+    }
+    logManager = new LogManager(config, null, time, -1, retentionMs, false)
+    logManager.startup
+
+    // create a log
+    val log = logManager.getOrCreateLog("cleanup", 0)
+    var offset = 0L
+
+    // add a bunch of messages that should be larger than the retentionSize
+    for(i <- 0 until 1000) {
+      val set = TestUtils.singleMessageSet("test".getBytes())
+      log.append(set)
+      offset += set.sizeInBytes
+    }
+    // flush to make sure it's written to disk, then pause briefly
+    log.flush
+    Thread.sleep(2000)
+
+    // should be exactly 100 full segments + 1 new empty one
+    assertEquals("There should be example 101 segments.", 100 + 1, log.numberOfSegments)
+
+    // this cleanup shouldn't find any expired segments but should delete some to reduce size
+    logManager.cleanupLogs()
+    assertEquals("Now there should be exactly 7 segments", 6 + 1, log.numberOfSegments)
+    assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
+    try {
+      log.read(0, 1024)
+      fail("Should get exception from fetching earlier.")
+    } catch {
+      case e: OffsetOutOfRangeException => "This is good."
+    }
+    // log should still be appendable
+    log.append(TestUtils.singleMessageSet("test".getBytes()))
+  }
+
+  @Test
   def testTimeBasedFlush() {
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.close