You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2013/01/29 04:36:20 UTC

[2/3] KAFKA-631 Implement a log cleaner for Kafka. Reviewed by Neha.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 51ea727..f4ba59c 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -103,9 +103,37 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   /* the maximum size of the log for some specific topic before deleting it */
   val logRetentionBytesPerTopicMap = props.getMap("log.retention.bytes.per.topic", _.toLong > 0).mapValues(_.toLong)
-
+  
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
-  val logCleanupIntervalMins = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
+  val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue))
+  
+  /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "dedupe" */
+  val logCleanupPolicy = props.getString("log.cleanup.policy", "delete")
+  
+  /* a per-topic override for the cleanup policy for segments beyond the retention window */
+  val logCleanupPolicyMap = props.getMap("topic.log.cleanup.policy")
+  
+  /* the number of background threads to use for log cleaning */
+  val logCleanerThreads = props.getIntInRange("log.cleaner.threads", 1, (0, Int.MaxValue))
+  
+  /* the log cleaner will be throttled so that the sum of its read and write i/o will be less than this value on average */
+  val logCleanerIoMaxBytesPerSecond = props.getDouble("log.cleaner.io.max.bytes.per.second", Double.MaxValue)
+  
+  /* the total memory used for log deduplication across all cleaner threads */
+  val logCleanerDedupeBufferSize = props.getIntInRange("log.cleaner.dedupe.buffer.size", 500*1024*1024, (0, Int.MaxValue))
+  require(logCleanerDedupeBufferSize / logCleanerThreads > 1024*1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
+  
+  /* the total memory used for log cleaner I/O buffers across all cleaner threads */
+  val logCleanerIoBufferSize = props.getIntInRange("log.cleaner.io.buffer.size", 4*1024*1024, (0, Int.MaxValue))
+  
+  /* the amount of time to sleep when there are no logs to clean */
+  val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 30*1000, (0L, Long.MaxValue))
+  
+  /* the minimum ratio of dirty log to total log for a log to be eligible for cleaning */
+  val logCleanerMinCleanRatio = props.getDouble("log.cleaner.min.cleanable.ratio", 0.5)
+  
+  /* should we enable log cleaning? */
+  val logCleanerEnable = props.getBoolean("log.cleaner.enable", false)
   
   /* the maximum size in bytes of the offset index */
   val logIndexSizeMaxBytes = props.getIntInRange("log.index.size.max.bytes", 10*1024*1024, (4, Int.MaxValue))
@@ -116,6 +144,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the number of messages accumulated on a log partition before messages are flushed to disk */
   val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 500, (1, Int.MaxValue))
 
+  /* the amount of time to wait before deleting a file from the filesystem */
   val logDeleteDelayMs = props.getLongInRange("log.segment.delete.delay.ms", 60000, (0, Long.MaxValue))
 
   /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 9258b13..da6f716 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -18,9 +18,12 @@
 package kafka.server
 
 import kafka.network.SocketServer
+import kafka.log.LogConfig
+import kafka.log.CleanerConfig
 import kafka.log.LogManager
 import kafka.utils._
 import java.util.concurrent._
+import java.io.File
 import atomic.AtomicBoolean
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
@@ -56,9 +59,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     kafkaScheduler.startup()
 
     /* start log manager */
-    logManager = new LogManager(config,
-                                kafkaScheduler,
-                                time)
+    logManager = createLogManager(config)
     logManager.startup()
 
     socketServer = new SocketServer(config.brokerId,
@@ -138,6 +139,50 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   def awaitShutdown(): Unit = shutdownLatch.await()
 
   def getLogManager(): LogManager = logManager
+  
+  private def createLogManager(config: KafkaConfig): LogManager = {
+    val topics = config.logCleanupPolicyMap.keys ++ 
+                 config.logSegmentBytesPerTopicMap.keys ++ 
+                 config.logFlushIntervalMsPerTopicMap.keys ++ 
+                 config.logRollHoursPerTopicMap.keys ++ 
+                 config.logRetentionBytesPerTopicMap.keys ++ 
+                 config.logRetentionHoursPerTopicMap.keys
+    val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, 
+                                     segmentMs = 60 * 60 * 1000 * config.logRollHours,
+                                     flushInterval = config.logFlushIntervalMessages,
+                                     flushMs = config.logFlushIntervalMs.toLong,
+                                     retentionSize = config.logRetentionBytes,
+                                     retentionMs = 60 * 60 * 1000 * config.logRetentionHours,
+                                     maxMessageSize = config.messageMaxBytes,
+                                     maxIndexSize = config.logIndexSizeMaxBytes,
+                                     indexInterval = config.logIndexIntervalBytes,
+                                     fileDeleteDelayMs = config.logDeleteDelayMs,
+                                     minCleanableRatio = config.logCleanerMinCleanRatio,
+                                     dedupe = config.logCleanupPolicy.trim.toLowerCase == "dedupe")
+    val logConfigs = for(topic <- topics) yield 
+      topic -> defaultLogConfig.copy(segmentSize = config.logSegmentBytesPerTopicMap.getOrElse(topic, config.logSegmentBytes), 
+                                     segmentMs = 60 * 60 * 1000 * config.logRollHoursPerTopicMap.getOrElse(topic, config.logRollHours),
+                                     flushMs = config.logFlushIntervalMsPerTopicMap.getOrElse(topic, config.logFlushIntervalMs).toLong,
+                                     retentionSize = config.logRetentionBytesPerTopicMap.getOrElse(topic, config.logRetentionBytes),
+                                     retentionMs = 60 * 60 * 1000 * config.logRetentionHoursPerTopicMap.getOrElse(topic, config.logRetentionHours),
+                                     dedupe = config.logCleanupPolicyMap.getOrElse(topic, config.logCleanupPolicy).trim.toLowerCase == "dedupe")
+    val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
+                                      dedupeBufferSize = config.logCleanerDedupeBufferSize,
+                                      ioBufferSize = config.logCleanerIoBufferSize,
+                                      maxMessageSize = config.messageMaxBytes,
+                                      maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
+                                      backOffMs = config.logCleanerBackoffMs,
+                                      enableCleaner = config.logCleanerEnable)
+    new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
+                   topicConfigs = logConfigs.toMap,
+                   defaultConfig = defaultLogConfig,
+                   cleanerConfig = cleanerConfig,
+                   flushCheckMs = config.logFlushSchedulerIntervalMs,
+                   retentionCheckMs = config.logCleanupIntervalMs,
+                   scheduler = kafkaScheduler,
+                   time = time)
+  }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
new file mode 100644
index 0000000..79f29df
--- /dev/null
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import scala.collection._
+import kafka.utils.Logging
+import kafka.common._
+import java.util.concurrent.locks.ReentrantLock
+import java.io._
+
+/**
+ * This class saves out a map of topic/partition=>offsets to a file
+ */
+class OffsetCheckpoint(val file: File) extends Logging {
+  private val lock = new Object()
+  new File(file + ".tmp").delete() // try to delete any existing temp files for cleanliness
+  file.createNewFile() // in case the file doesn't exist
+
+  def write(offsets: Map[TopicAndPartition, Long]) {
+    lock synchronized {
+      // write to temp file and then swap with the existing file
+      val temp = new File(file.getAbsolutePath + ".tmp")
+
+      val writer = new BufferedWriter(new FileWriter(temp))
+      try {
+        // write the current version
+        writer.write(0.toString)
+        writer.newLine()
+      
+        // write the number of entries
+        writer.write(offsets.size.toString)
+        writer.newLine()
+
+        // write the entries
+        offsets.foreach { case (topicPart, offset) =>
+          writer.write("%s %d %d".format(topicPart.topic, topicPart.partition, offset))
+          writer.newLine()
+        }
+      
+        // flush and overwrite old file
+        writer.flush()
+        if(!temp.renameTo(file))
+          throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath))
+      } finally {
+        writer.close()
+      }
+    }
+  }
+
+  def read(): Map[TopicAndPartition, Long] = {
+    lock synchronized {
+      val reader = new BufferedReader(new FileReader(file))
+      try {
+        var line = reader.readLine()
+        if(line == null)
+          return Map.empty
+        val version = line.toInt
+        version match {
+          case 0 =>
+            line = reader.readLine()
+            if(line == null)
+              return Map.empty
+            val expectedSize = line.toInt
+            var offsets = Map[TopicAndPartition, Long]()
+            line = reader.readLine()
+            while(line != null) {
+              val pieces = line.split("\\s+")
+              if(pieces.length != 3)
+                throw new IOException("Malformed line in offset checkpoint file: '%s'.".format(line))
+              
+              val topic = pieces(0)
+              val partition = pieces(1).toInt
+              val offset = pieces(2).toLong
+              offsets += (TopicAndPartition(pieces(0), partition) -> offset)
+              line = reader.readLine()
+            }
+            if(offsets.size != expectedSize)
+              throw new IOException("Expected %d entries but found only %d".format(expectedSize, offsets.size))
+            offsets
+          case _ => 
+            throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
+        }
+      } finally {
+        reader.close()
+      }
+    }
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7810c21..710c08b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -20,6 +20,7 @@ import kafka.cluster.{Broker, Partition, Replica}
 import collection._
 import mutable.HashMap
 import org.I0Itec.zkclient.ZkClient
+import java.io.{File, IOException}
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils._
 import kafka.log.LogManager
@@ -33,6 +34,7 @@ import kafka.controller.KafkaController
 
 object ReplicaManager {
   val UnknownLogEndOffset = -1L
+  val HighWatermarkFilename = "replication-offset-checkpoint"
 }
 
 class ReplicaManager(val config: KafkaConfig, 
@@ -48,7 +50,7 @@ class ReplicaManager(val config: KafkaConfig,
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
   this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
-  val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
+  val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
 
   newGauge(
     "LeaderCount",
@@ -67,7 +69,7 @@ class ReplicaManager(val config: KafkaConfig,
     }
   )
   val isrExpandRate = newMeter("IsrExpandsPerSec",  "expands", TimeUnit.SECONDS)
-  val isrShrinkRate = newMeter("ISRShrinksPerSec",  "shrinks", TimeUnit.SECONDS)
+  val isrShrinkRate = newMeter("IsrShrinksPerSec",  "shrinks", TimeUnit.SECONDS)
 
 
   def startHighWaterMarksCheckPointThread() = {
@@ -265,7 +267,13 @@ class ReplicaManager(val config: KafkaConfig,
     val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
     for((dir, reps) <- replicasByDir) {
       val hwms = reps.map(r => (TopicAndPartition(r.topic, r.partitionId) -> r.highWatermark)).toMap
-      highWatermarkCheckpoints(dir).write(hwms)
+      try {
+        highWatermarkCheckpoints(dir).write(hwms)
+      } catch {
+        case e: IOException =>
+          fatal("Error writing to highwatermark file: ", e)
+          Runtime.getRuntime().halt(1)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/utils/FileLock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/FileLock.scala b/core/src/main/scala/kafka/utils/FileLock.scala
index ad7a597..a5761b9 100644
--- a/core/src/main/scala/kafka/utils/FileLock.scala
+++ b/core/src/main/scala/kafka/utils/FileLock.scala
@@ -9,56 +9,56 @@ import java.nio.channels._
  * The given path will be created and opened if it doesn't exist.
  */
 class FileLock(val file: File) extends Logging {
-    file.createNewFile()
-    private val channel = new RandomAccessFile(file, "rw").getChannel()
-    private var flock: java.nio.channels.FileLock = null
-    
-    /**
-     * Lock the file or throw an exception if the lock is already held
-     */
-    def lock() {
-      this synchronized {
-        trace("Acquiring lock on " + file.getAbsolutePath)
-        flock = channel.lock()
-      }
+  file.createNewFile() // create the file if it doesn't exist
+  private val channel = new RandomAccessFile(file, "rw").getChannel()
+  private var flock: java.nio.channels.FileLock = null
+
+  /**
+   * Lock the file or throw an exception if the lock is already held
+   */
+  def lock() {
+    this synchronized {
+      trace("Acquiring lock on " + file.getAbsolutePath)
+      flock = channel.lock()
     }
-    
-    /**
-     * Try to lock the file and return true if the locking succeeds
-     */
-    def tryLock(): Boolean = {
-      this synchronized {
-        trace("Acquiring lock on " + file.getAbsolutePath)
-        try {
-          // weirdly this method will return null if the lock is held by another
-          // process, but will throw an exception if the lock is held by this process
-          // so we have to handle both cases
-          flock = channel.tryLock()
-          flock != null
-        } catch {
-          case e: OverlappingFileLockException => false
-        }
+  }
+
+  /**
+   * Try to lock the file and return true if the locking succeeds
+   */
+  def tryLock(): Boolean = {
+    this synchronized {
+      trace("Acquiring lock on " + file.getAbsolutePath)
+      try {
+        // weirdly this method will return null if the lock is held by another
+        // process, but will throw an exception if the lock is held by this process
+        // so we have to handle both cases
+        flock = channel.tryLock()
+        flock != null
+      } catch {
+        case e: OverlappingFileLockException => false
       }
     }
-    
-    /**
-     * Unlock the lock if it is held
-     */
-    def unlock() {
-      this synchronized {
-        trace("Releasing lock on " + file.getAbsolutePath)
-        if(flock != null)
-          flock.release()
-      }
+  }
+
+  /**
+   * Unlock the lock if it is held
+   */
+  def unlock() {
+    this synchronized {
+      trace("Releasing lock on " + file.getAbsolutePath)
+      if(flock != null)
+        flock.release()
     }
-    
-    /**
-     * Destroy this lock, closing the associated FileChannel
-     */
-    def destroy() = {
-      this synchronized {
-        unlock()
-        channel.close()
-      }
+  }
+
+  /**
+   * Destroy this lock, closing the associated FileChannel
+   */
+  def destroy() = {
+    this synchronized {
+      unlock()
+      channel.close()
     }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala
index d9f010b..2890e7f 100644
--- a/core/src/main/scala/kafka/utils/Logging.scala
+++ b/core/src/main/scala/kafka/utils/Logging.scala
@@ -23,12 +23,13 @@ trait Logging {
   val loggerName = this.getClass.getName
   lazy val logger = Logger.getLogger(loggerName)
 
-  protected var logIdent = ""
+  protected var logIdent: String = null
 
   // Force initialization to register Log4jControllerMBean
   private val log4jController = Log4jController
 
-  private def msgWithLogIdent(msg: String) = logIdent + msg
+  private def msgWithLogIdent(msg: String) = 
+    if(logIdent == null) msg else logIdent + msg
 
   def trace(msg: => String): Unit = {
     if (logger.isTraceEnabled())

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/utils/Throttler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala
index 9e53b03..c6c3c75 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -32,20 +32,14 @@ import scala.math._
  */
 @threadsafe
 class Throttler(val desiredRatePerSec: Double, 
-                val checkIntervalMs: Long, 
-                val throttleDown: Boolean, 
-                val time: Time) extends Logging {
+                val checkIntervalMs: Long = 100L, 
+                val throttleDown: Boolean = true, 
+                val time: Time = SystemTime) extends Logging {
   
   private val lock = new Object
   private var periodStartNs: Long = time.nanoseconds
   private var observedSoFar: Double = 0.0
   
-  def this(desiredRatePerSec: Double, throttleDown: Boolean) = 
-    this(desiredRatePerSec, Throttler.DefaultCheckIntervalMs, throttleDown, SystemTime)
-
-  def this(desiredRatePerSec: Double) = 
-    this(desiredRatePerSec, Throttler.DefaultCheckIntervalMs, true, SystemTime)
-  
   def maybeThrottle(observed: Double) {
     lock synchronized {
       observedSoFar += observed
@@ -58,11 +52,11 @@ class Throttler(val desiredRatePerSec: Double,
         val needAdjustment = !(throttleDown ^ (rateInSecs > desiredRatePerSec))
         if(needAdjustment) {
           // solve for the amount of time to sleep to make us hit the desired rate
-          val desiredRateMs = desiredRatePerSec / Time.MsPerSec.asInstanceOf[Double]
+          val desiredRateMs = desiredRatePerSec / Time.MsPerSec.toDouble
           val elapsedMs = elapsedNs / Time.NsPerMs
           val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs)
           if(sleepTime > 0) {
-            println("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, desiredRatePerSec, sleepTime))
+            trace("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, desiredRatePerSec, sleepTime))
             time.sleep(sleepTime)
           }
         }
@@ -76,20 +70,20 @@ class Throttler(val desiredRatePerSec: Double,
 
 object Throttler {
   
-  val DefaultCheckIntervalMs = 100L
-  
   def main(args: Array[String]) {
     val rand = new Random()
-    val throttler = new Throttler(1000000, 100, true, SystemTime)
+    val throttler = new Throttler(100000, 100, true, SystemTime)
+    val interval = 30000
     var start = System.currentTimeMillis
     var total = 0
     while(true) {
       val value = rand.nextInt(1000)
+      Thread.sleep(1)
       throttler.maybeThrottle(value)
       total += value
       val now = System.currentTimeMillis
-      if(now - start >= 1000) {
-        println(total)
+      if(now - start >= interval) {
+        println(total / (interval/1000.0))
         start = now
         total = 0
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 2a01f69..1c88226 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -27,6 +27,7 @@ import scala.collection._
 import scala.collection.mutable
 import java.util.Properties
 import kafka.common.KafkaException
+import kafka.common.KafkaStorageException
 
 
 /**
@@ -159,7 +160,7 @@ object Utils extends Logging {
    * @param log The log method to use for logging. E.g. logger.warn
    * @param action The action to execute
    */
-  def swallow(log: (Object, Throwable) => Unit, action: => Unit) = {
+  def swallow(log: (Object, Throwable) => Unit, action: => Unit) {
     try {
       action
     } catch {
@@ -528,4 +529,37 @@ object Utils extends Logging {
    */
   def abs(n: Int) = n & 0x7fffffff
   
+  /**
+   * Replace the given string suffix with the new suffix. If the string doesn't end with the given suffix throw an exception.
+   */
+  def replaceSuffix(s: String, oldSuffix: String, newSuffix: String): String = {
+    if(!s.endsWith(oldSuffix))
+      throw new IllegalArgumentException("Expected string to end with '%s' but string is '%s'".format(oldSuffix, s))
+    s.substring(0, s.length - oldSuffix.length) + newSuffix
+  }
+
+  /**
+   * Create a file with the given path
+   * @param path The path to create
+   * @throws KafkaStorageException If the file creation fails
+   * @return The created file
+   */
+  def createFile(path: String): File = {
+    val f = new File(path)
+    val created = f.createNewFile()
+    if(!created)
+      throw new KafkaStorageException("Failed to create file %s.".format(path))
+    f
+  }
+  
+  /**
+   * Read a big-endian integer from a byte array
+   */
+  def readInt(bytes: Array[Byte], offset: Int): Int = {
+    ((bytes(offset) & 0xFF) << 24) |
+    ((bytes(offset + 1) & 0xFF) << 16) |
+    ((bytes(offset + 2) & 0xFF) << 8) |
+    (bytes(offset + 3) & 0xFF)
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/utils/VerifiableProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index d694ba9..a2ac55c 100644
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -38,10 +38,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
   /**
    * Read a required integer property value or throw an exception if no such property is found
    */
-  def getInt(name: String): Int = {
-    require(containsKey(name), "Missing required property '" + name + "'")
-    return getInt(name, -1)
-  }
+  def getInt(name: String): Int = getString(name).toInt
 
   def getIntInRange(name: String, range: (Int, Int)): Int = {
     require(containsKey(name), "Missing required property '" + name + "'")
@@ -92,10 +89,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
   /**
    * Read a required long property value or throw an exception if no such property is found
    */
-  def getLong(name: String): Long = {
-    require(containsKey(name), "Missing required property '" + name + "'")
-    return getLong(name, -1)
-  }
+  def getLong(name: String): Long = getString(name).toLong
 
   /**
    * Read an long from the properties instance
@@ -124,6 +118,26 @@ class VerifiableProperties(val props: Properties) extends Logging {
     require(v >= range._1 && v <= range._2, name + " has value " + v + " which is not in the range " + range + ".")
     v
   }
+  
+  /**
+   * Get a required argument as a double
+   * @param name The property name
+   * @return the value
+   * @throws IllegalArgumentException If the given property is not present
+   */
+  def getDouble(name: String): Double = getString(name).toDouble
+  
+  /**
+   * Get an optional argument as a double
+   * @param name The property name
+   * @param default The default value for the property if not present
+   */
+  def getDouble(name: String, default: Double): Double = {
+    if(containsKey(name))
+      getDouble(name)
+    else
+      default
+  } 
 
   /**
    * Read a boolean value from the properties instance
@@ -140,6 +154,8 @@ class VerifiableProperties(val props: Properties) extends Logging {
       v.toBoolean
     }
   }
+  
+  def getBoolean(name: String) = getString(name).toBoolean
 
   /**
    * Get a string property, or, if no such property is defined, return the given default value
@@ -162,7 +178,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
   /**
    * Get a Map[String, String] from a property list in the form k1:v2, k2:v2, ...
    */
-  def getMap(name: String, valid: String => Boolean): Map[String, String] = {
+  def getMap(name: String, valid: String => Boolean = s => true): Map[String, String] = {
     try {
       val m = Utils.parseCsvMap(getString(name, ""))
       m.foreach {
@@ -189,4 +205,5 @@ class VerifiableProperties(val props: Properties) extends Logging {
   }
   
   override def toString(): String = props.toString
+ 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 55429af..c6e7a57 100644
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -34,15 +34,11 @@ object StressTestLog {
     val dir = TestUtils.tempDir()
     val time = new MockTime
     val log = new Log(dir = dir,
-                      scheduler = time.scheduler,
-                      maxSegmentSize = 64*1024*1024,
-                      maxMessageSize = Int.MaxValue, 
-                      flushInterval = Int.MaxValue, 
-                      rollIntervalMs = Long.MaxValue, 
+                      config = LogConfig(segmentSize = 64*1024*1024,
+                                         maxMessageSize = Int.MaxValue,
+                                         maxIndexSize = 1024*1024),
                       needsRecovery = false,
-                      maxIndexSize = 1024*1024,
-                      indexIntervalBytes = 4096,
-                      segmentDeleteDelayMs = 60000,
+                      scheduler = time.scheduler,
                       time = time)
     val writer = new WriterThread(log)
     writer.start()

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/other/kafka/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLogCleaning.scala b/core/src/test/scala/other/kafka/TestLogCleaning.scala
new file mode 100644
index 0000000..d9c721b
--- /dev/null
+++ b/core/src/test/scala/other/kafka/TestLogCleaning.scala
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import joptsimple.OptionParser
+import java.util.Properties
+import java.util.Random
+import java.io._
+import scala.io.Source
+import scala.io.BufferedSource
+import kafka.producer._
+import kafka.consumer._
+import kafka.serializer._
+import kafka.utils._
+
+/**
+ * This is a torture test that runs against an existing broker. Here is how it works:
+ * 
+ * It produces a series of specially formatted messages to one or more partitions. Each message it produces
+ * it logs out to a text file. The messages have a limited set of keys, so there is duplication in the key space.
+ * 
+ * The broker will clean its log as the test runs.
+ * 
+ * When the specified number of messages have been produced we create a consumer and consume all the messages in the topic
+ * and write that out to another text file.
+ * 
+ * Using a stable unix sort we sort both the producer log of what was sent and the consumer log of what was retrieved by topic and message key.
+ * Then we compare the final message in both logs for each key. If this final message is not the same for all keys we
+ * print an error and exit with exit code 1, otherwise we print the size reduction and exit with exit code 0.
+ */
+object TestLogCleaning {
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+    val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.")
+                               .withRequiredArg
+                               .describedAs("count")
+                               .ofType(classOf[java.lang.Long])
+                               .defaultsTo(Long.MaxValue)
+    val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.")
+                           .withRequiredArg
+                           .describedAs("count")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(5)
+    val brokerOpt = parser.accepts("broker", "Url to connect to.")
+                          .withRequiredArg
+                          .describedAs("url")
+                          .ofType(classOf[String])
+    val topicsOpt = parser.accepts("topics", "The number of topics to test.")
+                          .withRequiredArg
+                          .describedAs("count")
+                          .ofType(classOf[java.lang.Integer])
+                          .defaultsTo(1)
+    val zkConnectOpt = parser.accepts("zk", "Zk url.")
+                             .withRequiredArg
+                             .describedAs("url")
+                             .ofType(classOf[String])
+    val sleepSecsOpt = parser.accepts("sleep", "Time to sleep between production and consumption.")
+                             .withRequiredArg
+                             .describedAs("ms")
+                             .ofType(classOf[java.lang.Integer])
+                             .defaultsTo(0)
+    val cleanupOpt = parser.accepts("cleanup", "Delete temp files when done.")
+    
+    val options = parser.parse(args:_*)
+    
+    if(!options.has(brokerOpt) || !options.has(zkConnectOpt) || !options.has(numMessagesOpt)) {
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+    
+    // parse options
+    val messages = options.valueOf(numMessagesOpt).longValue
+    val dups = options.valueOf(numDupsOpt).intValue
+    val brokerUrl = options.valueOf(brokerOpt)
+    val topicCount = options.valueOf(topicsOpt).intValue
+    val zkUrl = options.valueOf(zkConnectOpt)
+    val sleepSecs = options.valueOf(sleepSecsOpt).intValue
+    val cleanup = options.has(cleanupOpt)
+    
+    val testId = new Random().nextInt(Int.MaxValue)
+    val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray
+    
+    println("Producing %d messages...".format(messages))
+    val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, cleanup)
+    println("Sleeping for %d seconds...".format(sleepSecs))
+    Thread.sleep(sleepSecs * 1000)
+    println("Consuming messages...")
+    val consumedDataFile = consumeMessages(zkUrl, topics, cleanup)
+    
+    val producedLines = lineCount(producedDataFile)
+    val consumedLines = lineCount(consumedDataFile)
+    val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble
+    println("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).".format(producedLines, consumedLines, 100 * reduction))
+    
+    println("Validating output files...")
+    validateOutput(externalSort(producedDataFile), externalSort(consumedDataFile))
+    println("All done.")
+  }
+  
+  def lineCount(file: File): Int = io.Source.fromFile(file).getLines.size
+  
+  def validateOutput(produced: BufferedReader, consumed: BufferedReader) {
+    while(true) {
+      val prod = readFinalValue(produced)
+      val cons = readFinalValue(consumed)
+      if(prod == null && cons == null) {
+        return
+      } else if(prod != cons) {
+        System.err.println("Validation failed prod = %s, cons = %s!".format(prod, cons))
+        System.exit(1)
+      }
+    }
+  }
+  
+  def readFinalValue(reader: BufferedReader): (String, Int, Int) = {
+    def readTuple() = {
+      val line = reader.readLine
+      if(line == null)
+        null
+      else
+        line.split("\t")
+    }
+    var prev = readTuple()
+    if(prev == null)
+      return null
+    while(true) {
+      reader.mark(1024)
+      val curr = readTuple()
+      if(curr == null || curr(0) != prev(0) || curr(1) != prev(1)) {
+        reader.reset()
+        return (prev(0), prev(1).toInt, prev(2).toInt)
+      } else {
+        prev = curr
+      }
+    }
+    return null
+  }
+  
+  def externalSort(file: File): BufferedReader = {
+    val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", file.getAbsolutePath)
+    val process = builder.start()
+    new BufferedReader(new InputStreamReader(process.getInputStream()))
+  }
+  
+  def produceMessages(brokerUrl: String, 
+                      topics: Array[String], 
+                      messages: Long, 
+                      dups: Int, 
+                      cleanup: Boolean): File = {
+    val producerProps = new Properties
+    producerProps.setProperty("producer.type", "async")
+    producerProps.setProperty("broker.list", brokerUrl)
+    producerProps.setProperty("serializer.class", classOf[StringEncoder].getName)
+    producerProps.setProperty("key.serializer.class", classOf[StringEncoder].getName)
+    producerProps.setProperty("queue.enqueue.timeout.ms", "-1")
+    producerProps.setProperty("batch.size", 1000.toString)
+    val producer = new Producer[String, String](new ProducerConfig(producerProps))
+    val rand = new Random(1)
+    val keyCount = (messages / dups).toInt
+    val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt")
+    if(cleanup)
+      producedFile.deleteOnExit()
+    val producedWriter = new BufferedWriter(new FileWriter(producedFile), 1024*1024)
+    for(i <- 0L until (messages * topics.length)) {
+      val topic = topics((i % topics.length).toInt)
+      val key = rand.nextInt(keyCount)
+      producer.send(KeyedMessage(topic = topic, key = key.toString, message = i.toString))
+      producedWriter.write("%s\t%s\t%s\n".format(topic, key, i))
+    }
+    producedWriter.close()
+    producer.close()
+    producedFile
+  }
+  
+  def consumeMessages(zkUrl: String, topics: Array[String], cleanup: Boolean): File = {
+    val consumerProps = new Properties
+    consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
+    consumerProps.setProperty("zk.connect", zkUrl)
+    consumerProps.setProperty("consumer.timeout.ms", (5*1000).toString)
+    val connector = new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
+    val streams = connector.createMessageStreams(topics.map(topic => (topic, 1)).toMap, new StringDecoder, new StringDecoder)
+    val consumedFile = File.createTempFile("kafka-log-cleaner-consumed-", ".txt")
+    if(cleanup)
+      consumedFile.deleteOnExit()
+    val consumedWriter = new BufferedWriter(new FileWriter(consumedFile))
+    for(topic <- topics) {
+      val stream = streams(topic).head
+      try {
+        for(item <- stream)
+          consumedWriter.write("%s\t%s\t%s\n".format(topic, item.key, item.message))
+      } catch {
+        case e: ConsumerTimeoutException => 
+      }
+    }
+    consumedWriter.close()
+    connector.shutdown()
+    consumedFile
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/other/kafka/TestLogPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLogPerformance.scala b/core/src/test/scala/other/kafka/TestLogPerformance.scala
index a7b661a..d91011e 100644
--- a/core/src/test/scala/other/kafka/TestLogPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestLogPerformance.scala
@@ -34,7 +34,8 @@ object TestLogPerformance {
     val config = new KafkaConfig(props)
     val dir = TestUtils.tempDir()
     val scheduler = new KafkaScheduler(1)
-    val log = new Log(dir, scheduler, 50*1024*1024, config.messageMaxBytes, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, segmentDeleteDelayMs = 0, time = SystemTime)
+    val logConfig = LogConfig()
+    val log = new Log(dir, logConfig, needsRecovery = false, scheduler = scheduler, time = SystemTime)
     val bytes = new Array[Byte](messageSize)
     new java.util.Random().nextBytes(bytes)
     val message = new Message(bytes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
new file mode 100644
index 0000000..cce2319
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import junit.framework.Assert._
+import org.scalatest.junit.JUnitSuite
+import org.junit.{After, Before, Test}
+import java.nio._
+import java.io.File
+import scala.collection._
+import kafka.common._
+import kafka.utils._
+import kafka.message._
+
+/**
+ * Unit tests for the log cleaning logic
+ */
+class CleanerTest extends JUnitSuite {
+  
+  val dir = TestUtils.tempDir()
+  val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, dedupe=true)
+  val time = new MockTime()
+  val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
+  
+  @After
+  def teardown() {
+    Utils.rm(dir)
+  }
+  
+  /**
+   * Test simple log cleaning
+   */
+  @Test
+  def testCleanSegments() {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val log = makeLog(config = logConfig.copy(segmentSize = 1024))
+    
+    // append messages to the log until we have four segments
+    while(log.numberOfSegments < 4)
+      log.append(messages(log.logEndOffset.toInt, log.logEndOffset.toInt))
+    val keysFound = keysInLog(log)
+    assertEquals((0L until log.logEndOffset), keysFound)
+    
+    // pretend we have the following keys
+    val keys = immutable.ListSet(1, 3, 5, 7, 9)
+    val map = new FakeOffsetMap(Int.MaxValue)
+    keys.foreach(k => map.put(key(k), Long.MaxValue))
+    
+    // clean the log
+    cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0)
+    val shouldRemain = keysInLog(log).filter(!keys.contains(_))
+    assertEquals(shouldRemain, keysInLog(log))
+  }
+  
+  /* extract all the keys from a log */
+  def keysInLog(log: Log): Iterable[Int] = 
+    log.logSegments.flatMap(s => s.log.map(m => Utils.readString(m.message.key).toInt))
+  
+  
+  /**
+   * Test that a truncation during cleaning throws an OptimisticLockFailureException
+   */
+  @Test
+  def testCleanSegmentsWithTruncation() {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val log = makeLog(config = logConfig.copy(segmentSize = 1024))
+    
+    // append messages to the log until we have two segments
+    while(log.numberOfSegments < 2)
+      log.append(messages(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      
+    log.truncateTo(log.logEndOffset-2)
+    val keys = keysInLog(log)
+    val map = new FakeOffsetMap(Int.MaxValue)
+    keys.foreach(k => map.put(key(k), Long.MaxValue))
+    intercept[OptimisticLockFailureException] {
+      cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0)
+    }
+  }
+  
+  /**
+   * Validate the logic for grouping log segments together for cleaning
+   */
+  @Test
+  def testSegmentGrouping() {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val log = makeLog(config = logConfig.copy(segmentSize = 300, indexInterval = 1))
+    
+    // append some messages to the log
+    var i = 0
+    while(log.numberOfSegments < 10) {
+      log.append(TestUtils.singleMessageSet("hello".getBytes))
+      i += 1
+    }
+    
+    // grouping by very large values should result in a single group with all the segments in it
+    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    assertEquals(1, groups.size)
+    assertEquals(log.numberOfSegments, groups(0).size)
+    checkSegmentOrder(groups)
+    
+    // grouping by very small values should result in all groups having one entry
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue)
+    assertEquals(log.numberOfSegments, groups.size)
+    assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
+    checkSegmentOrder(groups)
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1)
+    assertEquals(log.numberOfSegments, groups.size)
+    assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
+    checkSegmentOrder(groups)
+
+    val groupSize = 3
+    
+    // check grouping by log size
+    val logSize = log.logSegments.take(groupSize).map(_.size).sum.toInt + 1
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue)
+    checkSegmentOrder(groups)
+    assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
+    
+    // check grouping by index size
+    val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes()).sum + 1
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize)
+    checkSegmentOrder(groups)
+    assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
+  }
+  
+  private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]) {
+    val offsets = groups.flatMap(_.map(_.baseOffset))
+    assertEquals("Offsets should be in increasing order.", offsets.sorted, offsets)
+  }
+  
+  /**
+   * Test building an offset map off the log
+   */
+  @Test
+  def testBuildOffsetMap() {
+    val map = new FakeOffsetMap(1000)
+    val log = makeLog()
+    val cleaner = makeCleaner(Int.MaxValue)
+    val start = 0
+    val end = 500
+    val offsets = writeToLog(log, (start until end) zip (start until end))
+    def checkRange(map: FakeOffsetMap, start: Int, end: Int) {
+      val endOffset = cleaner.buildOffsetMap(log, start, end, map) + 1
+      assertEquals("Last offset should be the end offset.", end, endOffset)
+      assertEquals("Should have the expected number of messages in the map.", end-start, map.size)
+      for(i <- start until end)
+        assertEquals("Should find all the keys", i.toLong, map.get(key(i)))
+      assertEquals("Should not find a value too small", -1L, map.get(key(start-1)))
+      assertEquals("Should not find a value too large", -1L, map.get(key(end)))
+    }
+    val segments = log.logSegments.toSeq
+    checkRange(map, 0, segments(1).baseOffset.toInt)
+    checkRange(map, segments(1).baseOffset.toInt, segments(3).baseOffset.toInt)
+    checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt)
+  }
+  
+  /**
+   * Test that we don't exceed the maximum capacity of the offset map, that is that an offset map
+   * with a max size of 1000 will only clean 1000 new entries even if more than that are available.
+   */
+  @Test
+  def testBuildOffsetMapOverCapacity() {
+    val map = new FakeOffsetMap(1000)
+    val log = makeLog()
+    val cleaner = makeCleaner(Int.MaxValue)
+    val vals = 0 until 1001
+    val offsets = writeToLog(log, vals zip vals)
+    val lastOffset = cleaner.buildOffsetMap(log, vals.start, vals.end, map)
+    assertEquals("Shouldn't go beyond the capacity of the offset map.", 1000, lastOffset)
+  }
+  
+  def makeLog(dir: File = dir, config: LogConfig = logConfig) =
+    new Log(dir = dir, config = config, needsRecovery = false, scheduler = time.scheduler, time = time)
+  
+  def makeCleaner(capacity: Int) = 
+    new Cleaner(id = 0, new FakeOffsetMap(capacity), ioBufferSize = 64*1024, maxIoBufferSize = 64*1024, throttler = throttler, time = time)
+  
+  def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
+    for((key, value) <- seq)
+      yield log.append(messages(key, value)).firstOffset
+  }
+  
+  def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
+  
+  def messages(key: Int, value: Int) = 
+    new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
+  
+}
+
+class FakeOffsetMap(val capacity: Int) extends OffsetMap {
+  val map = new java.util.HashMap[String, Long]()
+  
+  private def keyFor(key: ByteBuffer) = 
+    new String(Utils.readBytes(key.duplicate), "UTF-8")
+  
+  def put(key: ByteBuffer, offset: Long): Unit = 
+    map.put(keyFor(key), offset)
+  
+  def get(key: ByteBuffer): Long = {
+    val k = keyFor(key)
+    if(map.containsKey(k))
+      map.get(k)
+    else
+      -1L
+  }
+  
+  def clear() = map.clear()
+  
+  def size: Int = map.size
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
new file mode 100644
index 0000000..5a489f9
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import scala.collection._
+import org.junit._
+import kafka.common.TopicAndPartition
+import kafka.utils._
+import kafka.message._
+import org.scalatest.junit.JUnitSuite
+import junit.framework.Assert._
+
+/**
+ * This is an integration test that tests the fully integrated log cleaner
+ */
+class LogCleanerIntegrationTest extends JUnitSuite {
+  
+  val time = new MockTime()
+  val segmentSize = 100
+  val deleteDelay = 1000
+  val logName = "log"
+  val logDir = TestUtils.tempDir()
+  var counter = 0
+  val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2))
+  
+  @Test
+  def cleanerTest() {
+    val cleaner = makeCleaner(parts = 3)
+    val log = cleaner.logs.get(topics(0))
+
+    val appends = writeDups(numKeys = 100, numDups = 3, log)
+    val startSize = log.size
+    cleaner.startup()
+    
+    val lastCleaned = log.activeSegment.baseOffset
+    // wait until we clean up to base_offset of active segment - minDirtyMessages
+    cleaner.awaitCleaned("log", 0, lastCleaned)
+    
+    val read = readFromLog(log)
+    assertEquals("Contents of the map shouldn't change.", appends.toMap, read.toMap)
+    assertTrue(startSize > log.size)
+    
+    // write some more stuff and validate again
+    val appends2 = appends ++ writeDups(numKeys = 100, numDups = 3, log)
+    val lastCleaned2 = log.activeSegment.baseOffset
+    cleaner.awaitCleaned("log", 0, lastCleaned2)
+    val read2 = readFromLog(log)
+    assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap)
+    
+    cleaner.shutdown()
+  }
+  
+  def readFromLog(log: Log): Iterable[(Int, Int)] = {
+    for(segment <- log.logSegments; message <- segment.log) yield {
+      val key = Utils.readString(message.message.key).toInt
+      val value = Utils.readString(message.message.payload).toInt
+      key -> value
+    }
+  }
+  
+  def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
+    for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
+      val count = counter
+      val appendInfo = log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
+      counter += 1
+      (key, count)
+    }
+  }
+    
+  @After
+  def teardown() {
+    Utils.rm(logDir)
+  }
+  
+  /* create a cleaner instance and logs with the given parameters */
+  def makeCleaner(parts: Int, 
+                  minDirtyMessages: Int = 0, 
+                  numThreads: Int = 1,
+                  defaultPolicy: String = "dedupe",
+                  policyOverrides: Map[String, String] = Map()): LogCleaner = {
+    
+    // create partitions and add them to the pool
+    val logs = new Pool[TopicAndPartition, Log]()
+    for(i <- 0 until parts) {
+      val dir = new File(logDir, "log-" + i)
+      dir.mkdirs()
+      val log = new Log(dir = dir,
+                        LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, dedupe = true),
+                        needsRecovery = false,
+                        scheduler = time.scheduler,
+                        time = time)
+      logs.put(TopicAndPartition("log", i), log)      
+    }
+  
+    new LogCleaner(CleanerConfig(numThreads = numThreads),
+                   logDirs = Array(logDir),
+                   logs = logs,
+                   time = time)
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index f48c709..fad3baa 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -29,21 +29,18 @@ class LogManagerTest extends JUnit3Suite {
 
   val time: MockTime = new MockTime()
   val maxRollInterval = 100
-  val maxLogAgeHours = 10
+  val maxLogAgeMs = 10*60*60*1000
+  val logConfig = LogConfig(segmentSize = 1024, maxIndexSize = 4096, retentionMs = maxLogAgeMs)
   var logDir: File = null
   var logManager: LogManager = null
-  var config: KafkaConfig = null
   val name = "kafka"
   val veryLargeLogFlushInterval = 10000000L
+  val cleanerConfig = CleanerConfig(enableCleaner = false)
 
   override def setUp() {
     super.setUp()
-    config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) {
-                   override val logSegmentBytes = 1024
-                   override val logFlushIntervalMessages = 10000
-                   override val logRetentionHours = maxLogAgeHours
-                 }
-    logManager = new LogManager(config, time.scheduler, time)
+    logDir = TestUtils.tempDir()
+    logManager = new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 1000L, time.scheduler, time)
     logManager.startup
     logDir = logManager.logDirs(0)
   }
@@ -62,7 +59,7 @@ class LogManagerTest extends JUnit3Suite {
   @Test
   def testCreateLog() {
     val log = logManager.getOrCreateLog(name, 0)
-    val logFile = new File(config.logDirs(0), name + "-0")
+    val logFile = new File(logDir, name + "-0")
     assertTrue(logFile.exists)
     log.append(TestUtils.singleMessageSet("test".getBytes()))
   }
@@ -74,7 +71,7 @@ class LogManagerTest extends JUnit3Suite {
   def testGetNonExistentLog() {
     val log = logManager.getLog(name, 0)
     assertEquals("No log should be found.", None, log)
-    val logFile = new File(config.logDirs(0), name + "-0")
+    val logFile = new File(logDir, name + "-0")
     assertTrue(!logFile.exists)
   }
 
@@ -94,9 +91,9 @@ class LogManagerTest extends JUnit3Suite {
     
     log.logSegments.foreach(_.log.file.setLastModified(time.milliseconds))
     
-    time.sleep(maxLogAgeHours*60*60*1000 + 1)
+    time.sleep(maxLogAgeMs + 1)
     assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
-    time.sleep(log.segmentDeleteDelayMs + 1)
+    time.sleep(log.config.fileDeleteDelayMs + 1)
     assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes)
 
@@ -116,14 +113,10 @@ class LogManagerTest extends JUnit3Suite {
   @Test
   def testCleanupSegmentsToMaintainSize() {
     val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
-    val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
-    config = new KafkaConfig(props) {
-      override val logSegmentBytes = (10 * (setSize - 1)) // each segment will be 10 messages
-      override val logRetentionBytes = (5 * 10 * setSize + 10).asInstanceOf[Long]
-      override val logRollHours = maxRollInterval
-    }
-    logManager = new LogManager(config, time.scheduler, time)
+
+    val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L)
+    logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 1000L, time.scheduler, time)
     logManager.startup
 
     // create a log
@@ -138,13 +131,12 @@ class LogManagerTest extends JUnit3Suite {
       offset = info.firstOffset
     }
 
-    // should be exactly 100 full segments + 1 new empty one
-    assertEquals("Check we have the expected number of segments.", numMessages * setSize / config.logSegmentBytes, log.numberOfSegments)
+    assertEquals("Check we have the expected number of segments.", numMessages * setSize / config.segmentSize, log.numberOfSegments)
 
     // this cleanup shouldn't find any expired segments but should delete some to reduce size
     time.sleep(logManager.InitialTaskDelayMs)
     assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
-    time.sleep(log.segmentDeleteDelayMs + 1)
+    time.sleep(log.config.fileDeleteDelayMs + 1)
     assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes)
     try {
@@ -162,14 +154,9 @@ class LogManagerTest extends JUnit3Suite {
    */
   @Test
   def testTimeBasedFlush() {
-    val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
-    config = new KafkaConfig(props) {
-                   override val logFlushSchedulerIntervalMs = 1000
-                   override val logFlushIntervalMs = 1000
-                   override val logFlushIntervalMessages = Int.MaxValue
-                 }
-    logManager = new LogManager(config, time.scheduler, time)
+    val config = logConfig.copy(flushMs = 1000)
+    logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 1000L, time.scheduler, time)
     logManager.startup
     val log = logManager.getOrCreateLog(name, 0)
     val lastFlush = log.lastFlushTime
@@ -187,13 +174,11 @@ class LogManagerTest extends JUnit3Suite {
   @Test
   def testLeastLoadedAssignment() {
     // create a log manager with multiple data directories
-    val props = TestUtils.createBrokerConfig(0, -1)
-    val dirs = Seq(TestUtils.tempDir().getAbsolutePath, 
-                   TestUtils.tempDir().getAbsolutePath, 
-                   TestUtils.tempDir().getAbsolutePath)
-    props.put("log.dirs", dirs.mkString(","))
+    val dirs = Array(TestUtils.tempDir(), 
+                     TestUtils.tempDir(), 
+                     TestUtils.tempDir())
     logManager.shutdown()
-    logManager = new LogManager(new KafkaConfig(props), time.scheduler, time)
+    logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig, 1000L, 1000L, time.scheduler, time)
     
     // verify that logs are always assigned to the least loaded partition
     for(partition <- 0 until 20) {
@@ -209,7 +194,7 @@ class LogManagerTest extends JUnit3Suite {
    */
   def testTwoLogManagersUsingSameDirFails() {
     try {
-      new LogManager(logManager.config, time.scheduler, time)
+      new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 1000L, time.scheduler, time)
       fail("Should not be able to create a second log manager instance with the same data directory")
     } catch {
       case e: KafkaException => // this is good 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index bffe4a4..a185ce4 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -161,6 +161,21 @@ class LogSegmentTest extends JUnit3Suite {
   }
   
   /**
+   * Test that we can change the file suffixes for the log and index files
+   */
+  @Test
+  def testChangeFileSuffixes() {
+    val segment = createSegment(40)
+    // capture the paths before the suffix change so the new names can be compared against them
+    val logPathBefore = segment.log.file.getAbsolutePath
+    val indexPathBefore = segment.index.file.getAbsolutePath
+    segment.changeFileSuffixes("", ".deleted")
+    assertEquals(logPathBefore + ".deleted", segment.log.file.getAbsolutePath)
+    assertEquals(indexPathBefore + ".deleted", segment.index.file.getAbsolutePath)
+    assertTrue(segment.log.file.exists && segment.index.file.exists) // both files must still exist
+  }
+  
+  /**
    * Create a segment with some data and an index. Then corrupt the index,
    * and recover the segment, the entries should all be readable.
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 0fc74fa..23e0e65 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -34,6 +34,7 @@ class LogTest extends JUnitSuite {
   var logDir: File = null
   val time = new MockTime
   var config: KafkaConfig = null
+  val logConfig = LogConfig()
 
   @Before
   def setUp() {
@@ -61,12 +62,15 @@ class LogTest extends JUnitSuite {
   @Test
   def testTimeBasedLogRoll() {
     val set = TestUtils.singleMessageSet("test".getBytes())
-    val rollMs = 1 * 60 * 60L
     val time: MockTime = new MockTime()
 
     // create a log
-    val log = new Log(logDir, time.scheduler, 1000, config.messageMaxBytes, 1000, rollMs, needsRecovery = false, time = time)
-    time.sleep(rollMs + 1)
+    val log = new Log(logDir, 
+                      logConfig.copy(segmentMs = 1 * 60 * 60L), 
+                      needsRecovery = false, 
+                      scheduler = time.scheduler, 
+                      time = time)
+    time.sleep(log.config.segmentMs + 1)
 
     // segment age is less than its limit
     log.append(set)
@@ -76,13 +80,13 @@ class LogTest extends JUnitSuite {
     assertEquals("There should still be exactly one segment.", 1, log.numberOfSegments)
 
     for(numSegments <- 2 until 4) {
-      time.sleep(rollMs + 1)
+      time.sleep(log.config.segmentMs + 1)
       log.append(set)
       assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
     }
 
     val numSegments = log.numberOfSegments
-    time.sleep(rollMs + 1)
+    time.sleep(log.config.segmentMs + 1)
     log.append(new ByteBufferMessageSet())
     assertEquals("Appending an empty message set should not roll log even if succient time has passed.", numSegments, log.numberOfSegments)
   }
@@ -95,10 +99,10 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes)
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
-    val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
+    val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
 
     // create a log
-    val log = new Log(logDir, time.scheduler, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), needsRecovery = false, time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -114,7 +118,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    val log = new Log(logDir, time.scheduler, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    val log = new Log(logDir, logConfig, needsRecovery = false, time.scheduler, time = time)
     log.append(TestUtils.singleMessageSet("test".getBytes))
   }
 
@@ -123,7 +127,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testAppendAndReadWithSequentialOffsets() {
-    val log = new Log(logDir, time.scheduler, 71, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 71), needsRecovery = false, time.scheduler, time = time)
     val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
     
     for(i <- 0 until messages.length)
@@ -142,7 +146,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testAppendAndReadWithNonSequentialOffsets() {
-    val log = new Log(logDir, time.scheduler, 71, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 71), needsRecovery = false, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val messages = messageIds.map(id => new Message(id.toString.getBytes))
     
@@ -165,7 +169,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testReadAtLogGap() {
-    val log = new Log(logDir, time.scheduler, 300, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 300), needsRecovery = false, time.scheduler, time = time)
     
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
@@ -185,7 +189,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, time.scheduler, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 1024), needsRecovery = false, time.scheduler, time = time)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -208,7 +212,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, time.scheduler, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 100), needsRecovery = false, time.scheduler, time = time)
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
     val offsets = messageSets.map(log.append(_).firstOffset)
@@ -232,8 +236,8 @@ class LogTest extends JUnitSuite {
   @Test
   def testCompressedMessages() {
     /* this log should roll after every messageset */
-    val log = new Log(logDir, time.scheduler, 10, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
-
+    val log = new Log(logDir, logConfig.copy(segmentSize = 10), needsRecovery = false, time.scheduler, time = time)
+    
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
@@ -255,7 +259,7 @@ class LogTest extends JUnitSuite {
     for(messagesToAppend <- List(0, 1, 25)) {
       logDir.mkdirs()
       // first test a log segment starting at 0
-      val log = new Log(logDir, time.scheduler, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+      val log = new Log(logDir, logConfig.copy(segmentSize = 100), needsRecovery = false, time.scheduler, time = time)
       for(i <- 0 until messagesToAppend)
         log.append(TestUtils.singleMessageSet(i.toString.getBytes))
       
@@ -289,7 +293,7 @@ class LogTest extends JUnitSuite {
 
     // append messages to log
     val maxMessageSize = second.sizeInBytes - 1
-    val log = new Log(logDir, time.scheduler, 100, maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    val log = new Log(logDir, logConfig.copy(maxMessageSize = maxMessageSize), needsRecovery = false, time.scheduler, time = time)
 
     // should be able to append the small message
     log.append(first)
@@ -311,7 +315,8 @@ class LogTest extends JUnitSuite {
     val messageSize = 100
     val segmentSize = 7 * messageSize
     val indexInterval = 3 * messageSize
-    var log = new Log(logDir, time.scheduler, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+    val config = logConfig.copy(segmentSize = segmentSize, indexInterval = indexInterval, maxIndexSize = 4096)
+    var log = new Log(logDir, config, needsRecovery = false, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
     assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
@@ -319,15 +324,14 @@ class LogTest extends JUnitSuite {
     val numIndexEntries = log.activeSegment.index.entries
     log.close()
     
-    // test non-recovery case
-    log = new Log(logDir, time.scheduler, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+    log = new Log(logDir, config, needsRecovery = false, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
     assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
     assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
     log.close()
     
     // test recovery case
-    log = new Log(logDir, time.scheduler, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+    log = new Log(logDir, config, needsRecovery = true, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
     assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
     assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
@@ -341,7 +345,8 @@ class LogTest extends JUnitSuite {
   def testIndexRebuild() {
     // publish the messages and close the log
     val numMessages = 200
-    var log = new Log(logDir, time.scheduler, 200, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
+    val config = logConfig.copy(segmentSize = 200, indexInterval = 1)
+    var log = new Log(logDir, config, needsRecovery = true, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
     val indexFiles = log.logSegments.map(_.index.file)
@@ -351,8 +356,7 @@ class LogTest extends JUnitSuite {
     indexFiles.foreach(_.delete())
     
     // reopen the log
-    log = new Log(logDir, time.scheduler, 200, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
-    
+    log = new Log(logDir, config, needsRecovery = true, time.scheduler, time)    
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages)
       assertEquals(i, log.read(i, 100, None).head.offset)
@@ -367,10 +371,10 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
-    val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
+    val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
 
     // create a log
-    val log = new Log(logDir, time.scheduler, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), needsRecovery = false, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (i<- 1 to msgPerSeg)
@@ -421,8 +425,9 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
-    val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
-    val log = new Log(logDir, time.scheduler, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
+    val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
+    val config = logConfig.copy(segmentSize = segmentSize)
+    val log = new Log(logDir, config, needsRecovery = false, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
     for (i<- 1 to msgPerSeg)
       log.append(set)
@@ -430,10 +435,10 @@ class LogTest extends JUnitSuite {
     for (i<- 1 to msgPerSeg)
       log.append(set)
     assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments)
-    assertEquals("The index of the first segment should be trim to empty", 0, log.logSegments.toList(0).index.maxEntries)
+    assertEquals("The index of the first segment should be trimmed to empty", 0, log.logSegments.toList(0).index.maxEntries)
     log.truncateTo(0)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
-    assertEquals("The index of segment 1 should be resized to maxIndexSize", log.maxIndexSize/8, log.logSegments.toList(0).index.maxEntries)
+    assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, log.logSegments.toList(0).index.maxEntries)
     for (i<- 1 to msgPerSeg)
       log.append(set)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
@@ -449,12 +454,12 @@ class LogTest extends JUnitSuite {
     
     val set = TestUtils.singleMessageSet("test".getBytes())
     val log = new Log(logDir, 
+                      logConfig.copy(segmentSize = set.sizeInBytes * 5, 
+                                     maxIndexSize = 1000, 
+                                     indexInterval = 1),
+                      needsRecovery = false,
                       time.scheduler,
-                      maxSegmentSize = set.sizeInBytes * 5, 
-                      maxMessageSize = config.messageMaxBytes,
-                      maxIndexSize = 1000,
-                      indexIntervalBytes = 1, 
-                      needsRecovery = false)
+                      time)
     
     assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
     assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
@@ -472,27 +477,26 @@ class LogTest extends JUnitSuite {
   @Test
   def testReopenThenTruncate() {
     val set = TestUtils.singleMessageSet("test".getBytes())
+    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, 
+                                maxIndexSize = 1000, 
+                                indexInterval = 10000)
 
     // create a log
     var log = new Log(logDir, 
+                      config,
+                      needsRecovery = true,
                       time.scheduler,
-                      maxSegmentSize = set.sizeInBytes * 5, 
-                      maxMessageSize = config.messageMaxBytes,
-                      maxIndexSize = 1000,
-                      indexIntervalBytes = 10000, 
-                      needsRecovery = true)
+                      time)
     
     // add enough messages to roll over several segments then close and re-open and attempt to truncate
     for(i <- 0 until 100)
       log.append(set)
     log.close()
     log = new Log(logDir, 
+                  config,
+                  needsRecovery = true,
                   time.scheduler,
-                  maxSegmentSize = set.sizeInBytes * 5, 
-                  maxMessageSize = config.messageMaxBytes,
-                  maxIndexSize = 1000,
-                  indexIntervalBytes = 10000, 
-                  needsRecovery = true)
+                  time)
     log.truncateTo(3)
     assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
@@ -505,14 +509,15 @@ class LogTest extends JUnitSuite {
   def testAsyncDelete() {
     val set = TestUtils.singleMessageSet("test".getBytes())
     val asyncDeleteMs = 1000
-    val log = new Log(logDir, 
+    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, 
+                                fileDeleteDelayMs = asyncDeleteMs, 
+                                maxIndexSize = 1000, 
+                                indexInterval = 10000)
+    val log = new Log(logDir,
+                      config,
+                      needsRecovery = true,                      
                       time.scheduler,
-                      maxSegmentSize = set.sizeInBytes * 5, 
-                      maxMessageSize = config.messageMaxBytes,
-                      maxIndexSize = 1000, 
-                      indexIntervalBytes = 10000, 
-                      segmentDeleteDelayMs = asyncDeleteMs,
-                      needsRecovery = true)
+                      time)
     
     // append some messages to create some segments
     for(i <- 0 until 100)
@@ -520,15 +525,20 @@ class LogTest extends JUnitSuite {
     
     // files should be renamed
     val segments = log.logSegments.toArray
+    val oldFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
     log.deleteOldSegments((s) => true)
     
     assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
-    val renamed = segments.map(segment => new File(segment.log.file.getPath + Log.DeletedFileSuffix))
-    assertTrue("Files should all be renamed to .deleted.", renamed.forall(_.exists))
+    assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) && 
+                                                                 segments.forall(_.index.file.getName.endsWith(Log.DeletedFileSuffix)))
+    assertTrue("The .deleted files should still be there.", segments.forall(_.log.file.exists) &&
+                                                            segments.forall(_.index.file.exists))
+    assertTrue("The original file should be gone.", oldFiles.forall(!_.exists))
     
     // when enough time passes the files should be deleted
+    val deletedFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
     time.sleep(asyncDeleteMs + 1)
-    assertTrue("Files should all be gone.", renamed.forall(!_.exists))
+    assertTrue("Files should all be gone.", deletedFiles.forall(!_.exists))
   }
   
   /**
@@ -537,13 +547,12 @@ class LogTest extends JUnitSuite {
   @Test
   def testOpenDeletesObsoleteFiles() {
     val set = TestUtils.singleMessageSet("test".getBytes())
-    var log = new Log(logDir, 
+    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, maxIndexSize = 1000)
+    var log = new Log(logDir,
+                      config,
+                      needsRecovery = false,
                       time.scheduler,
-                      maxSegmentSize = set.sizeInBytes * 5, 
-                      maxMessageSize = config.messageMaxBytes,
-                      maxIndexSize = 1000, 
-                      indexIntervalBytes = 10000, 
-                      needsRecovery = false)
+                      time)
     
     // append some messages to create some segments
     for(i <- 0 until 100)
@@ -553,12 +562,10 @@ class LogTest extends JUnitSuite {
     log.close()
     
     log = new Log(logDir, 
+                  config,
+                  needsRecovery = false,
                   time.scheduler,
-                  maxSegmentSize = set.sizeInBytes * 5, 
-                  maxMessageSize = config.messageMaxBytes,
-                  maxIndexSize = 1000, 
-                  indexIntervalBytes = 10000,
-                  needsRecovery = false)
+                  time)
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
new file mode 100644
index 0000000..99a0c4b
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.nio._
+import org.junit._
+import org.scalatest.junit.JUnitSuite
+import junit.framework.Assert._
+
+/** Exercises SkimpyOffsetMap: put/get round trips, clear(), and the exception thrown when full. */
+class OffsetMapTest extends JUnitSuite {
+  
+  @Test
+  def testBasicValidation() {
+    Seq(10, 100, 1000).foreach(validateMap(_))
+  }
+  
+  /** Entries stored before clear() must be readable; after clear() every lookup must return -1. */
+  @Test
+  def testClear() {
+    val map = new SkimpyOffsetMap(4000, 0.75)
+    for(i <- 0 until 10)
+      map.put(key(i), i)
+    for(i <- 0 until 10)
+      assertEquals(i.toLong, map.get(key(i)))
+    map.clear()
+    for(i <- 0 until 10)
+      assertEquals(-1L, map.get(key(i))) // expected value first, so a failure is reported correctly
+  }
+  
+  /** Fills the map until size reaches capacity, then expects the next put to throw. */
+  @Test
+  def testCapacity() {
+    val map = new SkimpyOffsetMap(1024, 0.75)
+    var i = 0
+    while(map.size < map.capacity) {
+      map.put(key(i), i)
+      i += 1
+    }
+    // now the map is full, it should throw an exception
+    intercept[IllegalStateException] {
+      map.put(key(i), i)
+    }
+  }
+  
+  /** Builds a map key from the bytes of the decimal string form of the given integer. */
+  def key(key: Int) = ByteBuffer.wrap(key.toString.getBytes)
+  
+  /** Puts `items` keys (offset == key), asserts every key that is found, prints the miss rate. */
+  def validateMap(items: Int) {
+    val map = new SkimpyOffsetMap(items * 2 * 24, 0.75)
+    for(i <- 0 until items)
+      map.put(key(i), i)
+    val misses = (0 until items).count { i =>
+      val offset = map.get(key(i))
+      if(offset != -1L) assertEquals(i.toLong, offset)
+      offset == -1L // a lookup returning -1 is counted as a miss, not as a failure
+    }
+    println("Miss rate: " + (misses.toDouble / items))
+  }
+}
+
+/** Stand-alone driver: runs validateMap for the item count given as the only argument. */
+object OffsetMapTest {
+  def main(args: Array[String]) {
+    if(args.length == 1)
+      new OffsetMapTest().validateMap(args(0).toInt)
+    else {
+      System.err.println("USAGE: java OffsetMapTest size"); System.exit(1)
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index b609585..8a3e33b 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -16,13 +16,14 @@
 */
 package kafka.server
 
-import kafka.log.LogManager
+import kafka.log._
+import java.io.File
 import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
 import org.easymock.EasyMock
 import org.junit._
 import org.junit.Assert._
-import kafka.common.KafkaException
+import kafka.common._
 import kafka.cluster.Replica
 import kafka.utils._
 
@@ -30,7 +31,14 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
 
   val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_))
   val topic = "foo"
-  val logManagers = configs.map(config => new LogManager(config, new KafkaScheduler(1), new MockTime))
+  val logManagers = configs.map(config => new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
+                                                         topicConfigs = Map(),
+                                                         defaultConfig = LogConfig(),
+                                                         cleanerConfig = CleanerConfig(),
+                                                         flushCheckMs = 30000,
+                                                         retentionCheckMs = 30000,
+                                                         scheduler = new KafkaScheduler(1),
+                                                         time = new MockTime))
     
   @After
   def teardown() {
@@ -133,7 +141,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
   }
 
   def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {
-    replicaManager.highWatermarkCheckpoints(replicaManager.config.logDirs(0)).read(topic, partition)
+    replicaManager.highWatermarkCheckpoints(replicaManager.config.logDirs(0)).read.getOrElse(TopicAndPartition(topic, partition), 0L)
   }
   
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index e3752cb..f857171 100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -205,7 +205,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     props.put("enable.zookeeper", "false")
     props.put("num.partitions", "20")
     props.put("log.retention.hours", "10")
-    props.put("log.cleanup.interval.mins", "5")
+    props.put("log.retention.check.interval.ms", (5*1000*60).toString)
     props.put("log.segment.bytes", logSize.toString)
     props.put("zk.connect", zkConnect.toString)
     props