You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/24 22:38:28 UTC
svn commit: r1377093 - in /incubator/kafka/trunk/core/src:
main/scala/kafka/log/ main/scala/kafka/server/ main/scala/kafka/utils/
test/scala/other/kafka/ test/scala/unit/kafka/log/
Author: junrao
Date: Fri Aug 24 20:38:27 2012
New Revision: 1377093
URL: http://svn.apache.org/viewvc?rev=1377093&view=rev
Log:
Time based log segment rollout; patched by Swapnil Ghike; reviewed by Jun Rao, Neha Narkhede; KAFKA-475
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Fri Aug 24 20:38:27 2012
@@ -90,9 +90,23 @@ private[log] object Log {
/**
* A segment file in the log directory. Each log segment consists of an open message set, a start offset and a size
*/
-private[log] class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long) extends Range {
+private[log] class LogSegment(val file: File, val time: Time, val messageSet: FileMessageSet, val start: Long) extends Range {
+ var firstAppendTime: Option[Long] = None
@volatile var deleted = false
def size: Long = messageSet.highWaterMark
+
+ private def updateFirstAppendTime() {
+ if (firstAppendTime.isEmpty)
+ firstAppendTime = Some(time.milliseconds)
+ }
+
+ def append(messages: ByteBufferMessageSet) {
+ if (messages.sizeInBytes > 0) {
+ messageSet.append(messages)
+ updateFirstAppendTime()
+ }
+ }
+
override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")"
}
@@ -101,9 +115,8 @@ private[log] class LogSegment(val file:
* An append-only log for storing messages.
*/
@threadsafe
-private[log] class Log(val dir: File, val maxSize: Long, val maxMessageSize: Int,
- val flushInterval: Int, val needRecovery: Boolean) extends Logging {
-
+private[log] class Log(val dir: File, val time: Time, val maxSize: Long, val maxMessageSize: Int,
+ val flushInterval: Int, val rollIntervalMs: Long, val needRecovery: Boolean) extends Logging {
/* A lock that guards all modifications to the log */
private val lock = new Object
@@ -121,7 +134,7 @@ private[log] class Log(val dir: File, va
private val logStats = new LogStats(this)
- Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
+ Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
/* Load the log segments from the log files on disk */
private def loadSegments(): SegmentList[LogSegment] = {
@@ -135,7 +148,7 @@ private[log] class Log(val dir: File, va
val filename = file.getName()
val start = filename.substring(0, filename.length - Log.FileSuffix.length).toLong
val messageSet = new FileMessageSet(file, false)
- accum.add(new LogSegment(file, messageSet, start))
+ accum.add(new LogSegment(file, time, messageSet, start))
}
}
@@ -143,7 +156,7 @@ private[log] class Log(val dir: File, va
// no existing segments, create a new mutable segment
val newFile = new File(dir, Log.nameFromOffset(0))
val set = new FileMessageSet(newFile, true)
- accum.add(new LogSegment(newFile, set, 0))
+ accum.add(new LogSegment(newFile, time, set, 0))
} else {
// there is at least one existing segment, validate and recover them/it
// sort segments into ascending order for fast searching
@@ -160,7 +173,7 @@ private[log] class Log(val dir: File, va
val last = accum.remove(accum.size - 1)
last.messageSet.close()
info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery)
- val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
+ val mutable = new LogSegment(last.file, time, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
accum.add(mutable)
}
new SegmentList(accum.toArray(new Array[LogSegment](accum.size)))
@@ -227,10 +240,11 @@ private[log] class Log(val dir: File, va
// they are valid, insert them in the log
lock synchronized {
try {
- val segment = segments.view.last
- segment.messageSet.append(validMessages)
- maybeFlush(numberOfMessages)
+ var segment = segments.view.last
maybeRoll(segment)
+ segment = segments.view.last
+ segment.append(validMessages)
+ maybeFlush(numberOfMessages)
}
catch {
case e: IOException =>
@@ -301,7 +315,8 @@ private[log] class Log(val dir: File, va
* Roll the log over if necessary
*/
private def maybeRoll(segment: LogSegment) {
- if(segment.messageSet.sizeInBytes > maxSize)
+ if((segment.messageSet.sizeInBytes > maxSize) ||
+ ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)))
roll()
}
@@ -317,7 +332,7 @@ private[log] class Log(val dir: File, va
newFile.delete()
}
debug("Rolling log '" + name + "' to " + newFile.getName())
- segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
+ segments.append(new LogSegment(newFile, time, new FileMessageSet(newFile, true), newOffset))
}
}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Fri Aug 24 20:38:27 2012
@@ -33,13 +33,14 @@ import kafka.api.OffsetRequest
private[kafka] class LogManager(val config: KafkaConfig,
private val scheduler: KafkaScheduler,
private val time: Time,
+ val logRollDefaultIntervalMs: Long,
val logCleanupIntervalMs: Long,
val logCleanupDefaultAgeMs: Long,
needRecovery: Boolean) extends Logging {
-
+
val logDir: File = new File(config.logDir)
private val numPartitions = config.numPartitions
- private val maxSize: Long = config.logFileSize
+ private val logFileSizeMap = config.logFileSizeMap
private val flushInterval = config.flushInterval
private val topicPartitionsMap = config.topicPartitionsMap
private val logCreationLock = new Object
@@ -49,8 +50,9 @@ private[kafka] class LogManager(val conf
private val startupLatch: CountDownLatch = if (config.enableZookeeper) new CountDownLatch(1) else null
private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
private val logFlushIntervalMap = config.flushIntervalMap
- private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
- private val logRetentionSize = config.logRetentionSize
+ private val logRetentionSizeMap = config.logRetentionSizeMap
+ private val logRetentionMsMap = getMsMap(config.logRetentionHoursMap)
+ private val logRollMsMap = getMsMap(config.logRollHoursMap)
/* Initialize a log for each subdirectory of the main log directory */
private val logs = new Pool[String, Pool[Int, Log]]()
@@ -67,7 +69,10 @@ private[kafka] class LogManager(val conf
warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
} else {
info("Loading log '" + dir.getName() + "'")
- val log = new Log(dir, maxSize, config.maxMessageSize, flushInterval, needRecovery)
+ val topic = Utils.getTopicPartition(dir.getName)._1
+ val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
+ val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
+ val log = new Log(dir, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needRecovery)
val topicPartion = Utils.getTopicPartition(dir.getName)
logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
val parts = logs.get(topicPartion._1)
@@ -108,10 +113,11 @@ private[kafka] class LogManager(val conf
case object StopActor
- private def getLogRetentionMSMap(logRetentionHourMap: Map[String, Int]) : Map[String, Long] = {
+ private def getMsMap(hoursMap: Map[String, Int]) : Map[String, Long] = {
var ret = new mutable.HashMap[String, Long]
- for ( (topic, hour) <- logRetentionHourMap )
+ for ( (topic, hour) <- hoursMap ) {
ret.put(topic, hour * 60 * 60 * 1000L)
+ }
ret
}
@@ -146,7 +152,9 @@ private[kafka] class LogManager(val conf
logCreationLock synchronized {
val d = new File(logDir, topic + "-" + partition)
d.mkdirs()
- new Log(d, maxSize, config.maxMessageSize, flushInterval, false)
+ val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
+ val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
+ new Log(d, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, false)
}
}
@@ -236,7 +244,7 @@ private[kafka] class LogManager(val conf
private def cleanupExpiredSegments(log: Log): Int = {
val startMs = time.milliseconds
val topic = Utils.getTopicPartition(log.dir.getName)._1
- val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
+ val logCleanupThresholdMS = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
val total = deleteSegments(log, toBeDeleted)
total
@@ -247,8 +255,10 @@ private[kafka] class LogManager(val conf
* is at least logRetentionSize bytes in size
*/
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
- if(logRetentionSize < 0 || log.size < logRetentionSize) return 0
- var diff = log.size - logRetentionSize
+ val topic = Utils.getTopicPartition(log.dir.getName)._1
+ val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize)
+ if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0
+ var diff = log.size - maxLogRetentionSize
def shouldDelete(segment: LogSegment) = {
if(diff - segment.size >= 0) {
diff -= segment.size
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala Fri Aug 24 20:38:27 2012
@@ -60,18 +60,27 @@ class KafkaConfig(props: Properties) ext
/* the maximum size of a single log file */
val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
-
- /* the number of messages accumulated on a log partition before messages are flushed to disk */
- val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
-
+
+ /* the maximum size of a single log file for some specific topic */
+ val logFileSizeMap = Utils.getTopicFileSize(Utils.getString(props, "topic.log.file.size", ""))
+
+ /* the maximum time before a new log segment is rolled out */
+ val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue))
+
+ /* the number of hours before rolling out a new log segment for some specific topic */
+ val logRollHoursMap = Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours", ""))
+
/* the number of hours to keep a log file before deleting it */
- val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
+ val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24*7, (1, Int.MaxValue))
+
+ /* the number of hours to keep a log file before deleting it for some specific topic*/
+ val logRetentionHoursMap = Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
/* the maximum size of the log before deleting it */
val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)
- /* the number of hours to keep a log file before deleting it for some specific topic*/
- val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
+ /* the maximum size of the log for some specific topic before deleting it */
+ val logRetentionSizeMap = Utils.getTopicRetentionSize(Utils.getString(props, "topic.log.retention.size", ""))
/* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
@@ -79,6 +88,9 @@ class KafkaConfig(props: Properties) ext
/* enable zookeeper registration in the server */
val enableZookeeper = Utils.getBoolean(props, "enable.zookeeper", true)
+ /* the number of messages accumulated on a log partition before messages are flushed to disk */
+ val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
+
/* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */
val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Fri Aug 24 20:38:27 2012
@@ -58,6 +58,7 @@ class KafkaServer(val config: KafkaConfi
logManager = new LogManager(config,
scheduler,
SystemTime,
+ 1000L * 60 * 60 * config.logRollHours,
1000L * 60 * config.logCleanupIntervalMinutes,
1000L * 60 * 60 * config.logRetentionHours,
needRecovery)
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Fri Aug 24 20:38:27 2012
@@ -566,7 +566,7 @@ object Utils extends Logging {
}
/**
- * This method gets comma seperated values which contains key,value pairs and returns a map of
+ * This method gets comma separated values which contains key,value pairs and returns a map of
* key value pairs. the format of allCSVal is key1:val1, key2:val2 ....
*/
private def getCSVMap[K, V](allCSVals: String, exceptionMsg:String, successMsg:String) :Map[K, V] = {
@@ -595,12 +595,30 @@ object Utils extends Logging {
}
}
- def getTopicRentionHours(retentionHours: String) : Map[String, Int] = {
+ def getTopicRetentionHours(retentionHours: String) : Map[String, Int] = {
val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: "
val successMsg = "The retention hour for "
getCSVMap(retentionHours, exceptionMsg, successMsg)
}
+ def getTopicRollHours(rollHours: String) : Map[String, Int] = {
+ val exceptionMsg = "Malformed token for topic.log.roll.hours in server.properties: "
+ val successMsg = "The roll hour for "
+ getCSVMap(rollHours, exceptionMsg, successMsg)
+ }
+
+ def getTopicFileSize(fileSizes: String): Map[String, Int] = {
+ val exceptionMsg = "Malformed token for topic.log.file.size in server.properties: "
+ val successMsg = "The log file size for "
+ getCSVMap(fileSizes, exceptionMsg, successMsg)
+ }
+
+ def getTopicRetentionSize(retentionSizes: String): Map[String, Long] = {
+ val exceptionMsg = "Malformed token for topic.log.retention.size in server.properties: "
+ val successMsg = "The log retention size for "
+ getCSVMap(retentionSizes, exceptionMsg, successMsg)
+ }
+
def getTopicFlushIntervals(allIntervals: String) : Map[String, Int] = {
val exceptionMsg = "Malformed token for topic.flush.intervals.ms in server.properties: "
val successMsg = "The flush interval for "
Modified: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala Fri Aug 24 20:38:27 2012
@@ -18,7 +18,7 @@
package kafka.log
import kafka.message._
-import kafka.utils.{TestUtils, Utils}
+import kafka.utils.{TestUtils, Utils, SystemTime}
import kafka.server.KafkaConfig
object TestLogPerformance {
@@ -33,7 +33,7 @@ object TestLogPerformance {
val batchSize = args(2).toInt
val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
val dir = TestUtils.tempDir()
- val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, false)
+ val log = new Log(dir, SystemTime, 50*1024*1024, config.maxMessageSize, 5000000, 24*7*60*60*1000L, false)
val bytes = new Array[Byte](messageSize)
new java.util.Random().nextBytes(bytes)
val message = new Message(bytes)
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Fri Aug 24 20:38:27 2012
@@ -28,6 +28,7 @@ import kafka.common.OffsetOutOfRangeExce
class LogManagerTest extends JUnitSuite {
val time: MockTime = new MockTime()
+ val maxSegAge = 100
val maxLogAge = 1000
var logDir: File = null
var logManager: LogManager = null
@@ -41,7 +42,7 @@ class LogManagerTest extends JUnitSuite
override val enableZookeeper = false
override val flushInterval = 100
}
- logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+ logManager = new LogManager(config, null, time, maxSegAge, -1, maxLogAge, false)
logManager.startup
logDir = logManager.logDir
}
@@ -111,11 +112,11 @@ class LogManagerTest extends JUnitSuite
config = new KafkaConfig(props) {
override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
override val enableZookeeper = false
- override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long] // keep exactly 6 segments + 1 roll over
+ override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
override val logRetentionHours = retentionHours
override val flushInterval = 100
}
- logManager = new LogManager(config, null, time, -1, retentionMs, false)
+ logManager = new LogManager(config, null, time, maxSegAge, -1, retentionMs, false)
logManager.startup
// create a log
@@ -132,12 +133,12 @@ class LogManagerTest extends JUnitSuite
log.flush
Thread.sleep(2000)
- // should be exactly 100 full segments + 1 new empty one
- assertEquals("There should be example 101 segments.", 100 + 1, log.numberOfSegments)
+ // should be exactly 100 full segments
+ assertEquals("There should be exactly 100 segments.", 100, log.numberOfSegments)
// this cleanup shouldn't find any expired segments but should delete some to reduce size
logManager.cleanupLogs()
- assertEquals("Now there should be exactly 7 segments", 6 + 1, log.numberOfSegments)
+ assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
try {
log.read(0, 1024)
@@ -161,7 +162,7 @@ class LogManagerTest extends JUnitSuite
override val flushInterval = Int.MaxValue
override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100")
}
- logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+ logManager = new LogManager(config, null, time, maxSegAge, -1, maxLogAge, false)
logManager.startup
val log = logManager.getOrCreateLog("timebasedflush", 0)
for(i <- 0 until 200) {
@@ -185,7 +186,7 @@ class LogManagerTest extends JUnitSuite
override val flushInterval = 100
}
- logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+ logManager = new LogManager(config, null, time, maxSegAge, -1, maxLogAge, false)
logManager.startup
for(i <- 0 until 2) {
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1377093&r1=1377092&r2=1377093&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala Fri Aug 24 20:38:27 2012
@@ -22,7 +22,7 @@ import java.util.ArrayList
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
-import kafka.utils.{Utils, TestUtils, Range}
+import kafka.utils.{Utils, TestUtils, Range, SystemTime, MockTime}
import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
import kafka.server.KafkaConfig
@@ -47,18 +47,70 @@ class LogTest extends JUnitSuite {
for(offset <- offsets)
new File(dir, Integer.toString(offset) + Log.FileSuffix).createNewFile()
}
-
+
+ /** Test that the time based log segment rollout works. */
+ @Test
+ def testTimeBasedLogRoll() {
+ val set = TestUtils.singleMessageSet("test".getBytes())
+ val rollMs = 1 * 60 * 60L
+ val time: MockTime = new MockTime()
+
+ // create a log
+ val log = new Log(logDir, time, 1000, config.maxMessageSize, 1000, rollMs, false)
+ time.currentMs += rollMs + 1
+
+ // segment age is less than its limit
+ log.append(set)
+ assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
+
+ log.append(set)
+ assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
+
+ // segment expires in age
+ time.currentMs += rollMs + 1
+ log.append(set)
+ assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
+
+ time.currentMs += rollMs + 1
+ val blank = Array[Message]()
+ log.append(new ByteBufferMessageSet(blank:_*))
+ assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
+
+ time.currentMs += rollMs + 1
+ // the last segment expired in age, but was blank. So new segment should not be generated
+ log.append(set)
+ assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
+ }
+
+ @Test
+ def testSizeBasedLogRoll() {
+ val set = TestUtils.singleMessageSet("test".getBytes())
+ val setSize = set.sizeInBytes
+ val msgPerSeg = 10
+ val segSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be 10 messages
+
+ // create a log
+ val log = new Log(logDir, SystemTime, segSize, config.maxMessageSize, 1000, 10000, false)
+ assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+
+ // segments expire in size
+ for (i<- 1 to (msgPerSeg + 1)) {
+ log.append(set)
+ }
+ assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
+ }
+
@Test
def testLoadEmptyLog() {
createEmptyLogs(logDir, 0)
- new Log(logDir, 1024, config.maxMessageSize, 1000, false)
+ new Log(logDir, SystemTime, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
}
@Test
def testLoadInvalidLogsFails() {
createEmptyLogs(logDir, 0, 15)
try {
- new Log(logDir, 1024, config.maxMessageSize, 1000, false)
+ new Log(logDir, SystemTime, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
fail("Allowed load of corrupt logs without complaint.")
} catch {
case e: IllegalStateException => "This is good"
@@ -67,7 +119,7 @@ class LogTest extends JUnitSuite {
@Test
def testAppendAndRead() {
- val log = new Log(logDir, 1024, config.maxMessageSize, 1000, false)
+ val log = new Log(logDir, SystemTime, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
val message = new Message(Integer.toString(42).getBytes())
for(i <- 0 until 10)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -84,7 +136,7 @@ class LogTest extends JUnitSuite {
@Test
def testReadOutOfRange() {
createEmptyLogs(logDir, 1024)
- val log = new Log(logDir, 1024, config.maxMessageSize, 1000, false)
+ val log = new Log(logDir, SystemTime, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
try {
log.read(0, 1024)
@@ -104,7 +156,7 @@ class LogTest extends JUnitSuite {
@Test
def testLogRolls() {
/* create a multipart log with 100 messages */
- val log = new Log(logDir, 100, config.maxMessageSize, 1000, false)
+ val log = new Log(logDir, SystemTime, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
val numMessages = 100
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -159,7 +211,7 @@ class LogTest extends JUnitSuite {
def testEdgeLogRolls() {
{
// first test a log segment starting at 0
- val log = new Log(logDir, 100, config.maxMessageSize, 1000, false)
+ val log = new Log(logDir, SystemTime, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
val curOffset = log.nextAppendOffset
assertEquals(curOffset, 0)
@@ -172,7 +224,7 @@ class LogTest extends JUnitSuite {
{
// second test an empty log segment starting at non-zero
- val log = new Log(logDir, 100, config.maxMessageSize, 1000, false)
+ val log = new Log(logDir, SystemTime, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false)
val numMessages = 1
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -201,7 +253,7 @@ class LogTest extends JUnitSuite {
val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes()))
// append messages to log
- val log = new Log(logDir, 100, 5, 1000, false)
+ val log = new Log(logDir, SystemTime, 100, 5, 1000, 24*7*60*60*1000L, false)
var ret =
try {