You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2013/01/29 04:36:20 UTC
[2/3] KAFKA-631 Implement a log cleaner for Kafka. Reviewed by Neha.
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 51ea727..f4ba59c 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -103,9 +103,37 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the maximum size of the log for some specific topic before deleting it */
val logRetentionBytesPerTopicMap = props.getMap("log.retention.bytes.per.topic", _.toLong > 0).mapValues(_.toLong)
-
+
/* the frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion */
- val logCleanupIntervalMins = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
+ val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue))
+
+ /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "dedupe" */
+ val logCleanupPolicy = props.getString("log.cleanup.policy", "delete")
+
+ /* a per-topic override for the cleanup policy for segments beyond the retention window */
+ val logCleanupPolicyMap = props.getMap("topic.log.cleanup.policy")
+
+  /* the number of background threads to use for log cleaning. At least one thread is required:
+   * log.cleaner.dedupe.buffer.size is divided by this value when it is validated, so a value of 0
+   * would fail with an ArithmeticException rather than a descriptive range error. */
+  val logCleanerThreads = props.getIntInRange("log.cleaner.threads", 1, (1, Int.MaxValue))
+
+ /* the log cleaner will be throttled so that the sum of its read and write i/o will be less than this value on average */
+ val logCleanerIoMaxBytesPerSecond = props.getDouble("log.cleaner.io.max.bytes.per.second", Double.MaxValue)
+
+ /* the total memory used for log deduplication across all cleaner threads */
+ val logCleanerDedupeBufferSize = props.getIntInRange("log.cleaner.dedupe.buffer.size", 500*1024*1024, (0, Int.MaxValue))
+ require(logCleanerDedupeBufferSize / logCleanerThreads > 1024*1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
+
+ /* the total memory used for log cleaner I/O buffers across all cleaner threads */
+ val logCleanerIoBufferSize = props.getIntInRange("log.cleaner.io.buffer.size", 4*1024*1024, (0, Int.MaxValue))
+
+ /* the amount of time to sleep when there are no logs to clean */
+ val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 30*1000, (0L, Long.MaxValue))
+
+ /* the minimum ratio of dirty log to total log for a log to be eligible for cleaning */
+ val logCleanerMinCleanRatio = props.getDouble("log.cleaner.min.cleanable.ratio", 0.5)
+
+ /* should we enable log cleaning? */
+ val logCleanerEnable = props.getBoolean("log.cleaner.enable", false)
/* the maximum size in bytes of the offset index */
val logIndexSizeMaxBytes = props.getIntInRange("log.index.size.max.bytes", 10*1024*1024, (4, Int.MaxValue))
@@ -116,6 +144,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the number of messages accumulated on a log partition before messages are flushed to disk */
val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 500, (1, Int.MaxValue))
+ /* the amount of time to wait before deleting a file from the filesystem */
val logDeleteDelayMs = props.getLongInRange("log.segment.delete.delay.ms", 60000, (0, Long.MaxValue))
/* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 9258b13..da6f716 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -18,9 +18,12 @@
package kafka.server
import kafka.network.SocketServer
+import kafka.log.LogConfig
+import kafka.log.CleanerConfig
import kafka.log.LogManager
import kafka.utils._
import java.util.concurrent._
+import java.io.File
import atomic.AtomicBoolean
import org.I0Itec.zkclient.ZkClient
import kafka.controller.{ControllerStats, KafkaController}
@@ -56,9 +59,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
kafkaScheduler.startup()
/* start log manager */
- logManager = new LogManager(config,
- kafkaScheduler,
- time)
+ logManager = createLogManager(config)
logManager.startup()
socketServer = new SocketServer(config.brokerId,
@@ -138,6 +139,50 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
def awaitShutdown(): Unit = shutdownLatch.await()
def getLogManager(): LogManager = logManager
+
+  /**
+   * Build the LogManager for this broker from its KafkaConfig.
+   *
+   * A default LogConfig is derived from the broker-wide settings, and an overriding LogConfig is
+   * produced for every topic that appears in at least one of the per-topic override maps.
+   *
+   * @param config The broker configuration to translate
+   * @return A LogManager using this server's scheduler and time
+   */
+  private def createLogManager(config: KafkaConfig): LogManager = {
+    // Hour-based settings are converted to milliseconds with Long arithmetic; the equivalent Int
+    // product overflows (and turns negative) for any value above 596 hours.
+    val msPerHour = 60L * 60L * 1000L
+    // every topic that has at least one per-topic override (duplicates collapse in toMap below)
+    val topics = config.logCleanupPolicyMap.keys ++
+                 config.logSegmentBytesPerTopicMap.keys ++
+                 config.logFlushIntervalMsPerTopicMap.keys ++
+                 config.logRollHoursPerTopicMap.keys ++
+                 config.logRetentionBytesPerTopicMap.keys ++
+                 config.logRetentionHoursPerTopicMap.keys
+    val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
+                                     segmentMs = msPerHour * config.logRollHours,
+                                     flushInterval = config.logFlushIntervalMessages,
+                                     flushMs = config.logFlushIntervalMs.toLong,
+                                     retentionSize = config.logRetentionBytes,
+                                     retentionMs = msPerHour * config.logRetentionHours,
+                                     maxMessageSize = config.messageMaxBytes,
+                                     maxIndexSize = config.logIndexSizeMaxBytes,
+                                     indexInterval = config.logIndexIntervalBytes,
+                                     fileDeleteDelayMs = config.logDeleteDelayMs,
+                                     minCleanableRatio = config.logCleanerMinCleanRatio,
+                                     dedupe = config.logCleanupPolicy.trim.toLowerCase == "dedupe")
+    // per-topic configs start from the default and fall back to the broker-wide value for any
+    // setting the topic does not override
+    val logConfigs = for(topic <- topics) yield
+      topic -> defaultLogConfig.copy(segmentSize = config.logSegmentBytesPerTopicMap.getOrElse(topic, config.logSegmentBytes),
+                                     segmentMs = msPerHour * config.logRollHoursPerTopicMap.getOrElse(topic, config.logRollHours),
+                                     flushMs = config.logFlushIntervalMsPerTopicMap.getOrElse(topic, config.logFlushIntervalMs).toLong,
+                                     retentionSize = config.logRetentionBytesPerTopicMap.getOrElse(topic, config.logRetentionBytes),
+                                     retentionMs = msPerHour * config.logRetentionHoursPerTopicMap.getOrElse(topic, config.logRetentionHours),
+                                     dedupe = config.logCleanupPolicyMap.getOrElse(topic, config.logCleanupPolicy).trim.toLowerCase == "dedupe")
+    val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
+                                      dedupeBufferSize = config.logCleanerDedupeBufferSize,
+                                      ioBufferSize = config.logCleanerIoBufferSize,
+                                      maxMessageSize = config.messageMaxBytes,
+                                      maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
+                                      backOffMs = config.logCleanerBackoffMs,
+                                      enableCleaner = config.logCleanerEnable)
+    new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
+                   topicConfigs = logConfigs.toMap,
+                   defaultConfig = defaultLogConfig,
+                   cleanerConfig = cleanerConfig,
+                   flushCheckMs = config.logFlushSchedulerIntervalMs,
+                   retentionCheckMs = config.logCleanupIntervalMs,
+                   scheduler = kafkaScheduler,
+                   time = time)
+  }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
new file mode 100644
index 0000000..79f29df
--- /dev/null
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import scala.collection._
+import kafka.utils.Logging
+import kafka.common._
+import java.util.concurrent.locks.ReentrantLock
+import java.io._
+
+/**
+ * This class saves out a map of topic/partition=>offsets to a file.
+ *
+ * File layout (version 0), one item per line: the version number, the number of entries, and then
+ * one "topic partition offset" line per entry.
+ *
+ * A write goes to a sibling ".tmp" file which is then renamed over the checkpoint file, so the
+ * checkpoint file is replaced as a whole rather than rewritten in place. Reads and writes made
+ * through the same instance are serialized on an internal lock.
+ */
+class OffsetCheckpoint(val file: File) extends Logging {
+  private val lock = new Object()
+  // one temp path shared by the constructor cleanup and by write()
+  private val tempFile = new File(file.getAbsolutePath + ".tmp")
+  tempFile.delete() // try to delete any existing temp files for cleanliness
+  file.createNewFile() // in case the file doesn't exist
+
+  /**
+   * Replace the contents of the checkpoint file with the given offsets.
+   * @param offsets The offset to record for each topic/partition
+   * @throws IOException If the temp file cannot be written, synced or renamed over the checkpoint file
+   */
+  def write(offsets: Map[TopicAndPartition, Long]) {
+    lock synchronized {
+      // write to temp file and then swap with the existing file
+      val fileOutputStream = new FileOutputStream(tempFile)
+      val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream))
+      try {
+        // write the current version
+        writer.write(0.toString)
+        writer.newLine()
+
+        // write the number of entries
+        writer.write(offsets.size.toString)
+        writer.newLine()
+
+        // write the entries
+        offsets.foreach { case (topicPart, offset) =>
+          writer.write("%s %d %d".format(topicPart.topic, topicPart.partition, offset))
+          writer.newLine()
+        }
+
+        // flush and force the data to disk before the file takes the place of the old checkpoint
+        writer.flush()
+        fileOutputStream.getFD().sync()
+      } finally {
+        writer.close()
+      }
+
+      // only swap once the temp file is complete and closed; renaming a file that is still open
+      // is not possible on every platform
+      if(!tempFile.renameTo(file))
+        throw new IOException("File rename from %s to %s failed.".format(tempFile.getAbsolutePath, file.getAbsolutePath))
+    }
+  }
+
+  /**
+   * Read the offsets stored in the checkpoint file.
+   * @return The stored offsets, or an empty map if the file is empty or holds only a version line
+   * @throws IOException If the version is unrecognized, a line is malformed, or the number of
+   *                     entries does not match the declared count
+   * @throws NumberFormatException If a numeric field cannot be parsed
+   */
+  def read(): Map[TopicAndPartition, Long] = {
+    lock synchronized {
+      val reader = new BufferedReader(new FileReader(file))
+      try {
+        var line = reader.readLine()
+        if(line == null)
+          return Map.empty
+        val version = line.toInt
+        version match {
+          case 0 =>
+            line = reader.readLine()
+            if(line == null)
+              return Map.empty
+            val expectedSize = line.toInt
+            var offsets = Map[TopicAndPartition, Long]()
+            line = reader.readLine()
+            while(line != null) {
+              val pieces = line.split("\\s+")
+              if(pieces.length != 3)
+                throw new IOException("Malformed line in offset checkpoint file: '%s'.".format(line))
+
+              val topic = pieces(0)
+              val partition = pieces(1).toInt
+              val offset = pieces(2).toLong
+              offsets += (TopicAndPartition(topic, partition) -> offset)
+              line = reader.readLine()
+            }
+            // the entry count doubles as a truncation/corruption check
+            if(offsets.size != expectedSize)
+              throw new IOException("Expected %d entries but found %d".format(expectedSize, offsets.size))
+            offsets
+          case _ =>
+            throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
+        }
+      } finally {
+        reader.close()
+      }
+    }
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7810c21..710c08b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -20,6 +20,7 @@ import kafka.cluster.{Broker, Partition, Replica}
import collection._
import mutable.HashMap
import org.I0Itec.zkclient.ZkClient
+import java.io.{File, IOException}
import java.util.concurrent.atomic.AtomicBoolean
import kafka.utils._
import kafka.log.LogManager
@@ -33,6 +34,7 @@ import kafka.controller.KafkaController
object ReplicaManager {
val UnknownLogEndOffset = -1L
+ val HighWatermarkFilename = "replication-offset-checkpoint"
}
class ReplicaManager(val config: KafkaConfig,
@@ -48,7 +50,7 @@ class ReplicaManager(val config: KafkaConfig,
val replicaFetcherManager = new ReplicaFetcherManager(config, this)
this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
- val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
+ val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
newGauge(
"LeaderCount",
@@ -67,7 +69,7 @@ class ReplicaManager(val config: KafkaConfig,
}
)
val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
- val isrShrinkRate = newMeter("ISRShrinksPerSec", "shrinks", TimeUnit.SECONDS)
+ val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
def startHighWaterMarksCheckPointThread() = {
@@ -265,7 +267,13 @@ class ReplicaManager(val config: KafkaConfig,
val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
for((dir, reps) <- replicasByDir) {
val hwms = reps.map(r => (TopicAndPartition(r.topic, r.partitionId) -> r.highWatermark)).toMap
- highWatermarkCheckpoints(dir).write(hwms)
+ try {
+ highWatermarkCheckpoints(dir).write(hwms)
+ } catch {
+ case e: IOException =>
+ fatal("Error writing to highwatermark file: ", e)
+ Runtime.getRuntime().halt(1)
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/utils/FileLock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/FileLock.scala b/core/src/main/scala/kafka/utils/FileLock.scala
index ad7a597..a5761b9 100644
--- a/core/src/main/scala/kafka/utils/FileLock.scala
+++ b/core/src/main/scala/kafka/utils/FileLock.scala
@@ -9,56 +9,56 @@ import java.nio.channels._
* The given path will be created and opened if it doesn't exist.
*/
class FileLock(val file: File) extends Logging {
- file.createNewFile()
- private val channel = new RandomAccessFile(file, "rw").getChannel()
- private var flock: java.nio.channels.FileLock = null
-
- /**
- * Lock the file or throw an exception if the lock is already held
- */
- def lock() {
- this synchronized {
- trace("Acquiring lock on " + file.getAbsolutePath)
- flock = channel.lock()
- }
+ file.createNewFile() // create the file if it doesn't exist
+ private val channel = new RandomAccessFile(file, "rw").getChannel()
+ private var flock: java.nio.channels.FileLock = null
+
+ /**
+ * Lock the file or throw an exception if the lock is already held
+ */
+ def lock() {
+ this synchronized {
+ trace("Acquiring lock on " + file.getAbsolutePath)
+ flock = channel.lock()
}
-
- /**
- * Try to lock the file and return true if the locking succeeds
- */
- def tryLock(): Boolean = {
- this synchronized {
- trace("Acquiring lock on " + file.getAbsolutePath)
- try {
- // weirdly this method will return null if the lock is held by another
- // process, but will throw an exception if the lock is held by this process
- // so we have to handle both cases
- flock = channel.tryLock()
- flock != null
- } catch {
- case e: OverlappingFileLockException => false
- }
+ }
+
+ /**
+ * Try to lock the file and return true if the locking succeeds
+ */
+ def tryLock(): Boolean = {
+ this synchronized {
+ trace("Acquiring lock on " + file.getAbsolutePath)
+ try {
+ // weirdly this method will return null if the lock is held by another
+ // process, but will throw an exception if the lock is held by this process
+ // so we have to handle both cases
+ flock = channel.tryLock()
+ flock != null
+ } catch {
+ case e: OverlappingFileLockException => false
}
}
-
- /**
- * Unlock the lock if it is held
- */
- def unlock() {
- this synchronized {
- trace("Releasing lock on " + file.getAbsolutePath)
- if(flock != null)
- flock.release()
- }
+ }
+
+ /**
+ * Unlock the lock if it is held
+ */
+ def unlock() {
+ this synchronized {
+ trace("Releasing lock on " + file.getAbsolutePath)
+ if(flock != null)
+ flock.release()
}
-
- /**
- * Destroy this lock, closing the associated FileChannel
- */
- def destroy() = {
- this synchronized {
- unlock()
- channel.close()
- }
+ }
+
+ /**
+ * Destroy this lock, closing the associated FileChannel
+ */
+ def destroy() = {
+ this synchronized {
+ unlock()
+ channel.close()
}
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala
index d9f010b..2890e7f 100644
--- a/core/src/main/scala/kafka/utils/Logging.scala
+++ b/core/src/main/scala/kafka/utils/Logging.scala
@@ -23,12 +23,13 @@ trait Logging {
val loggerName = this.getClass.getName
lazy val logger = Logger.getLogger(loggerName)
- protected var logIdent = ""
+ protected var logIdent: String = null
// Force initialization to register Log4jControllerMBean
private val log4jController = Log4jController
- private def msgWithLogIdent(msg: String) = logIdent + msg
+ private def msgWithLogIdent(msg: String) =
+ if(logIdent == null) msg else logIdent + msg
def trace(msg: => String): Unit = {
if (logger.isTraceEnabled())
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/utils/Throttler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala
index 9e53b03..c6c3c75 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -32,20 +32,14 @@ import scala.math._
*/
@threadsafe
class Throttler(val desiredRatePerSec: Double,
- val checkIntervalMs: Long,
- val throttleDown: Boolean,
- val time: Time) extends Logging {
+ val checkIntervalMs: Long = 100L,
+ val throttleDown: Boolean = true,
+ val time: Time = SystemTime) extends Logging {
private val lock = new Object
private var periodStartNs: Long = time.nanoseconds
private var observedSoFar: Double = 0.0
- def this(desiredRatePerSec: Double, throttleDown: Boolean) =
- this(desiredRatePerSec, Throttler.DefaultCheckIntervalMs, throttleDown, SystemTime)
-
- def this(desiredRatePerSec: Double) =
- this(desiredRatePerSec, Throttler.DefaultCheckIntervalMs, true, SystemTime)
-
def maybeThrottle(observed: Double) {
lock synchronized {
observedSoFar += observed
@@ -58,11 +52,11 @@ class Throttler(val desiredRatePerSec: Double,
val needAdjustment = !(throttleDown ^ (rateInSecs > desiredRatePerSec))
if(needAdjustment) {
// solve for the amount of time to sleep to make us hit the desired rate
- val desiredRateMs = desiredRatePerSec / Time.MsPerSec.asInstanceOf[Double]
+ val desiredRateMs = desiredRatePerSec / Time.MsPerSec.toDouble
val elapsedMs = elapsedNs / Time.NsPerMs
val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs)
if(sleepTime > 0) {
- println("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, desiredRatePerSec, sleepTime))
+ trace("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, desiredRatePerSec, sleepTime))
time.sleep(sleepTime)
}
}
@@ -76,20 +70,20 @@ class Throttler(val desiredRatePerSec: Double,
object Throttler {
- val DefaultCheckIntervalMs = 100L
-
def main(args: Array[String]) {
val rand = new Random()
- val throttler = new Throttler(1000000, 100, true, SystemTime)
+ val throttler = new Throttler(100000, 100, true, SystemTime)
+ val interval = 30000
var start = System.currentTimeMillis
var total = 0
while(true) {
val value = rand.nextInt(1000)
+ Thread.sleep(1)
throttler.maybeThrottle(value)
total += value
val now = System.currentTimeMillis
- if(now - start >= 1000) {
- println(total)
+ if(now - start >= interval) {
+ println(total / (interval/1000.0))
start = now
total = 0
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 2a01f69..1c88226 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -27,6 +27,7 @@ import scala.collection._
import scala.collection.mutable
import java.util.Properties
import kafka.common.KafkaException
+import kafka.common.KafkaStorageException
/**
@@ -159,7 +160,7 @@ object Utils extends Logging {
* @param log The log method to use for logging. E.g. logger.warn
* @param action The action to execute
*/
- def swallow(log: (Object, Throwable) => Unit, action: => Unit) = {
+ def swallow(log: (Object, Throwable) => Unit, action: => Unit) {
try {
action
} catch {
@@ -528,4 +529,37 @@ object Utils extends Logging {
*/
def abs(n: Int) = n & 0x7fffffff
+  /**
+   * Swap the trailing oldSuffix of a string for newSuffix.
+   * @param s The string to rewrite
+   * @param oldSuffix The suffix the string must currently end with
+   * @param newSuffix The suffix to put in its place
+   * @return The string with oldSuffix removed and newSuffix appended
+   * @throws IllegalArgumentException If the string does not end with oldSuffix
+   */
+  def replaceSuffix(s: String, oldSuffix: String, newSuffix: String): String = {
+    if(s.endsWith(oldSuffix)) {
+      val stem = s.substring(0, s.length - oldSuffix.length)
+      stem + newSuffix
+    } else {
+      throw new IllegalArgumentException("Expected string to end with '%s' but string is '%s'".format(oldSuffix, s))
+    }
+  }
+
+  /**
+   * Create a new, empty file at the given path.
+   * File.createNewFile reports false when a file already exists at the path, so an existing file
+   * is treated as a failure as well. An IOException raised by the create itself is not translated.
+   * @param path The path to create
+   * @throws KafkaStorageException If the file was not created
+   * @return The created file
+   */
+  def createFile(path: String): File = {
+    val file = new File(path)
+    if(file.createNewFile())
+      file
+    else
+      throw new KafkaStorageException("Failed to create file %s.".format(path))
+  }
+
+  /**
+   * Decode four consecutive bytes of an array as a big-endian integer
+   * @param bytes The array to read from
+   * @param offset The index of the most significant byte
+   * @return The integer formed by bytes(offset) through bytes(offset + 3)
+   */
+  def readInt(bytes: Array[Byte], offset: Int): Int =
+    // shift the accumulator up one byte at a time; masking keeps each byte unsigned
+    (0 until 4).foldLeft(0)((acc, i) => (acc << 8) | (bytes(offset + i) & 0xFF))
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/utils/VerifiableProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index d694ba9..a2ac55c 100644
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -38,10 +38,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
/**
* Read a required integer property value or throw an exception if no such property is found
*/
- def getInt(name: String): Int = {
- require(containsKey(name), "Missing required property '" + name + "'")
- return getInt(name, -1)
- }
+ def getInt(name: String): Int = getString(name).toInt
def getIntInRange(name: String, range: (Int, Int)): Int = {
require(containsKey(name), "Missing required property '" + name + "'")
@@ -92,10 +89,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
/**
* Read a required long property value or throw an exception if no such property is found
*/
- def getLong(name: String): Long = {
- require(containsKey(name), "Missing required property '" + name + "'")
- return getLong(name, -1)
- }
+ def getLong(name: String): Long = getString(name).toLong
/**
* Read an long from the properties instance
@@ -124,6 +118,26 @@ class VerifiableProperties(val props: Properties) extends Logging {
require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
v
}
+
+  /**
+   * Read a required property and parse it as a double
+   * @param name The property name
+   * @return the value
+   * @throws IllegalArgumentException If the given property is not present
+   * @throws NumberFormatException If the value cannot be parsed as a double
+   */
+  def getDouble(name: String): Double = {
+    val raw = getString(name)
+    raw.toDouble
+  }
+
+  /**
+   * Read an optional property as a double, falling back to a default when it is absent
+   * @param name The property name
+   * @param default The value to return if the property is not present
+   * @return the parsed property value, or the default
+   */
+  def getDouble(name: String, default: Double): Double =
+    if(!containsKey(name)) default else getDouble(name)
/**
* Read a boolean value from the properties instance
@@ -140,6 +154,8 @@ class VerifiableProperties(val props: Properties) extends Logging {
v.toBoolean
}
}
+
+  /**
+   * Read a required property and parse it as a boolean
+   * @param name The property name
+   * @return the value
+   */
+  def getBoolean(name: String): Boolean = {
+    val raw = getString(name)
+    raw.toBoolean
+  }
/**
* Get a string property, or, if no such property is defined, return the given default value
@@ -162,7 +178,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
/**
* Get a Map[String, String] from a property list in the form k1:v2, k2:v2, ...
*/
- def getMap(name: String, valid: String => Boolean): Map[String, String] = {
+ def getMap(name: String, valid: String => Boolean = s => true): Map[String, String] = {
try {
val m = Utils.parseCsvMap(getString(name, ""))
m.foreach {
@@ -189,4 +205,5 @@ class VerifiableProperties(val props: Properties) extends Logging {
}
override def toString(): String = props.toString
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 55429af..c6e7a57 100644
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -34,15 +34,11 @@ object StressTestLog {
val dir = TestUtils.tempDir()
val time = new MockTime
val log = new Log(dir = dir,
- scheduler = time.scheduler,
- maxSegmentSize = 64*1024*1024,
- maxMessageSize = Int.MaxValue,
- flushInterval = Int.MaxValue,
- rollIntervalMs = Long.MaxValue,
+ config = LogConfig(segmentSize = 64*1024*1024,
+ maxMessageSize = Int.MaxValue,
+ maxIndexSize = 1024*1024),
needsRecovery = false,
- maxIndexSize = 1024*1024,
- indexIntervalBytes = 4096,
- segmentDeleteDelayMs = 60000,
+ scheduler = time.scheduler,
time = time)
val writer = new WriterThread(log)
writer.start()
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/other/kafka/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLogCleaning.scala b/core/src/test/scala/other/kafka/TestLogCleaning.scala
new file mode 100644
index 0000000..d9c721b
--- /dev/null
+++ b/core/src/test/scala/other/kafka/TestLogCleaning.scala
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import joptsimple.OptionParser
+import java.util.Properties
+import java.util.Random
+import java.io._
+import scala.io.Source
+import scala.io.BufferedSource
+import kafka.producer._
+import kafka.consumer._
+import kafka.serializer._
+import kafka.utils._
+
+/**
+ * This is a torture test that runs against an existing broker. Here is how it works:
+ *
+ * It produces a series of specially formatted messages to one or more partitions. Each message it produces
+ * it logs out to a text file. The messages have a limited set of keys, so there is duplication in the key space.
+ *
+ * The broker will clean its log as the test runs.
+ *
+ * When the specified number of messages have been produced we create a consumer and consume all the messages in the topic
+ * and write that out to another text file.
+ *
+ * Using a stable unix sort we sort both the producer log of what was sent and the consumer log of what was retrieved by the message key.
+ * Then we compare the final message in both logs for each key. If this final message is not the same for all keys we
+ * print an error and exit with exit code 1, otherwise we print the size reduction and exit with exit code 0.
+ */
+object TestLogCleaning {
+
+  /**
+   * Parse the command line, produce the messages, wait, consume them back and validate the result.
+   * The --broker, --zk and --messages options must all be given, otherwise usage is printed and
+   * the process exits with code 1.
+   */
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+    val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.")
+                               .withRequiredArg
+                               .describedAs("count")
+                               .ofType(classOf[java.lang.Long])
+                               .defaultsTo(Long.MaxValue)
+    val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.")
+                           .withRequiredArg
+                           .describedAs("count")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(5)
+    val brokerOpt = parser.accepts("broker", "Url to connect to.")
+                          .withRequiredArg
+                          .describedAs("url")
+                          .ofType(classOf[String])
+    val topicsOpt = parser.accepts("topics", "The number of topics to test.")
+                          .withRequiredArg
+                          .describedAs("count")
+                          .ofType(classOf[java.lang.Integer])
+                          .defaultsTo(1)
+    val zkConnectOpt = parser.accepts("zk", "Zk url.")
+                             .withRequiredArg
+                             .describedAs("url")
+                             .ofType(classOf[String])
+    // the value is interpreted as seconds below, so the help text must say so too
+    val sleepSecsOpt = parser.accepts("sleep", "Time to sleep between production and consumption.")
+                             .withRequiredArg
+                             .describedAs("secs")
+                             .ofType(classOf[java.lang.Integer])
+                             .defaultsTo(0)
+    val cleanupOpt = parser.accepts("cleanup", "Delete temp files when done.")
+
+    val options = parser.parse(args:_*)
+
+    if(!options.has(brokerOpt) || !options.has(zkConnectOpt) || !options.has(numMessagesOpt)) {
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+
+    // parse options
+    val messages = options.valueOf(numMessagesOpt).longValue
+    val dups = options.valueOf(numDupsOpt).intValue
+    val brokerUrl = options.valueOf(brokerOpt)
+    val topicCount = options.valueOf(topicsOpt).intValue
+    val zkUrl = options.valueOf(zkConnectOpt)
+    val sleepSecs = options.valueOf(sleepSecsOpt).intValue
+    val cleanup = options.has(cleanupOpt)
+
+    // a random id keeps the topics of separate runs apart
+    val testId = new Random().nextInt(Int.MaxValue)
+    val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray
+
+    println("Producing %d messages...".format(messages))
+    val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, cleanup)
+    println("Sleeping for %d seconds...".format(sleepSecs))
+    // multiply as a Long so that a large sleep value cannot overflow
+    Thread.sleep(sleepSecs * 1000L)
+    println("Consuming messages...")
+    val consumedDataFile = consumeMessages(zkUrl, topics, cleanup)
+
+    val producedLines = lineCount(producedDataFile)
+    val consumedLines = lineCount(consumedDataFile)
+    val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble
+    println("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).".format(producedLines, consumedLines, 100 * reduction))
+
+    println("Validating output files...")
+    validateOutput(externalSort(producedDataFile), externalSort(consumedDataFile))
+    println("All done.")
+  }
+
+  /**
+   * Count the lines in a file, closing the file once it has been read
+   */
+  def lineCount(file: File): Int = {
+    val source = Source.fromFile(file)
+    try {
+      source.getLines.size
+    } finally {
+      source.close()
+    }
+  }
+
+  /**
+   * Walk the two sorted logs in step and compare the final value recorded for each topic/key pair.
+   * Returns when both logs are exhausted; prints the mismatch and exits with code 1 on the first difference.
+   */
+  def validateOutput(produced: BufferedReader, consumed: BufferedReader) {
+    while(true) {
+      val prod = readFinalValue(produced)
+      val cons = readFinalValue(consumed)
+      if(prod == null && cons == null) {
+        return
+      } else if(prod != cons) {
+        System.err.println("Validation failed prod = %s, cons = %s!".format(prod, cons))
+        System.exit(1)
+      }
+    }
+  }
+
+  /**
+   * Read the run of consecutive lines that share the same first two tab-separated fields and
+   * return the last line of that run as (field 0, field 1 as Int, field 2 as Int).
+   * The reader is left positioned at the first line of the next run.
+   * @return The final tuple of the run, or null if the reader is already at end of input
+   */
+  def readFinalValue(reader: BufferedReader): (String, Int, Int) = {
+    def readTuple() = {
+      val line = reader.readLine
+      if(line == null)
+        null
+      else
+        line.split("\t")
+    }
+    var prev = readTuple()
+    if(prev == null)
+      return null
+    while(true) {
+      // remember the position so that the first line of the next run can be pushed back
+      reader.mark(1024)
+      val curr = readTuple()
+      if(curr == null || curr(0) != prev(0) || curr(1) != prev(1)) {
+        reader.reset()
+        return (prev(0), prev(1).toInt, prev(2).toInt)
+      } else {
+        prev = curr
+      }
+    }
+    return null
+  }
+
+  /**
+   * Start an external "sort" process over the file, stable on the first two fields, and return a
+   * reader over the process output.
+   */
+  def externalSort(file: File): BufferedReader = {
+    val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", file.getAbsolutePath)
+    val process = builder.start()
+    new BufferedReader(new InputStreamReader(process.getInputStream()))
+  }
+
+  /**
+   * Send messages * topics.length keyed messages, round-robin across the topics, and log every
+   * message sent as a "topic key value" tab-separated line in a temp file.
+   * @return The file holding the log of everything that was produced
+   */
+  def produceMessages(brokerUrl: String,
+                      topics: Array[String],
+                      messages: Long,
+                      dups: Int,
+                      cleanup: Boolean): File = {
+    val producerProps = new Properties
+    producerProps.setProperty("producer.type", "async")
+    producerProps.setProperty("broker.list", brokerUrl)
+    producerProps.setProperty("serializer.class", classOf[StringEncoder].getName)
+    producerProps.setProperty("key.serializer.class", classOf[StringEncoder].getName)
+    producerProps.setProperty("queue.enqueue.timeout.ms", "-1")
+    producerProps.setProperty("batch.size", 1000.toString)
+    val producer = new Producer[String, String](new ProducerConfig(producerProps))
+    // fixed seed: the same key sequence is generated on every run
+    val rand = new Random(1)
+    val keyCount = (messages / dups).toInt
+    val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt")
+    if(cleanup)
+      producedFile.deleteOnExit()
+    val producedWriter = new BufferedWriter(new FileWriter(producedFile), 1024*1024)
+    try {
+      for(i <- 0L until (messages * topics.length)) {
+        val topic = topics((i % topics.length).toInt)
+        val key = rand.nextInt(keyCount)
+        producer.send(KeyedMessage(topic = topic, key = key.toString, message = i.toString))
+        producedWriter.write("%s\t%s\t%s\n".format(topic, key, i))
+      }
+    } finally {
+      // release the file and the producer even if a send or a write fails
+      producedWriter.close()
+      producer.close()
+    }
+    producedFile
+  }
+
+  /**
+   * Consume each topic until the consumer times out and log every message received as a
+   * "topic key value" tab-separated line in a temp file.
+   * @return The file holding the log of everything that was consumed
+   */
+  def consumeMessages(zkUrl: String, topics: Array[String], cleanup: Boolean): File = {
+    val consumerProps = new Properties
+    consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
+    consumerProps.setProperty("zk.connect", zkUrl)
+    consumerProps.setProperty("consumer.timeout.ms", (5*1000).toString)
+    val connector = new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
+    val streams = connector.createMessageStreams(topics.map(topic => (topic, 1)).toMap, new StringDecoder, new StringDecoder)
+    val consumedFile = File.createTempFile("kafka-log-cleaner-consumed-", ".txt")
+    if(cleanup)
+      consumedFile.deleteOnExit()
+    val consumedWriter = new BufferedWriter(new FileWriter(consumedFile))
+    try {
+      for(topic <- topics) {
+        val stream = streams(topic).head
+        try {
+          for(item <- stream)
+            consumedWriter.write("%s\t%s\t%s\n".format(topic, item.key, item.message))
+        } catch {
+          // the timeout is the signal that this topic has been drained
+          case e: ConsumerTimeoutException =>
+        }
+      }
+    } finally {
+      consumedWriter.close()
+      connector.shutdown()
+    }
+    consumedFile
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/other/kafka/TestLogPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLogPerformance.scala b/core/src/test/scala/other/kafka/TestLogPerformance.scala
index a7b661a..d91011e 100644
--- a/core/src/test/scala/other/kafka/TestLogPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestLogPerformance.scala
@@ -34,7 +34,8 @@ object TestLogPerformance {
val config = new KafkaConfig(props)
val dir = TestUtils.tempDir()
val scheduler = new KafkaScheduler(1)
- val log = new Log(dir, scheduler, 50*1024*1024, config.messageMaxBytes, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, segmentDeleteDelayMs = 0, time = SystemTime)
+ val logConfig = LogConfig()
+ val log = new Log(dir, logConfig, needsRecovery = false, scheduler = scheduler, time = SystemTime)
val bytes = new Array[Byte](messageSize)
new java.util.Random().nextBytes(bytes)
val message = new Message(bytes)
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
new file mode 100644
index 0000000..cce2319
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import junit.framework.Assert._
+import org.scalatest.junit.JUnitSuite
+import org.junit.{After, Before, Test}
+import java.nio._
+import java.io.File
+import scala.collection._
+import kafka.common._
+import kafka.utils._
+import kafka.message._
+
+/**
+ * Unit tests for the log cleaning logic
+ */
+class CleanerTest extends JUnitSuite {
+
+ // temporary directory used as the default location for logs built by makeLog; removed in teardown
+ val dir = TestUtils.tempDir()
+ // default log configuration for these tests: segmentSize and maxIndexSize of 1024 with dedupe enabled
+ val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, dedupe=true)
+ // mock clock shared by the logs, the throttler and the cleaner; it also supplies the scheduler given to each Log
+ val time = new MockTime()
+ // throttler built with the largest possible rate and check interval, presumably so that it never throttles the tests
+ val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
+
+ /** Remove the temporary log directory after each test. */
+ @After
+ def teardown(): Unit = {
+   Utils.rm(dir)
+ }
+
+ /**
+  * Test simple log cleaning: after cleaning, the keys registered in the offset map
+  * must be gone from the log and every other key must still be present, in order.
+  */
+ @Test
+ def testCleanSegments() {
+   val cleaner = makeCleaner(Int.MaxValue)
+   val log = makeLog(config = logConfig.copy(segmentSize = 1024))
+
+   // append messages to the log until we have four segments
+   while(log.numberOfSegments < 4)
+     log.append(messages(log.logEndOffset.toInt, log.logEndOffset.toInt))
+   val keysFound = keysInLog(log)
+   assertEquals((0L until log.logEndOffset), keysFound)
+
+   // pretend we have the following keys
+   val keys = immutable.ListSet(1, 3, 5, 7, 9)
+   val map = new FakeOffsetMap(Int.MaxValue)
+   keys.foreach(k => map.put(key(k), Long.MaxValue))
+
+   // derive the expectation from the log as it was BEFORE cleaning; deriving it from the
+   // cleaned log would only prove the keys were removed, not that the other keys survived
+   val shouldRemain = keysFound.filter(!keys.contains(_))
+
+   // clean the log
+   cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0)
+   assertEquals(shouldRemain, keysInLog(log))
+ }
+
+ /* collect the key of every message in every segment of the log, each parsed as an Int */
+ def keysInLog(log: Log): Iterable[Int] =
+   for(segment <- log.logSegments; entry <- segment.log)
+     yield Utils.readString(entry.message.key).toInt
+
+
+ /**
+ * Test that cleaning a truncated log throws an OptimisticLockFailureException.
+ * Note that the truncation here is performed before cleanSegments is invoked, not concurrently with it.
+ */
+ @Test
+ def testCleanSegmentsWithTruncation() {
+ val cleaner = makeCleaner(Int.MaxValue)
+ val log = makeLog(config = logConfig.copy(segmentSize = 1024))
+
+ // append messages to the log until we have two segments
+ while(log.numberOfSegments < 2)
+ log.append(messages(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
+ // truncate the log to two offsets short of its current end
+ log.truncateTo(log.logEndOffset-2)
+ // register every key still in the log, each mapped to Long.MaxValue
+ val keys = keysInLog(log)
+ val map = new FakeOffsetMap(Int.MaxValue)
+ keys.foreach(k => map.put(key(k), Long.MaxValue))
+ // NOTE(review): the trailing 0 is presumably the truncation count the cleaner expects the log to still have — confirm against Cleaner.cleanSegments
+ intercept[OptimisticLockFailureException] {
+ cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0)
+ }
+ }
+
+ /**
+  * Validate the logic for grouping log segments together for cleaning: groups must
+  * keep the segments in offset order and respect both the log size and index size limits
+  */
+ @Test
+ def testSegmentGrouping() {
+   val cleaner = makeCleaner(Int.MaxValue)
+   val log = makeLog(config = logConfig.copy(segmentSize = 300, indexInterval = 1))
+
+   // append identical messages until the log has rolled to ten segments
+   while(log.numberOfSegments < 10)
+     log.append(TestUtils.singleMessageSet("hello".getBytes))
+
+   // grouping by very large values should result in a single group with all the segments in it
+   var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+   assertEquals(1, groups.size)
+   assertEquals(log.numberOfSegments, groups(0).size)
+   checkSegmentOrder(groups)
+
+   // grouping by very small values should result in all groups having one entry
+   groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue)
+   assertEquals(log.numberOfSegments, groups.size)
+   assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
+   checkSegmentOrder(groups)
+   groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1)
+   assertEquals(log.numberOfSegments, groups.size)
+   assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
+   checkSegmentOrder(groups)
+
+   val groupSize = 3
+
+   // check grouping by log size: the limit is one more than the combined size of the first three segments
+   val logSize = log.logSegments.take(groupSize).map(_.size).sum.toInt + 1
+   groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue)
+   checkSegmentOrder(groups)
+   assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
+
+   // check grouping by index size: same idea, using the index sizes of the first three segments
+   val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes()).sum + 1
+   groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize)
+   checkSegmentOrder(groups)
+   assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
+ }
+
+ /* assert that the segment base offsets, read group by group, are already in ascending order */
+ private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]): Unit = {
+   val baseOffsets = for(group <- groups; segment <- group) yield segment.baseOffset
+   assertEquals("Offsets should be in increasing order.", baseOffsets.sorted, baseOffsets)
+ }
+
+ /**
+ * Test building an offset map off the log: write 500 messages whose key and value both equal
+ * their position, then build the map over three consecutive offset ranges aligned on segment boundaries.
+ */
+ @Test
+ def testBuildOffsetMap() {
+ val map = new FakeOffsetMap(1000)
+ val log = makeLog()
+ val cleaner = makeCleaner(Int.MaxValue)
+ val start = 0
+ val end = 500
+ // the returned offsets are not inspected; the call is made for its effect of filling the log
+ val offsets = writeToLog(log, (start until end) zip (start until end))
+ // builds the offset map over [start, end) and checks its contents; the parameters shadow the outer map/start/end
+ def checkRange(map: FakeOffsetMap, start: Int, end: Int) {
+ // the value returned by buildOffsetMap plus one is expected to equal the exclusive end of the range
+ val endOffset = cleaner.buildOffsetMap(log, start, end, map) + 1
+ assertEquals("Last offset should be the end offset.", end, endOffset)
+ // NOTE(review): the same map instance is reused across calls, so this presumably relies on buildOffsetMap clearing it first — confirm
+ assertEquals("Should have the expected number of messages in the map.", end-start, map.size)
+ for(i <- start until end)
+ assertEquals("Should find all the keys", i.toLong, map.get(key(i)))
+ // keys just outside the range must be reported as absent (-1)
+ assertEquals("Should not find a value too small", -1L, map.get(key(start-1)))
+ assertEquals("Should not find a value too large", -1L, map.get(key(end)))
+ }
+ val segments = log.logSegments.toSeq
+ // three ranges: up to the second segment, second up to the fourth segment, fourth segment to the end of the log
+ checkRange(map, 0, segments(1).baseOffset.toInt)
+ checkRange(map, segments(1).baseOffset.toInt, segments(3).baseOffset.toInt)
+ checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt)
+ }
+
+ /**
+ * Test that we don't exceed the maximum capacity of the offset map: an offset map
+ * with a max size of 1000 should only absorb 1000 new entries even if more than that are available.
+ */
+ @Test
+ def testBuildOffsetMapOverCapacity() {
+ val map = new FakeOffsetMap(1000)
+ val log = makeLog()
+ val cleaner = makeCleaner(Int.MaxValue)
+ // 1001 distinct keys, one more than the capacity given to the offset map
+ val vals = 0 until 1001
+ // the returned offsets are not inspected; the call is made for its effect of filling the log
+ val offsets = writeToLog(log, vals zip vals)
+ val lastOffset = cleaner.buildOffsetMap(log, vals.start, vals.end, map)
+ // NOTE(review): testBuildOffsetMap adds 1 to this return value before comparing it with an exclusive end;
+ // if it is the last offset consumed, 1000 would be the offset of the 1001st message — confirm this pins the capacity limit
+ assertEquals("Shouldn't go beyond the capacity of the offset map.", 1000, lastOffset)
+ }
+
+ /* build a Log over the given directory and config, driven by the mock clock and its scheduler */
+ def makeLog(dir: File = dir, config: LogConfig = logConfig) = {
+   val scheduler = time.scheduler
+   new Log(dir = dir, config = config, needsRecovery = false, scheduler = scheduler, time = time)
+ }
+
+ /* build a Cleaner backed by a FakeOffsetMap of the given capacity, with 64KB I/O buffers and the shared throttler and clock */
+ def makeCleaner(capacity: Int) = {
+   val offsetMap = new FakeOffsetMap(capacity)
+   new Cleaner(id = 0, offsetMap, ioBufferSize = 64*1024, maxIoBufferSize = 64*1024, throttler = throttler, time = time)
+ }
+
+ /* append one single-message set per (key, value) pair and collect the first offset reported by each append */
+ def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] =
+   seq.map { case (key, value) =>
+     log.append(messages(key, value)).firstOffset
+   }
+
+ /* wrap the bytes of the id's decimal string form in a ByteBuffer */
+ def key(id: Int) = {
+   val encoded = id.toString.getBytes
+   ByteBuffer.wrap(encoded)
+ }
+
+ /* build a message set holding a single message whose key and payload are the decimal strings of the arguments */
+ def messages(key: Int, value: Int) = {
+   val keyBytes = key.toString.getBytes
+   val valueBytes = value.toString.getBytes
+   new ByteBufferMessageSet(new Message(key = keyBytes, bytes = valueBytes))
+ }
+
+}
+
+/**
+ * An OffsetMap for tests that keeps its entries in a plain java.util.HashMap keyed by the
+ * UTF-8 decoded key bytes. The capacity is only recorded; put() does not enforce it.
+ */
+class FakeOffsetMap(val capacity: Int) extends OffsetMap {
+  val map = new java.util.HashMap[String, Long]()
+
+  /* decode the key bytes as UTF-8, reading from a duplicate so the caller's buffer position is untouched */
+  private def keyFor(key: ByteBuffer) = {
+    val raw = Utils.readBytes(key.duplicate)
+    new String(raw, "UTF-8")
+  }
+
+  /* record (or overwrite) the offset stored for this key */
+  def put(key: ByteBuffer, offset: Long): Unit = {
+    map.put(keyFor(key), offset)
+  }
+
+  /* the offset stored for this key, or -1 when the key has never been put */
+  def get(key: ByteBuffer): Long = {
+    val lookup = keyFor(key)
+    if(!map.containsKey(lookup)) -1L else map.get(lookup)
+  }
+
+  def clear() = map.clear()
+
+  def size: Int = map.size
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
new file mode 100644
index 0000000..5a489f9
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import scala.collection._
+import org.junit._
+import kafka.common.TopicAndPartition
+import kafka.utils._
+import kafka.message._
+import org.scalatest.junit.JUnitSuite
+import junit.framework.Assert._
+
+/**
+ * This is an integration test that tests the fully integrated log cleaner
+ */
+class LogCleanerIntegrationTest extends JUnitSuite {
+
+ // mock clock handed to the logs and the cleaner; it also supplies the scheduler given to each Log
+ val time = new MockTime()
+ // segmentSize used for every log created by makeCleaner
+ val segmentSize = 100
+ // passed as fileDeleteDelayMs for every log created by makeCleaner
+ val deleteDelay = 1000
+ // NOTE(review): not referenced anywhere in this class; the topic name "log" is spelled out literally instead
+ val logName = "log"
+ // parent directory of the per-partition log directories; removed in teardown
+ val logDir = TestUtils.tempDir()
+ // running counter used by writeDups as the payload of each appended message
+ var counter = 0
+ // the three partitions of topic "log"; cleanerTest only reads partition 0
+ val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2))
+
+ /**
+  * Write duplicate keys into partition 0, start the cleaner, wait for it to clean up to the
+  * active segment and check that the key -> latest value mapping read back from the log is
+  * unchanged while the log has shrunk. Then repeat with a second round of writes.
+  */
+ @Test
+ def cleanerTest() {
+   val cleaner = makeCleaner(parts = 3)
+   val log = cleaner.logs.get(topics(0))
+
+   val appends = writeDups(numKeys = 100, numDups = 3, log)
+   val startSize = log.size
+   cleaner.startup()
+   try {
+     val lastCleaned = log.activeSegment.baseOffset
+     // wait until we clean up to base_offset of active segment - minDirtyMessages
+     cleaner.awaitCleaned("log", 0, lastCleaned)
+
+     val read = readFromLog(log)
+     assertEquals("Contents of the map shouldn't change.", appends.toMap, read.toMap)
+     assertTrue(startSize > log.size)
+
+     // write some more stuff and validate again
+     val appends2 = appends ++ writeDups(numKeys = 100, numDups = 3, log)
+     val lastCleaned2 = log.activeSegment.baseOffset
+     cleaner.awaitCleaned("log", 0, lastCleaned2)
+     val read2 = readFromLog(log)
+     assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap)
+   } finally {
+     // always stop the cleaner, even when an assertion above fails, so it cannot outlive the test
+     cleaner.shutdown()
+   }
+ }
+
+ /* read back every message in the log as a (key, value) pair, both parsed as Ints, in segment order */
+ def readFromLog(log: Log): Iterable[(Int, Int)] =
+   log.logSegments.flatMap { segment =>
+     segment.log.map { entry =>
+       val key = Utils.readString(entry.message.key).toInt
+       val value = Utils.readString(entry.message.payload).toInt
+       (key, value)
+     }
+   }
+
+ /**
+  * Append numDups rounds of messages for keys 0 until numKeys. Each message's payload is the
+  * current value of the shared counter, which is incremented after every append.
+  *
+  * @return the (key, payload value) pairs in the order they were appended
+  */
+ def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
+   for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
+     // capture the counter before bumping it so the returned pair matches the payload written
+     val count = counter
+     log.append(TestUtils.singleMessageSet(payload = count.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
+     counter += 1
+     (key, count)
+   }
+ }
+
+ /** Remove the temporary parent log directory after each test. */
+ @After
+ def teardown(): Unit = {
+   Utils.rm(logDir)
+ }
+
+ /*
+ * create a cleaner instance together with `parts` logs for topic "log", one per partition,
+ * each stored in its own "log-<partition>" directory under logDir.
+ * NOTE(review): minDirtyMessages, defaultPolicy and policyOverrides are accepted but never used in this body;
+ * every log is created with dedupe = true and only numThreads reaches the CleanerConfig.
+ */
+ def makeCleaner(parts: Int,
+ minDirtyMessages: Int = 0,
+ numThreads: Int = 1,
+ defaultPolicy: String = "dedupe",
+ policyOverrides: Map[String, String] = Map()): LogCleaner = {
+
+ // create partitions and add them to the pool
+ val logs = new Pool[TopicAndPartition, Log]()
+ for(i <- 0 until parts) {
+ val dir = new File(logDir, "log-" + i)
+ dir.mkdirs()
+ val log = new Log(dir = dir,
+ LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, dedupe = true),
+ needsRecovery = false,
+ scheduler = time.scheduler,
+ time = time)
+ logs.put(TopicAndPartition("log", i), log)
+ }
+
+ // the cleaner shares the pool of logs and the mock clock with the test
+ new LogCleaner(CleanerConfig(numThreads = numThreads),
+ logDirs = Array(logDir),
+ logs = logs,
+ time = time)
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index f48c709..fad3baa 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -29,21 +29,18 @@ class LogManagerTest extends JUnit3Suite {
val time: MockTime = new MockTime()
val maxRollInterval = 100
- val maxLogAgeHours = 10
+ val maxLogAgeMs = 10*60*60*1000
+ val logConfig = LogConfig(segmentSize = 1024, maxIndexSize = 4096, retentionMs = maxLogAgeMs)
var logDir: File = null
var logManager: LogManager = null
- var config: KafkaConfig = null
val name = "kafka"
val veryLargeLogFlushInterval = 10000000L
+ val cleanerConfig = CleanerConfig(enableCleaner = false)
override def setUp() {
super.setUp()
- config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) {
- override val logSegmentBytes = 1024
- override val logFlushIntervalMessages = 10000
- override val logRetentionHours = maxLogAgeHours
- }
- logManager = new LogManager(config, time.scheduler, time)
+ logDir = TestUtils.tempDir()
+ logManager = new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 1000L, time.scheduler, time)
logManager.startup
logDir = logManager.logDirs(0)
}
@@ -62,7 +59,7 @@ class LogManagerTest extends JUnit3Suite {
@Test
def testCreateLog() {
val log = logManager.getOrCreateLog(name, 0)
- val logFile = new File(config.logDirs(0), name + "-0")
+ val logFile = new File(logDir, name + "-0")
assertTrue(logFile.exists)
log.append(TestUtils.singleMessageSet("test".getBytes()))
}
@@ -74,7 +71,7 @@ class LogManagerTest extends JUnit3Suite {
def testGetNonExistentLog() {
val log = logManager.getLog(name, 0)
assertEquals("No log should be found.", None, log)
- val logFile = new File(config.logDirs(0), name + "-0")
+ val logFile = new File(logDir, name + "-0")
assertTrue(!logFile.exists)
}
@@ -94,9 +91,9 @@ class LogManagerTest extends JUnit3Suite {
log.logSegments.foreach(_.log.file.setLastModified(time.milliseconds))
- time.sleep(maxLogAgeHours*60*60*1000 + 1)
+ time.sleep(maxLogAgeMs + 1)
assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
- time.sleep(log.segmentDeleteDelayMs + 1)
+ time.sleep(log.config.fileDeleteDelayMs + 1)
assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes)
@@ -116,14 +113,10 @@ class LogManagerTest extends JUnit3Suite {
@Test
def testCleanupSegmentsToMaintainSize() {
val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
- val props = TestUtils.createBrokerConfig(0, -1)
logManager.shutdown()
- config = new KafkaConfig(props) {
- override val logSegmentBytes = (10 * (setSize - 1)) // each segment will be 10 messages
- override val logRetentionBytes = (5 * 10 * setSize + 10).asInstanceOf[Long]
- override val logRollHours = maxRollInterval
- }
- logManager = new LogManager(config, time.scheduler, time)
+
+ val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L)
+ logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 1000L, time.scheduler, time)
logManager.startup
// create a log
@@ -138,13 +131,12 @@ class LogManagerTest extends JUnit3Suite {
offset = info.firstOffset
}
- // should be exactly 100 full segments + 1 new empty one
- assertEquals("Check we have the expected number of segments.", numMessages * setSize / config.logSegmentBytes, log.numberOfSegments)
+ assertEquals("Check we have the expected number of segments.", numMessages * setSize / config.segmentSize, log.numberOfSegments)
// this cleanup shouldn't find any expired segments but should delete some to reduce size
time.sleep(logManager.InitialTaskDelayMs)
assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
- time.sleep(log.segmentDeleteDelayMs + 1)
+ time.sleep(log.config.fileDeleteDelayMs + 1)
assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes)
try {
@@ -162,14 +154,9 @@ class LogManagerTest extends JUnit3Suite {
*/
@Test
def testTimeBasedFlush() {
- val props = TestUtils.createBrokerConfig(0, -1)
logManager.shutdown()
- config = new KafkaConfig(props) {
- override val logFlushSchedulerIntervalMs = 1000
- override val logFlushIntervalMs = 1000
- override val logFlushIntervalMessages = Int.MaxValue
- }
- logManager = new LogManager(config, time.scheduler, time)
+ val config = logConfig.copy(flushMs = 1000)
+ logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 1000L, time.scheduler, time)
logManager.startup
val log = logManager.getOrCreateLog(name, 0)
val lastFlush = log.lastFlushTime
@@ -187,13 +174,11 @@ class LogManagerTest extends JUnit3Suite {
@Test
def testLeastLoadedAssignment() {
// create a log manager with multiple data directories
- val props = TestUtils.createBrokerConfig(0, -1)
- val dirs = Seq(TestUtils.tempDir().getAbsolutePath,
- TestUtils.tempDir().getAbsolutePath,
- TestUtils.tempDir().getAbsolutePath)
- props.put("log.dirs", dirs.mkString(","))
+ val dirs = Array(TestUtils.tempDir(),
+ TestUtils.tempDir(),
+ TestUtils.tempDir())
logManager.shutdown()
- logManager = new LogManager(new KafkaConfig(props), time.scheduler, time)
+ logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig, 1000L, 1000L, time.scheduler, time)
// verify that logs are always assigned to the least loaded partition
for(partition <- 0 until 20) {
@@ -209,7 +194,7 @@ class LogManagerTest extends JUnit3Suite {
*/
def testTwoLogManagersUsingSameDirFails() {
try {
- new LogManager(logManager.config, time.scheduler, time)
+ new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 1000L, time.scheduler, time)
fail("Should not be able to create a second log manager instance with the same data directory")
} catch {
case e: KafkaException => // this is good
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index bffe4a4..a185ce4 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -161,6 +161,21 @@ class LogSegmentTest extends JUnit3Suite {
}
/**
+ * Test that we can change the file suffixes for the log and index files
+ */
+ @Test
+ def testChangeFileSuffixes(): Unit = {
+   val segment = createSegment(40)
+   val expectedLogPath = segment.log.file.getAbsolutePath + ".deleted"
+   val expectedIndexPath = segment.index.file.getAbsolutePath + ".deleted"
+   segment.changeFileSuffixes("", ".deleted")
+   assertEquals(expectedLogPath, segment.log.file.getAbsolutePath)
+   assertEquals(expectedIndexPath, segment.index.file.getAbsolutePath)
+   assertTrue(segment.log.file.exists)
+   assertTrue(segment.index.file.exists)
+ }
+
+ /**
* Create a segment with some data and an index. Then corrupt the index,
* and recover the segment, the entries should all be readable.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 0fc74fa..23e0e65 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -34,6 +34,7 @@ class LogTest extends JUnitSuite {
var logDir: File = null
val time = new MockTime
var config: KafkaConfig = null
+ val logConfig = LogConfig()
@Before
def setUp() {
@@ -61,12 +62,15 @@ class LogTest extends JUnitSuite {
@Test
def testTimeBasedLogRoll() {
val set = TestUtils.singleMessageSet("test".getBytes())
- val rollMs = 1 * 60 * 60L
val time: MockTime = new MockTime()
// create a log
- val log = new Log(logDir, time.scheduler, 1000, config.messageMaxBytes, 1000, rollMs, needsRecovery = false, time = time)
- time.sleep(rollMs + 1)
+ val log = new Log(logDir,
+ logConfig.copy(segmentMs = 1 * 60 * 60L),
+ needsRecovery = false,
+ scheduler = time.scheduler,
+ time = time)
+ time.sleep(log.config.segmentMs + 1)
// segment age is less than its limit
log.append(set)
@@ -76,13 +80,13 @@ class LogTest extends JUnitSuite {
assertEquals("There should still be exactly one segment.", 1, log.numberOfSegments)
for(numSegments <- 2 until 4) {
- time.sleep(rollMs + 1)
+ time.sleep(log.config.segmentMs + 1)
log.append(set)
assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
}
val numSegments = log.numberOfSegments
- time.sleep(rollMs + 1)
+ time.sleep(log.config.segmentMs + 1)
log.append(new ByteBufferMessageSet())
assertEquals("Appending an empty message set should not roll log even if succient time has passed.", numSegments, log.numberOfSegments)
}
@@ -95,10 +99,10 @@ class LogTest extends JUnitSuite {
val set = TestUtils.singleMessageSet("test".getBytes)
val setSize = set.sizeInBytes
val msgPerSeg = 10
- val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
+ val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
// create a log
- val log = new Log(logDir, time.scheduler, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
+ val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), needsRecovery = false, time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
// segments expire in size
@@ -114,7 +118,7 @@ class LogTest extends JUnitSuite {
@Test
def testLoadEmptyLog() {
createEmptyLogs(logDir, 0)
- val log = new Log(logDir, time.scheduler, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, logConfig, needsRecovery = false, time.scheduler, time = time)
log.append(TestUtils.singleMessageSet("test".getBytes))
}
@@ -123,7 +127,7 @@ class LogTest extends JUnitSuite {
*/
@Test
def testAppendAndReadWithSequentialOffsets() {
- val log = new Log(logDir, time.scheduler, 71, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, logConfig.copy(segmentSize = 71), needsRecovery = false, time.scheduler, time = time)
val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
for(i <- 0 until messages.length)
@@ -142,7 +146,7 @@ class LogTest extends JUnitSuite {
*/
@Test
def testAppendAndReadWithNonSequentialOffsets() {
- val log = new Log(logDir, time.scheduler, 71, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, logConfig.copy(segmentSize = 71), needsRecovery = false, time.scheduler, time = time)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val messages = messageIds.map(id => new Message(id.toString.getBytes))
@@ -165,7 +169,7 @@ class LogTest extends JUnitSuite {
*/
@Test
def testReadAtLogGap() {
- val log = new Log(logDir, time.scheduler, 300, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, logConfig.copy(segmentSize = 300), needsRecovery = false, time.scheduler, time = time)
// keep appending until we have two segments with only a single message in the second segment
while(log.numberOfSegments == 1)
@@ -185,7 +189,7 @@ class LogTest extends JUnitSuite {
@Test
def testReadOutOfRange() {
createEmptyLogs(logDir, 1024)
- val log = new Log(logDir, time.scheduler, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, logConfig.copy(segmentSize = 1024), needsRecovery = false, time.scheduler, time = time)
assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes)
try {
log.read(0, 1024)
@@ -208,7 +212,7 @@ class LogTest extends JUnitSuite {
@Test
def testLogRolls() {
/* create a multipart log with 100 messages */
- val log = new Log(logDir, time.scheduler, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, logConfig.copy(segmentSize = 100), needsRecovery = false, time.scheduler, time = time)
val numMessages = 100
val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
val offsets = messageSets.map(log.append(_).firstOffset)
@@ -232,8 +236,8 @@ class LogTest extends JUnitSuite {
@Test
def testCompressedMessages() {
/* this log should roll after every messageset */
- val log = new Log(logDir, time.scheduler, 10, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
-
+ val log = new Log(logDir, logConfig.copy(segmentSize = 10), needsRecovery = false, time.scheduler, time = time)
+
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
@@ -255,7 +259,7 @@ class LogTest extends JUnitSuite {
for(messagesToAppend <- List(0, 1, 25)) {
logDir.mkdirs()
// first test a log segment starting at 0
- val log = new Log(logDir, time.scheduler, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, logConfig.copy(segmentSize = 100), needsRecovery = false, time.scheduler, time = time)
for(i <- 0 until messagesToAppend)
log.append(TestUtils.singleMessageSet(i.toString.getBytes))
@@ -289,7 +293,7 @@ class LogTest extends JUnitSuite {
// append messages to log
val maxMessageSize = second.sizeInBytes - 1
- val log = new Log(logDir, time.scheduler, 100, maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, logConfig.copy(maxMessageSize = maxMessageSize), needsRecovery = false, time.scheduler, time = time)
// should be able to append the small message
log.append(first)
@@ -311,7 +315,8 @@ class LogTest extends JUnitSuite {
val messageSize = 100
val segmentSize = 7 * messageSize
val indexInterval = 3 * messageSize
- var log = new Log(logDir, time.scheduler, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+ val config = logConfig.copy(segmentSize = segmentSize, indexInterval = indexInterval, maxIndexSize = 4096)
+ var log = new Log(logDir, config, needsRecovery = false, time.scheduler, time)
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
@@ -319,15 +324,14 @@ class LogTest extends JUnitSuite {
val numIndexEntries = log.activeSegment.index.entries
log.close()
- // test non-recovery case
- log = new Log(logDir, time.scheduler, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+ log = new Log(logDir, config, needsRecovery = false, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
log.close()
// test recovery case
- log = new Log(logDir, time.scheduler, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+ log = new Log(logDir, config, needsRecovery = true, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
@@ -341,7 +345,8 @@ class LogTest extends JUnitSuite {
def testIndexRebuild() {
// publish the messages and close the log
val numMessages = 200
- var log = new Log(logDir, time.scheduler, 200, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
+ val config = logConfig.copy(segmentSize = 200, indexInterval = 1)
+ var log = new Log(logDir, config, needsRecovery = true, time.scheduler, time)
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
val indexFiles = log.logSegments.map(_.index.file)
@@ -351,8 +356,7 @@ class LogTest extends JUnitSuite {
indexFiles.foreach(_.delete())
// reopen the log
- log = new Log(logDir, time.scheduler, 200, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
-
+ log = new Log(logDir, config, needsRecovery = true, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
for(i <- 0 until numMessages)
assertEquals(i, log.read(i, 100, None).head.offset)
@@ -367,10 +371,10 @@ class LogTest extends JUnitSuite {
val set = TestUtils.singleMessageSet("test".getBytes())
val setSize = set.sizeInBytes
val msgPerSeg = 10
- val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
+ val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
// create a log
- val log = new Log(logDir, time.scheduler, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
+ val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), needsRecovery = false, scheduler = time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (i<- 1 to msgPerSeg)
@@ -421,8 +425,9 @@ class LogTest extends JUnitSuite {
val set = TestUtils.singleMessageSet("test".getBytes())
val setSize = set.sizeInBytes
val msgPerSeg = 10
- val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
- val log = new Log(logDir, time.scheduler, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
+ val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
+ val config = logConfig.copy(segmentSize = segmentSize)
+ val log = new Log(logDir, config, needsRecovery = false, scheduler = time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (i<- 1 to msgPerSeg)
log.append(set)
@@ -430,10 +435,10 @@ class LogTest extends JUnitSuite {
for (i<- 1 to msgPerSeg)
log.append(set)
assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments)
- assertEquals("The index of the first segment should be trim to empty", 0, log.logSegments.toList(0).index.maxEntries)
+ assertEquals("The index of the first segment should be trimmed to empty", 0, log.logSegments.toList(0).index.maxEntries)
log.truncateTo(0)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
- assertEquals("The index of segment 1 should be resized to maxIndexSize", log.maxIndexSize/8, log.logSegments.toList(0).index.maxEntries)
+ assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, log.logSegments.toList(0).index.maxEntries)
for (i<- 1 to msgPerSeg)
log.append(set)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
@@ -449,12 +454,12 @@ class LogTest extends JUnitSuite {
val set = TestUtils.singleMessageSet("test".getBytes())
val log = new Log(logDir,
+ logConfig.copy(segmentSize = set.sizeInBytes * 5,
+ maxIndexSize = 1000,
+ indexInterval = 1),
+ needsRecovery = false,
time.scheduler,
- maxSegmentSize = set.sizeInBytes * 5,
- maxMessageSize = config.messageMaxBytes,
- maxIndexSize = 1000,
- indexIntervalBytes = 1,
- needsRecovery = false)
+ time)
assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
@@ -472,27 +477,26 @@ class LogTest extends JUnitSuite {
@Test
def testReopenThenTruncate() {
val set = TestUtils.singleMessageSet("test".getBytes())
+ val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
+ maxIndexSize = 1000,
+ indexInterval = 10000)
// create a log
var log = new Log(logDir,
+ config,
+ needsRecovery = true,
time.scheduler,
- maxSegmentSize = set.sizeInBytes * 5,
- maxMessageSize = config.messageMaxBytes,
- maxIndexSize = 1000,
- indexIntervalBytes = 10000,
- needsRecovery = true)
+ time)
// add enough messages to roll over several segments then close and re-open and attempt to truncate
for(i <- 0 until 100)
log.append(set)
log.close()
log = new Log(logDir,
+ config,
+ needsRecovery = true,
time.scheduler,
- maxSegmentSize = set.sizeInBytes * 5,
- maxMessageSize = config.messageMaxBytes,
- maxIndexSize = 1000,
- indexIntervalBytes = 10000,
- needsRecovery = true)
+ time)
log.truncateTo(3)
assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
@@ -505,14 +509,15 @@ class LogTest extends JUnitSuite {
def testAsyncDelete() {
val set = TestUtils.singleMessageSet("test".getBytes())
val asyncDeleteMs = 1000
- val log = new Log(logDir,
+ val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
+ fileDeleteDelayMs = asyncDeleteMs,
+ maxIndexSize = 1000,
+ indexInterval = 10000)
+ val log = new Log(logDir,
+ config,
+ needsRecovery = true,
time.scheduler,
- maxSegmentSize = set.sizeInBytes * 5,
- maxMessageSize = config.messageMaxBytes,
- maxIndexSize = 1000,
- indexIntervalBytes = 10000,
- segmentDeleteDelayMs = asyncDeleteMs,
- needsRecovery = true)
+ time)
// append some messages to create some segments
for(i <- 0 until 100)
@@ -520,15 +525,20 @@ class LogTest extends JUnitSuite {
// files should be renamed
val segments = log.logSegments.toArray
+ val oldFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
log.deleteOldSegments((s) => true)
assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
- val renamed = segments.map(segment => new File(segment.log.file.getPath + Log.DeletedFileSuffix))
- assertTrue("Files should all be renamed to .deleted.", renamed.forall(_.exists))
+ assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) &&
+ segments.forall(_.index.file.getName.endsWith(Log.DeletedFileSuffix)))
+ assertTrue("The .deleted files should still be there.", segments.forall(_.log.file.exists) &&
+ segments.forall(_.index.file.exists))
+ assertTrue("The original file should be gone.", oldFiles.forall(!_.exists))
// when enough time passes the files should be deleted
+ val deletedFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
time.sleep(asyncDeleteMs + 1)
- assertTrue("Files should all be gone.", renamed.forall(!_.exists))
+ assertTrue("Files should all be gone.", deletedFiles.forall(!_.exists))
}
/**
@@ -537,13 +547,12 @@ class LogTest extends JUnitSuite {
@Test
def testOpenDeletesObsoleteFiles() {
val set = TestUtils.singleMessageSet("test".getBytes())
- var log = new Log(logDir,
+ val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, maxIndexSize = 1000)
+ var log = new Log(logDir,
+ config,
+ needsRecovery = false,
time.scheduler,
- maxSegmentSize = set.sizeInBytes * 5,
- maxMessageSize = config.messageMaxBytes,
- maxIndexSize = 1000,
- indexIntervalBytes = 10000,
- needsRecovery = false)
+ time)
// append some messages to create some segments
for(i <- 0 until 100)
@@ -553,12 +562,10 @@ class LogTest extends JUnitSuite {
log.close()
log = new Log(logDir,
+ config,
+ needsRecovery = false,
time.scheduler,
- maxSegmentSize = set.sizeInBytes * 5,
- maxMessageSize = config.messageMaxBytes,
- maxIndexSize = 1000,
- indexIntervalBytes = 10000,
- needsRecovery = false)
+ time)
assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
new file mode 100644
index 0000000..99a0c4b
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.nio._
+import org.junit._
+import org.scalatest.junit.JUnitSuite
+import junit.framework.Assert._
+
+/**
+ * Unit tests for SkimpyOffsetMap: put/get round trips, clear(), and the
+ * behaviour of put() once the map has been filled to capacity.
+ */
+class OffsetMapTest extends JUnitSuite {
+
+  /** Round-trip validation at a few different map sizes. */
+  @Test
+  def testBasicValidation() {
+    validateMap(10)
+    validateMap(100)
+    validateMap(1000)
+  }
+
+  /** After clear(), every previously stored key must read back as -1. */
+  @Test
+  def testClear() {
+    val map = new SkimpyOffsetMap(4000, 0.75)
+    for(i <- 0 until 10)
+      map.put(key(i), i)
+    for(i <- 0 until 10)
+      assertEquals(i.toLong, map.get(key(i)))
+    map.clear()
+    // expected value goes first so a failure reports expected/actual the right way round
+    for(i <- 0 until 10)
+      assertEquals(-1L, map.get(key(i)))
+  }
+
+  /** Once size reaches capacity, one further put() must throw IllegalStateException. */
+  @Test
+  def testCapacity() {
+    val map = new SkimpyOffsetMap(1024, 0.75)
+    var i = 0
+    while(map.size < map.capacity) {
+      map.put(key(i), i)
+      i += 1
+    }
+    // now the map is full, it should throw an exception
+    intercept[IllegalStateException] {
+      map.put(key(i), i)
+    }
+  }
+
+  /** Build a map key by wrapping the bytes of the integer's decimal string form. */
+  def key(key: Int) = ByteBuffer.wrap(key.toString.getBytes)
+
+  /**
+   * Insert the keys 0 until `items` (each mapped to its own value as the offset),
+   * read every key back and print the fraction of lookups that returned -1.
+   * A lookup returning -1 is only counted as a miss, not treated as a failure;
+   * any other result must equal the offset that was stored.
+   *
+   * @param items the number of entries to insert and look up
+   */
+  def validateMap(items: Int) {
+    // NOTE(review): the sizing presumably allows 24 bytes per entry with 2x headroom; confirm against SkimpyOffsetMap
+    val map = new SkimpyOffsetMap(items * 2 * 24, 0.75)
+    for(i <- 0 until items)
+      map.put(key(i), i)
+    var misses = 0
+    for(i <- 0 until items) {
+      map.get(key(i)) match {
+        case -1L => misses += 1
+        case offset => assertEquals(i.toLong, offset)
+      }
+    }
+    println("Miss rate: " + (misses.toDouble / items))
+  }
+
+}
+
+/** Command-line entry point: runs validateMap with the size given as the single argument. */
+object OffsetMapTest {
+  def main(args: Array[String]) {
+    if(args.length != 1) {
+      System.err.println("USAGE: java OffsetMapTest size")
+      System.exit(1)
+    }
+    val test = new OffsetMapTest()
+    test.validateMap(args(0).toInt)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index b609585..8a3e33b 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -16,13 +16,14 @@
*/
package kafka.server
-import kafka.log.LogManager
+import kafka.log._
+import java.io.File
import org.I0Itec.zkclient.ZkClient
import org.scalatest.junit.JUnit3Suite
import org.easymock.EasyMock
import org.junit._
import org.junit.Assert._
-import kafka.common.KafkaException
+import kafka.common._
import kafka.cluster.Replica
import kafka.utils._
@@ -30,7 +31,14 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_))
val topic = "foo"
- val logManagers = configs.map(config => new LogManager(config, new KafkaScheduler(1), new MockTime))
+ val logManagers = configs.map(config => new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
+ topicConfigs = Map(),
+ defaultConfig = LogConfig(),
+ cleanerConfig = CleanerConfig(),
+ flushCheckMs = 30000,
+ retentionCheckMs = 30000,
+ scheduler = new KafkaScheduler(1),
+ time = new MockTime))
@After
def teardown() {
@@ -133,7 +141,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
}
def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {
- replicaManager.highWatermarkCheckpoints(replicaManager.config.logDirs(0)).read(topic, partition)
+ replicaManager.highWatermarkCheckpoints(replicaManager.config.logDirs(0)).read.getOrElse(TopicAndPartition(topic, partition), 0L)
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index e3752cb..f857171 100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -205,7 +205,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
props.put("enable.zookeeper", "false")
props.put("num.partitions", "20")
props.put("log.retention.hours", "10")
- props.put("log.cleanup.interval.mins", "5")
+ props.put("log.retention.check.interval.ms", (5*1000*60).toString)
props.put("log.segment.bytes", logSize.toString)
props.put("zk.connect", zkConnect.toString)
props