You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2011/09/17 05:21:06 UTC
svn commit: r1171886 - in /incubator/kafka/trunk/core/src:
main/scala/kafka/log/LogManager.scala
main/scala/kafka/server/KafkaConfig.scala
test/scala/unit/kafka/log/LogManagerTest.scala
Author: jkreps
Date: Sat Sep 17 03:21:06 2011
New Revision: 1171886
URL: http://svn.apache.org/viewvc?rev=1171886&view=rev
Log:
KAFKA-70 Patch from Prashanth Menon to add space-based retention setting.
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1171886&r1=1171885&r2=1171886&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Sat Sep 17 03:21:06 2011
@@ -51,6 +51,7 @@ private[kafka] class LogManager(val conf
private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
private val logFlushIntervalMap = config.flushIntervalMap
private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
+ private val logRetentionSize = config.logRetentionSize
/* Initialize a log for each subdirectory of the main log directory */
private val logs = new Pool[String, Pool[Int, Log]]()
@@ -193,6 +194,51 @@ private[kafka] class LogManager(val conf
log
}
+ /* Attempts to delete all provided segments from a log and returns how many it was able to */
+ private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int = {
+ var total = 0
+ for(segment <- segments) {
+ logger.info("Deleting log segment " + segment.file.getName() + " from " + log.name)
+ Utils.swallow(logger.warn, segment.messageSet.close())
+ if(!segment.file.delete()) {
+ logger.warn("Delete failed.")
+ } else {
+ total += 1
+ }
+ }
+ total
+ }
+
+ /* Runs through the log removing segments older than a certain age */
+ private def cleanupExpiredSegments(log: Log): Int = {
+ val startMs = time.milliseconds
+ val topic = Utils.getTopicPartition(log.dir.getName)._1
+ val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
+ val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
+ val total = deleteSegments(log, toBeDeleted)
+ total
+ }
+
+ /**
+ * Runs through the log removing segments until the size of the log
+ * is at most logRetentionSize bytes in size
+ */
+ private def cleanupSegmentsToMaintainSize(log: Log): Int = {
+ if(logRetentionSize < 0 || log.size < logRetentionSize) return 0
+ var diff = log.size - logRetentionSize
+ def shouldDelete(segment: LogSegment) = {
+ if(diff - segment.size >= 0) {
+ diff -= segment.size
+ true
+ } else {
+ false
+ }
+ }
+ val toBeDeleted = log.markDeletedWhile( shouldDelete )
+ val total = deleteSegments(log, toBeDeleted)
+ total
+ }
+
/**
* Delete any eligible logs. Return the number of segments deleted.
*/
@@ -204,19 +250,7 @@ private[kafka] class LogManager(val conf
while(iter.hasNext) {
val log = iter.next
logger.debug("Garbage collecting '" + log.name + "'")
- var logCleanupThresholdMS = this.logCleanupDefaultAgeMs
- val topic = Utils.getTopicPartition(log.dir.getName)._1
- if (logRetentionMSMap.contains(topic))
- logCleanupThresholdMS = logRetentionMSMap(topic)
- val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
- for(segment <- toBeDeleted) {
- logger.info("Deleting log segment " + segment.file.getName() + " from " + log.name)
- Utils.swallow(logger.warn, segment.messageSet.close())
- if(!segment.file.delete())
- logger.warn("Delete failed.")
- else
- total += 1
- }
+ total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
}
logger.debug("Log cleanup completed. " + total + " files deleted in " +
(time.milliseconds - startMs) / 1000 + " seconds")
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1171886&r1=1171885&r2=1171886&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala Sat Sep 17 03:21:06 2011
@@ -64,6 +64,9 @@ class KafkaConfig(props: Properties) ext
/* the number of hours to keep a log file before deleting it */
val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
+ /* the maximum size of the log before deleting it */
+ val logRetentionSize = Utils.getInt(props, "log.retention.size", -1)
+
/* the number of hours to keep a log file before deleting it for some specific topic*/
val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1171886&r1=1171885&r2=1171886&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Sat Sep 17 03:21:06 2011
@@ -59,7 +59,7 @@ class LogManagerTest extends JUnitSuite
@Test
- def testCleanup() {
+ def testCleanupExpiredSegments() {
val log = logManager.getOrCreateLog("cleanup", 0)
var offset = 0L
for(i <- 0 until 1000) {
@@ -87,6 +87,54 @@ class LogManagerTest extends JUnitSuite
}
@Test
+ def testCleanupSegmentsToMaintainSize() {
+ val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
+ val retentionHours = 1
+ val retentionMs = 1000 * 60 * 60 * retentionHours
+ val props = TestUtils.createBrokerConfig(0, -1)
+ logManager.close
+ Thread.sleep(100)
+ config = new KafkaConfig(props) {
+ override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
+ override val enableZookeeper = false
+ override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Int] // keep exactly 6 segments + 1 roll over
+ override val logRetentionHours = retentionHours
+ }
+ logManager = new LogManager(config, null, time, -1, retentionMs, false)
+ logManager.startup
+
+ // create a log
+ val log = logManager.getOrCreateLog("cleanup", 0)
+ var offset = 0L
+
+ // add a bunch of messages that should be larger than the retentionSize
+ for(i <- 0 until 1000) {
+ val set = TestUtils.singleMessageSet("test".getBytes())
+ log.append(set)
+ offset += set.sizeInBytes
+ }
+ // flush to make sure it's written to disk, then sleep to confirm
+ log.flush
+ Thread.sleep(2000)
+
+ // should be exactly 100 full segments + 1 new empty one
+ assertEquals("There should be exactly 101 segments.", 100 + 1, log.numberOfSegments)
+
+ // this cleanup shouldn't find any expired segments but should delete some to reduce size
+ logManager.cleanupLogs()
+ assertEquals("Now there should be exactly 7 segments", 6 + 1, log.numberOfSegments)
+ assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
+ try {
+ log.read(0, 1024)
+ fail("Should get exception from fetching earlier.")
+ } catch {
+ case e: OffsetOutOfRangeException => "This is good."
+ }
+ // log should still be appendable
+ log.append(TestUtils.singleMessageSet("test".getBytes()))
+ }
+
+ @Test
def testTimeBasedFlush() {
val props = TestUtils.createBrokerConfig(0, -1)
logManager.close