Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/24 22:38:28 UTC

svn commit: r1377093 - in /incubator/kafka/trunk/core/src: main/scala/kafka/log/ main/scala/kafka/server/ main/scala/kafka/utils/ test/scala/other/kafka/ test/scala/unit/kafka/log/

Author: junrao
Date: Fri Aug 24 20:38:27 2012
New Revision: 1377093

URL: http://svn.apache.org/viewvc?rev=1377093&view=rev
Log:
Time based log segment rollout; patched by Swapnil Ghike; reviewed by Jun Rao, Neha Narkhede; KAFKA-475

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala

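In short: a segment now rolls not only when it outgrows log.file.size but also when its first append is older than a (per-topic) roll interval. A minimal, self-contained sketch of the new predicate, with SegmentInfo as a hypothetical stand-in for the real LogSegment:

    // Stand-in for LogSegment; only the two fields the check reads.
    case class SegmentInfo(sizeInBytes: Long, firstAppendTimeMs: Option[Long])

    // Mirrors maybeRoll() in Log.scala below: roll on size OR age.
    def shouldRoll(seg: SegmentInfo, maxSize: Long, rollIntervalMs: Long, nowMs: Long): Boolean =
      (seg.sizeInBytes > maxSize) ||
        (seg.firstAppendTimeMs.isDefined && nowMs - seg.firstAppendTimeMs.get > rollIntervalMs)
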
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Fri Aug 24 20:38:27 2012
@@ -90,9 +90,23 @@ private[log] object Log {
 /**
  * A segment file in the log directory. Each log segment consists of an open message set, a start offset and a size 
  */
-private[log] class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long) extends Range {
+private[log] class LogSegment(val file: File, val time: Time, val messageSet: FileMessageSet, val start: Long) extends Range {
+  var firstAppendTime: Option[Long] = None
   @volatile var deleted = false
   def size: Long = messageSet.highWaterMark
+
+  private def updateFirstAppendTime() {
+    if (firstAppendTime.isEmpty)
+      firstAppendTime = Some(time.milliseconds)
+  }
+
+  def append(messages: ByteBufferMessageSet) {
+    if (messages.sizeInBytes > 0) {
+      messageSet.append(messages)
+      updateFirstAppendTime()
+    }
+  }
+
   override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")"
 }
 
@@ -101,9 +115,8 @@ private[log] class LogSegment(val file: 
  * An append-only log for storing messages. 
  */
 @threadsafe
-private[log] class Log(val dir: File, val maxSize: Long, val maxMessageSize: Int,
-                       val flushInterval: Int, val needRecovery: Boolean) extends Logging {
-
+private[log] class Log(val dir: File, val time: Time, val maxSize: Long, val maxMessageSize: Int,
+                       val flushInterval: Int, val rollIntervalMs: Long, val needRecovery: Boolean) extends Logging {
   /* A lock that guards all modifications to the log */
   private val lock = new Object
 
@@ -121,7 +134,7 @@ private[log] class Log(val dir: File, va
 
   private val logStats = new LogStats(this)
 
-  Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)  
+  Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
 
   /* Load the log segments from the log files on disk */
   private def loadSegments(): SegmentList[LogSegment] = {
@@ -135,7 +148,7 @@ private[log] class Log(val dir: File, va
         val filename = file.getName()
         val start = filename.substring(0, filename.length - Log.FileSuffix.length).toLong
         val messageSet = new FileMessageSet(file, false)
-        accum.add(new LogSegment(file, messageSet, start))
+        accum.add(new LogSegment(file, time, messageSet, start))
       }
     }
 
@@ -143,7 +156,7 @@ private[log] class Log(val dir: File, va
       // no existing segments, create a new mutable segment
       val newFile = new File(dir, Log.nameFromOffset(0))
       val set = new FileMessageSet(newFile, true)
-      accum.add(new LogSegment(newFile, set, 0))
+      accum.add(new LogSegment(newFile, time, set, 0))
     } else {
       // there is at least one existing segment, validate and recover them/it
       // sort segments into ascending order for fast searching
@@ -160,7 +173,7 @@ private[log] class Log(val dir: File, va
       val last = accum.remove(accum.size - 1)
       last.messageSet.close()
       info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery)
-      val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
+      val mutable = new LogSegment(last.file, time, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
       accum.add(mutable)
     }
     new SegmentList(accum.toArray(new Array[LogSegment](accum.size)))
@@ -227,10 +240,11 @@ private[log] class Log(val dir: File, va
     // they are valid, insert them in the log
     lock synchronized {
       try {
-        val segment = segments.view.last
-        segment.messageSet.append(validMessages)
-        maybeFlush(numberOfMessages)
+        var segment = segments.view.last
         maybeRoll(segment)
+        segment = segments.view.last
+        segment.append(validMessages)
+        maybeFlush(numberOfMessages)
       }
       catch {
         case e: IOException =>
@@ -301,7 +315,8 @@ private[log] class Log(val dir: File, va
    * Roll the log over if necessary
    */
   private def maybeRoll(segment: LogSegment) {
-    if(segment.messageSet.sizeInBytes > maxSize)
+    if((segment.messageSet.sizeInBytes > maxSize) ||
+       ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)))
       roll()
   }
 
@@ -317,7 +332,7 @@ private[log] class Log(val dir: File, va
         newFile.delete()
       }
       debug("Rolling log '" + name + "' to " + newFile.getName())
-      segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
+      segments.append(new LogSegment(newFile, time, new FileMessageSet(newFile, true), newOffset))
     }
   }
 

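Two behavioral details in the Log.scala change are worth noting: (1) only a non-empty append sets firstAppendTime, so a segment that never receives data does not age out, and (2) append() now calls maybeRoll() before appending, so messages that trigger a roll land in the fresh segment. A standalone sketch of the bookkeeping, with the Time dependency simplified to a () => Long clock:

    class SegmentSketch(clock: () => Long) {
      var firstAppendTime: Option[Long] = None  // set on the first non-empty append
      var sizeInBytes: Long = 0L

      def append(bytes: Long) {
        if (bytes > 0) {                        // empty message sets do not start the age clock
          sizeInBytes += bytes
          if (firstAppendTime.isEmpty)
            firstAppendTime = Some(clock())
        }
      }
    }
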
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Fri Aug 24 20:38:27 2012
@@ -33,13 +33,14 @@ import kafka.api.OffsetRequest
 private[kafka] class LogManager(val config: KafkaConfig,
                                 private val scheduler: KafkaScheduler,
                                 private val time: Time,
+                                val logRollDefaultIntervalMs: Long,
                                 val logCleanupIntervalMs: Long,
                                 val logCleanupDefaultAgeMs: Long,
                                 needRecovery: Boolean) extends Logging {
-  
+
   val logDir: File = new File(config.logDir)
   private val numPartitions = config.numPartitions
-  private val maxSize: Long = config.logFileSize
+  private val logFileSizeMap = config.logFileSizeMap
   private val flushInterval = config.flushInterval
   private val topicPartitionsMap = config.topicPartitionsMap
   private val logCreationLock = new Object
@@ -49,8 +50,9 @@ private[kafka] class LogManager(val conf
   private val startupLatch: CountDownLatch = if (config.enableZookeeper) new CountDownLatch(1) else null
   private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
   private val logFlushIntervalMap = config.flushIntervalMap
-  private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
-  private val logRetentionSize = config.logRetentionSize
+  private val logRetentionSizeMap = config.logRetentionSizeMap
+  private val logRetentionMsMap = getMsMap(config.logRetentionHoursMap)
+  private val logRollMsMap = getMsMap(config.logRollHoursMap)
 
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()
@@ -67,7 +69,10 @@ private[kafka] class LogManager(val conf
         warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
       } else {
         info("Loading log '" + dir.getName() + "'")
-        val log = new Log(dir, maxSize, config.maxMessageSize, flushInterval, needRecovery)
+        val topic = Utils.getTopicPartition(dir.getName)._1
+        val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
+        val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
+        val log = new Log(dir, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needRecovery)
         val topicPartion = Utils.getTopicPartition(dir.getName)
         logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
         val parts = logs.get(topicPartion._1)
@@ -108,10 +113,11 @@ private[kafka] class LogManager(val conf
 
   case object StopActor
 
-  private def getLogRetentionMSMap(logRetentionHourMap: Map[String, Int]) : Map[String, Long] = {
+  private def getMsMap(hoursMap: Map[String, Int]) : Map[String, Long] = {
     var ret = new mutable.HashMap[String, Long]
-    for ( (topic, hour) <- logRetentionHourMap )
+    for ( (topic, hour) <- hoursMap ) {
       ret.put(topic, hour * 60 * 60 * 1000L)
+    }
     ret
   }
 
@@ -146,7 +152,9 @@ private[kafka] class LogManager(val conf
     logCreationLock synchronized {
       val d = new File(logDir, topic + "-" + partition)
       d.mkdirs()
-      new Log(d, maxSize, config.maxMessageSize, flushInterval, false)
+      val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
+      val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
+      new Log(d, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, false)
     }
   }
   
@@ -236,7 +244,7 @@ private[kafka] class LogManager(val conf
   private def cleanupExpiredSegments(log: Log): Int = {
     val startMs = time.milliseconds
     val topic = Utils.getTopicPartition(log.dir.getName)._1
-    val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
+    val logCleanupThresholdMS = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
     val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
     val total = deleteSegments(log, toBeDeleted)
     total
@@ -247,8 +255,10 @@ private[kafka] class LogManager(val conf
    *  is at least logRetentionSize bytes in size
    */
   private def cleanupSegmentsToMaintainSize(log: Log): Int = {
-    if(logRetentionSize < 0 || log.size < logRetentionSize) return 0
-    var diff = log.size - logRetentionSize
+    val topic = Utils.getTopicPartition(log.dir.getName)._1
+    val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize)
+    if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0
+    var diff = log.size - maxLogRetentionSize
     def shouldDelete(segment: LogSegment) = {
       if(diff - segment.size >= 0) {
         diff -= segment.size

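Every per-topic setting follows the same lookup pattern: check the topic map, fall back to the broker-wide default. For example (the topic name is made up; the hours-to-ms conversion is the same one getMsMap performs):

    val logRollMsMap = Map("clicks" -> 1 * 60 * 60 * 1000L)  // hypothetical override: 1 hour
    val logRollDefaultIntervalMs = 24 * 7 * 60 * 60 * 1000L  // broker default: one week

    def rollIntervalMs(topic: String): Long =
      logRollMsMap.get(topic).getOrElse(logRollDefaultIntervalMs)

    // rollIntervalMs("clicks")  == 3600000L
    // rollIntervalMs("metrics") == 604800000L
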
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala Fri Aug 24 20:38:27 2012
@@ -60,18 +60,27 @@ class KafkaConfig(props: Properties) ext
   
   /* the maximum size of a single log file */
   val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
-  
-  /* the number of messages accumulated on a log partition before messages are flushed to disk */
-  val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
-  
+
+  /* the maximum size of a single log file for some specific topic */
+  val logFileSizeMap = Utils.getTopicFileSize(Utils.getString(props, "topic.log.file.size", ""))
+
+  /* the maximum time before a new log segment is rolled out */
+  val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue))
+
+  /* the number of hours before rolling out a new log segment for some specific topic */
+  val logRollHoursMap = Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours", ""))
+
   /* the number of hours to keep a log file before deleting it */
-  val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
+  val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24*7, (1, Int.MaxValue))
+
+  /* the number of hours to keep a log file before deleting it for some specific topic*/
+  val logRetentionHoursMap = Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
   
   /* the maximum size of the log before deleting it */
   val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)
 
-  /* the number of hours to keep a log file before deleting it for some specific topic*/
-  val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
+  /* the maximum size of the log for some specific topic before deleting it */
+  val logRetentionSizeMap = Utils.getTopicRetentionSize(Utils.getString(props, "topic.log.retention.size", ""))
 
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
@@ -79,6 +88,9 @@ class KafkaConfig(props: Properties) ext
   /* enable zookeeper registration in the server */
   val enableZookeeper = Utils.getBoolean(props, "enable.zookeeper", true)
 
+  /* the number of messages accumulated on a log partition before messages are flushed to disk */
+  val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
+
   /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
   val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))
 

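A hypothetical server.properties fragment exercising the new keys (the property names are the ones read above; the topics and values are illustrative):

    log.roll.hours=168
    topic.log.roll.hours=clicks:1,metrics:6
    topic.log.file.size=clicks:536870912
    topic.log.retention.size=clicks:10737418240
    topic.log.retention.hours=metrics:24
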
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Fri Aug 24 20:38:27 2012
@@ -58,6 +58,7 @@ class KafkaServer(val config: KafkaConfi
     logManager = new LogManager(config,
                                 scheduler,
                                 SystemTime,
+                                1000L * 60 * 60 * config.logRollHours,
                                 1000L * 60 * config.logCleanupIntervalMinutes,
                                 1000L * 60 * 60 * config.logRetentionHours,
                                 needRecovery)

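With the default log.roll.hours of 24*7 = 168, the new constructor argument works out to 1000L * 60 * 60 * 168 = 604,800,000 ms, so absent a per-topic override a segment rolls on time at most once a week.
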
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Fri Aug 24 20:38:27 2012
@@ -566,7 +566,7 @@ object Utils extends Logging {
   }
 
   /**
-   * This method gets comma seperated values which contains key,value pairs and returns a map of
+   * This method gets comma separated values which contain key,value pairs and returns a map of
    * key value pairs. the format of allCSVal is key1:val1, key2:val2 ....
    */
   private def getCSVMap[K, V](allCSVals: String, exceptionMsg:String, successMsg:String) :Map[K, V] = {
@@ -595,12 +595,30 @@ object Utils extends Logging {
     }
   }
 
-  def getTopicRentionHours(retentionHours: String) : Map[String, Int] = {
+  def getTopicRetentionHours(retentionHours: String) : Map[String, Int] = {
     val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: "
     val successMsg =  "The retention hour for "
     getCSVMap(retentionHours, exceptionMsg, successMsg)
   }
 
+  def getTopicRollHours(rollHours: String) : Map[String, Int] = {
+    val exceptionMsg = "Malformed token for topic.log.roll.hours in server.properties: "
+    val successMsg =  "The roll hour for "
+    getCSVMap(rollHours, exceptionMsg, successMsg)
+  }
+
+  def getTopicFileSize(fileSizes: String): Map[String, Int] = {
+    val exceptionMsg = "Malformed token for topic.log.file.size in server.properties: "
+    val successMsg =  "The log file size for "
+    getCSVMap(fileSizes, exceptionMsg, successMsg)
+  }
+
+  def getTopicRetentionSize(retentionSizes: String): Map[String, Long] = {
+    val exceptionMsg = "Malformed token for topic.log.retention.size in server.properties: "
+    val successMsg =  "The log retention size for "
+    getCSVMap(retentionSizes, exceptionMsg, successMsg)
+  }
+
   def getTopicFlushIntervals(allIntervals: String) : Map[String, Int] = {
     val exceptionMsg = "Malformed token for topic.flush.Intervals.ms in server.properties: "
     val successMsg =  "The flush interval for "

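These helpers all delegate to getCSVMap, which expects input of the form "key1:val1,key2:val2". A rough standalone equivalent of that parsing (a sketch, not the private getCSVMap itself):

    def parseCsvMap(csv: String): Map[String, String] =
      csv.split(",").map(_.trim).filter(_.nonEmpty).map { token =>
        val Array(k, v) = token.split(":")  // a malformed token throws here, which is
        k.trim -> v.trim                    // what the exceptionMsg strings guard against
      }.toMap

    // parseCsvMap("topic1:12, topic2:24") == Map("topic1" -> "12", "topic2" -> "24")
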
Modified: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala Fri Aug 24 20:38:27 2012
@@ -18,7 +18,7 @@
 package kafka.log
 
 import kafka.message._
-import kafka.utils.{TestUtils, Utils}
+import kafka.utils.{TestUtils, Utils, SystemTime}
 import kafka.server.KafkaConfig
 
 object TestLogPerformance {
@@ -33,7 +33,7 @@ object TestLogPerformance {
     val batchSize = args(2).toInt
     val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
     val dir = TestUtils.tempDir()
-    val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, false)
+    val log = new Log(dir, SystemTime, 50*1024*1024, config.maxMessageSize, 5000000, 24*7*60*60*1000L, false)
     val bytes = new Array[Byte](messageSize)
     new java.util.Random().nextBytes(bytes)
     val message = new Message(bytes)

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Fri Aug 24 20:38:27 2012
@@ -28,6 +28,7 @@ import kafka.common.OffsetOutOfRangeExce
 class LogManagerTest extends JUnitSuite {
 
   val time: MockTime = new MockTime()
+  val maxSegAge = 100
   val maxLogAge = 1000
   var logDir: File = null
   var logManager: LogManager = null
@@ -41,7 +42,7 @@ class LogManagerTest extends JUnitSuite 
                    override val enableZookeeper = false
                    override val flushInterval = 100
                  }
-    logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+    logManager = new LogManager(config, null, time, maxSegAge, -1, maxLogAge, false)
     logManager.startup
     logDir = logManager.logDir
   }
@@ -111,11 +112,11 @@ class LogManagerTest extends JUnitSuite 
     config = new KafkaConfig(props) {
       override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
       override val enableZookeeper = false
-      override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long] // keep exactly 6 segments + 1 roll over
+      override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
       override val logRetentionHours = retentionHours
       override val flushInterval = 100
     }
-    logManager = new LogManager(config, null, time, -1, retentionMs, false)
+    logManager = new LogManager(config, null, time, maxSegAge, -1, retentionMs, false)
     logManager.startup
 
     // create a log
@@ -132,12 +133,12 @@ class LogManagerTest extends JUnitSuite 
     log.flush
     Thread.sleep(2000)
 
-    // should be exactly 100 full segments + 1 new empty one
-    assertEquals("There should be example 101 segments.", 100 + 1, log.numberOfSegments)
+    // should be exactly 100 full segments
+    assertEquals("There should be example 100 segments.", 100, log.numberOfSegments)
 
     // this cleanup shouldn't find any expired segments but should delete some to reduce size
     logManager.cleanupLogs()
-    assertEquals("Now there should be exactly 7 segments", 6 + 1, log.numberOfSegments)
+    assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
     assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -161,7 +162,7 @@ class LogManagerTest extends JUnitSuite 
                    override val flushInterval = Int.MaxValue
                    override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100")
                  }
-    logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+    logManager = new LogManager(config, null, time, maxSegAge, -1, maxLogAge, false)
     logManager.startup
     val log = logManager.getOrCreateLog("timebasedflush", 0)
     for(i <- 0 until 200) {
@@ -185,7 +186,7 @@ class LogManagerTest extends JUnitSuite 
                    override val flushInterval = 100
                  }
     
-    logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+    logManager = new LogManager(config, null, time, maxSegAge, -1, maxLogAge, false)
     logManager.startup
     
     for(i <- 0 until 2) {

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala Fri Aug 24 20:38:27 2012
@@ -22,7 +22,7 @@ import java.util.ArrayList
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
-import kafka.utils.{Utils, TestUtils, Range}
+import kafka.utils.{Utils, TestUtils, Range, SystemTime, MockTime}
 import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.server.KafkaConfig
@@ -47,18 +47,70 @@ class LogTest extends JUnitSuite {
     for(offset <- offsets)
       new File(dir, Integer.toString(offset) + Log.FileSuffix).createNewFile()
   }
-  
+
+  /** Test that the size and time based log segment rollout works. */
+  @Test
+  def testTimeBasedLogRoll() {
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val rollMs = 1 * 60 * 60L
+    val time: MockTime = new MockTime()
+
+    // create a log
+    val log = new Log(logDir, time, 1000, config.maxMessageSize, 1000, rollMs, false)
+    time.currentMs += rollMs + 1
+
+    // segment age is less than its limit
+    log.append(set)
+    assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
+
+    log.append(set)
+    assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
+
+    // segment expires in age
+    time.currentMs += rollMs + 1
+    log.append(set)
+    assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
+
+    time.currentMs += rollMs + 1
+    val blank = Array[Message]()
+    log.append(new ByteBufferMessageSet(blank:_*))
+    assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
+
+    time.currentMs += rollMs + 1
+    // the last segment expired in age, but was blank. So new segment should not be generated
+    log.append(set)
+    assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
+  }
+
+  @Test
+  def testSizeBasedLogRoll() {
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val setSize = set.sizeInBytes
+    val msgPerSeg = 10
+    val segSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be 10 messages
+
+    // create a log
+    val log = new Log(logDir, SystemTime, segSize, config.maxMessageSize, 1000, 10000, false)
+    assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+
+    // segments expire in size
+    for (i<- 1 to (msgPerSeg + 1)) {
+      log.append(set)
+    }
+    assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
+  }
+
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    new Log(logDir, 1024, config.maxMessageSize, 1000, false)
+    new Log(logDir, SystemTime, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
   }
   
   @Test
   def testLoadInvalidLogsFails() {
     createEmptyLogs(logDir, 0, 15)
     try {
-      new Log(logDir, 1024, config.maxMessageSize, 1000, false)
+      new Log(logDir, SystemTime, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
       fail("Allowed load of corrupt logs without complaint.")
     } catch {
       case e: IllegalStateException => "This is good"
@@ -67,7 +119,7 @@ class LogTest extends JUnitSuite {
   
   @Test
   def testAppendAndRead() {
-    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, false)
+    val log = new Log(logDir, SystemTime, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 10)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -84,7 +136,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, false)
+    val log = new Log(logDir, SystemTime, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -104,7 +156,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, 100, config.maxMessageSize, 1000, false)
+    val log = new Log(logDir, SystemTime, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
     val numMessages = 100
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -159,7 +211,7 @@ class LogTest extends JUnitSuite {
   def testEdgeLogRolls() {
     {
       // first test a log segment starting at 0
-      val log = new Log(logDir, 100, config.maxMessageSize, 1000, false)
+      val log = new Log(logDir, SystemTime, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
       val curOffset = log.nextAppendOffset
       assertEquals(curOffset, 0)
 
@@ -172,7 +224,7 @@ class LogTest extends JUnitSuite {
 
     {
       // second test an empty log segment starting at none-zero
-      val log = new Log(logDir, 100, config.maxMessageSize, 1000, false)
+      val log = new Log(logDir, SystemTime, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
       val numMessages = 1
       for(i <- 0 until numMessages)
         log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -201,7 +253,7 @@ class LogTest extends JUnitSuite {
     val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes()))
 
     // append messages to log
-    val log = new Log(logDir, 100, 5, 1000, false)
+    val log = new Log(logDir, SystemTime, 100, 5, 1000, 24*7*60*60*1000L, false)
 
     var ret =
     try {