Posted to commits@kafka.apache.org by jk...@apache.org on 2013/01/29 04:36:20 UTC

[3/3] git commit: KAFKA-631 Implement a log cleaner for Kafka. Reviewed by Neha.

Updated Branches:
  refs/heads/trunk 92f177b30 -> e7edb5e1e


KAFKA-631 Implement a log cleaner for Kafka. Reviewed by Neha.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e7edb5e1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e7edb5e1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e7edb5e1

Branch: refs/heads/trunk
Commit: e7edb5e1e933f5535378d546bcf4d8b178d2e69c
Parents: 92f177b
Author: Jay Kreps <ja...@gmail.com>
Authored: Mon Jan 28 19:31:17 2013 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Mon Jan 28 19:31:17 2013 -0800

----------------------------------------------------------------------
 config/log4j.properties                            |   15 +-
 config/server.properties                           |    1 +
 core/src/main/scala/kafka/cluster/Partition.scala  |    8 +-
 .../common/OptimisticLockFailureException.scala    |   23 +
 core/src/main/scala/kafka/log/CleanerConfig.scala  |   41 ++
 core/src/main/scala/kafka/log/FileMessageSet.scala |   30 +-
 core/src/main/scala/kafka/log/Log.scala            |  265 +++++---
 core/src/main/scala/kafka/log/LogCleaner.scala     |  557 +++++++++++++++
 core/src/main/scala/kafka/log/LogConfig.scala      |   51 ++
 core/src/main/scala/kafka/log/LogManager.scala     |  120 ++--
 core/src/main/scala/kafka/log/LogSegment.scala     |   24 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala    |   33 +-
 core/src/main/scala/kafka/log/OffsetMap.scala      |  136 ++++
 .../scala/kafka/message/ByteBufferMessageSet.scala |    4 +-
 .../kafka/server/HighwaterMarkCheckpoint.scala     |  118 ---
 core/src/main/scala/kafka/server/KafkaConfig.scala |   33 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |   51 ++-
 .../main/scala/kafka/server/OffsetCheckpoint.scala |  103 +++
 .../main/scala/kafka/server/ReplicaManager.scala   |   14 +-
 core/src/main/scala/kafka/utils/FileLock.scala     |   94 ++--
 core/src/main/scala/kafka/utils/Logging.scala      |    5 +-
 core/src/main/scala/kafka/utils/Throttler.scala    |   26 +-
 core/src/main/scala/kafka/utils/Utils.scala        |   36 +-
 .../scala/kafka/utils/VerifiableProperties.scala   |   35 +-
 .../src/test/scala/other/kafka/StressTestLog.scala |   12 +-
 .../test/scala/other/kafka/TestLogCleaning.scala   |  216 ++++++
 .../scala/other/kafka/TestLogPerformance.scala     |    3 +-
 .../test/scala/unit/kafka/log/CleanerTest.scala    |  227 ++++++
 .../unit/kafka/log/LogCleanerIntegrationTest.scala |  117 +++
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   57 +-
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |   15 +
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  139 ++--
 .../test/scala/unit/kafka/log/OffsetMapTest.scala  |   87 +++
 .../server/HighwatermarkPersistenceTest.scala      |   16 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala    |    2 +-
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |   48 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |    2 -
 .../scala/unit/kafka/utils/MockScheduler.scala     |   32 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   11 +-
 .../test/scala/unit/kafka/utils/UtilsTest.scala    |   20 +
 40 files changed, 2280 insertions(+), 547 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/config/log4j.properties
----------------------------------------------------------------------
diff --git a/config/log4j.properties b/config/log4j.properties
index 00f891c..b36d3e0 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -36,6 +36,12 @@ log4j.appender.requestAppender.File=kafka-request.log
 log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
+log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.cleanerAppender.File=log-cleaner.log
+log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
 # Turn on all our debugging info
 #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
 #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
@@ -44,13 +50,18 @@ log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
 log4j.logger.kafka=INFO
 
-log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
+log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
 log4j.additivity.kafka.network.RequestChannel$=false
 
 #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
 #log4j.additivity.kafka.server.KafkaApis=false
-log4j.logger.kafka.request.logger=TRACE, requestAppender
+log4j.logger.kafka.request.logger=WARN, requestAppender
 log4j.additivity.kafka.request.logger=false
 
 log4j.logger.kafka.controller=TRACE, stateChangeAppender
 log4j.additivity.kafka.controller=false
+
+log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.LogCleaner=false
+log4j.logger.kafka.log.Cleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.Cleaner=false

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 04408dd..5a16caf 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -114,3 +114,4 @@ kafka.csv.metrics.dir=/tmp/kafka_metrics
 # Disable csv reporting by default.
 kafka.csv.metrics.reporter.enabled=false
 
+log.cleanup.policy=delete

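The new log.cleanup.policy property above defaults to the existing time/size based deletion. Going by the LogCleaner documentation added later in this commit, the alternative is the "dedupe" policy; a hypothetical override enabling key-based cleaning would presumably look like:

    # enable key-deduplicating cleanup instead of time/size based deletion
    # (value assumed from the cleaner docs below, not shown in this part of the diff)
    log.cleanup.policy=dedupe
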
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 71eb980..af80631 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -23,7 +23,7 @@ import kafka.api.LeaderAndIsr
 import kafka.server.ReplicaManager
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ErrorMapping
+import kafka.common._
 import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
 
 
@@ -75,11 +75,11 @@ class Partition(val topic: String,
       case None =>
         if (isReplicaLocal(replicaId)) {
           val log = logManager.getOrCreateLog(topic, partitionId)
-          val offset = replicaManager.highWatermarkCheckpoints(log.dir.getParent).read(topic, partitionId).min(log.logEndOffset)
+          val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent) 
+          val offset = checkpoint.read.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
           val localReplica = new Replica(replicaId, this, time, offset, Some(log))
           addReplicaIfNotExists(localReplica)
-        }
-        else {
+        } else {
           val remoteReplica = new Replica(replicaId, this, time)
           addReplicaIfNotExists(remoteReplica)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala b/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala
new file mode 100644
index 0000000..0e69110
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Thrown when an optimistic locking attempt receives concurrent modifications
+ */
+class OptimisticLockFailureException(message: String) extends RuntimeException(message)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/CleanerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/CleanerConfig.scala b/core/src/main/scala/kafka/log/CleanerConfig.scala
new file mode 100644
index 0000000..999fee8
--- /dev/null
+++ b/core/src/main/scala/kafka/log/CleanerConfig.scala
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+/**
+ * Configuration parameters for the log cleaner
+ * 
+ * @param numThreads The number of cleaner threads to run
+ * @param dedupeBufferSize The total memory used for log deduplication
+ * @param dedupeBufferLoadFactor The maximum percent full for the deduplication buffer
+ * @param maxMessageSize The maximum size of a message that can appear in the log
+ * @param maxIoBytesPerSecond The maximum read and write I/O that all cleaner threads are allowed to do
+ * @param backOffMs The amount of time to wait before rechecking if no logs are eligible for cleaning
+ * @param enableCleaner Allows completely disabling the log cleaner
+ * @param hashAlgorithm The hash algorithm to use in key comparison.
+ */
+case class CleanerConfig(val numThreads: Int = 1, 
+                         val dedupeBufferSize: Int = 4*1024*1024,
+                         val dedupeBufferLoadFactor: Double = 0.75,
+                         val ioBufferSize: Int = 1024*1024,
+                         val maxMessageSize: Int = 32*1024*1024,
+                         val maxIoBytesPerSecond: Double = Double.MaxValue,
+                         val backOffMs: Long = 60 * 1000,
+                         val enableCleaner: Boolean = true,
+                         val hashAlgorithm: String = "MD5") {
+}
\ No newline at end of file

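Since CleanerConfig is a case class with default arguments, individual settings can be overridden by name while the rest keep the defaults shown above. A minimal sketch (the values here are illustrative only, not taken from this commit):

    import kafka.log.CleanerConfig

    // hypothetical example values; only the named settings differ from the defaults above
    val cleanerConfig = CleanerConfig(numThreads = 2,
                                      dedupeBufferSize = 8*1024*1024,
                                      maxIoBytesPerSecond = 10.0*1024*1024,
                                      backOffMs = 30*1000)
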
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 37e8d87..a74abfe 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -38,7 +38,7 @@ import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
  * @param isSlice Should the start and end parameters be used for slicing?
  */
 @nonthreadsafe
-class FileMessageSet private[kafka](val file: File,
+class FileMessageSet private[kafka](@volatile var file: File,
                                     private[log] val channel: FileChannel,
                                     private[log] val start: Int,
                                     private[log] val end: Int,
@@ -223,14 +223,36 @@ class FileMessageSet private[kafka](val file: File,
    * Truncate this file message set to the given size in bytes. Note that this API does no checking that the 
    * given size falls on a valid message boundary.
    * @param targetSize The size to truncate to.
+   * @return The number of bytes truncated off
    */
-  def truncateTo(targetSize: Int) = {
-    if(targetSize > sizeInBytes || targetSize < 0)
+  def truncateTo(targetSize: Int): Int = {
+    val originalSize = sizeInBytes
+    if(targetSize > originalSize || targetSize < 0)
       throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
-                               " size of this log segment is " + sizeInBytes + " bytes.")
+                               " size of this log segment is " + originalSize + " bytes.")
     channel.truncate(targetSize)
     channel.position(targetSize)
     _size.set(targetSize)
+    originalSize - targetSize
+  }
+  
+  /**
+   * Read from the underlying file into the buffer starting at the given position
+   */
+  def readInto(buffer: ByteBuffer, position: Int): ByteBuffer = {
+    channel.read(buffer, position)
+    buffer.flip()
+    buffer
+  }
+  
+  /**
+   * Rename the file that backs this message set
+   * @return true iff the rename was successful
+   */
+  def renameTo(f: File): Boolean = {
+    val success = this.file.renameTo(f)
+    this.file = f
+    success
   }
   
 }

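The new readInto method fills and flips the caller's buffer, so a chunk of the file can be re-wrapped as a ByteBufferMessageSet and iterated, which is the pattern the cleaner uses further down in this commit. A rough sketch of that usage (the segment path is hypothetical):

    import java.io.File
    import java.nio.ByteBuffer
    import kafka.log.FileMessageSet
    import kafka.message.ByteBufferMessageSet

    // read the first chunk of a (hypothetical) segment file and iterate its messages
    val fileMessages = new FileMessageSet(new File("/tmp/kafka-logs/test-0/00000000000000000000.log"))
    val buffer = ByteBuffer.allocate(1024 * 1024)
    val messages = new ByteBufferMessageSet(fileMessages.readInto(buffer, 0))
    for (entry <- messages)
      println(entry.offset + ": " + entry.message)
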
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 5ea9489..ac12b74 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -49,15 +49,9 @@ import com.yammer.metrics.core.Gauge
  */
 @threadsafe
 class Log(val dir: File,
+          val config: LogConfig,
+          val needsRecovery: Boolean,
           val scheduler: Scheduler,
-          val maxSegmentSize: Int,
-          val maxMessageSize: Int, 
-          val flushInterval: Int = Int.MaxValue,
-          val rollIntervalMs: Long = Long.MaxValue, 
-          val needsRecovery: Boolean, 
-          val maxIndexSize: Int = (10*1024*1024),
-          val indexIntervalBytes: Int = 4096,
-          val segmentDeleteDelayMs: Long = 60000,
           time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
 
   import kafka.log.Log._
@@ -73,6 +67,9 @@ class Log(val dir: File,
 
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[Long,LogSegment] = loadSegments()
+  
+  /* The number of times the log has been truncated */
+  private val truncates = new AtomicInteger(0)
     
   /* Calculate the offset of the next message */
   private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset())
@@ -90,58 +87,82 @@ class Log(val dir: File,
   private def loadSegments(): ConcurrentNavigableMap[Long, LogSegment] = {
     // open all the segments read-only
     val logSegments = new ConcurrentSkipListMap[Long, LogSegment]
-    val ls = dir.listFiles()
-    if(ls != null) {
-      for(file <- ls if file.isFile) {
-        if(!file.canRead)
-          throw new IOException("Could not read file " + file)
-        val filename = file.getName
-        if(filename.endsWith(DeletedFileSuffix)) {
-          // if the file ends in .deleted, delete it
-          val deleted = file.delete()
-          if(!deleted)
-            warn("Attempt to delete defunct segment file %s failed.".format(filename))
-        }  else if(filename.endsWith(IndexFileSuffix)) {
-          // if it is an index file, make sure it has a corresponding .log file
-          val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
-          if(!logFile.exists) {
-            warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
-            file.delete()
-          }
-        } else if(filename.endsWith(LogFileSuffix)) {
-          // if its a log file, load the corresponding log segment
-          val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
-          val hasIndex = Log.indexFilename(dir, start).exists
-          val segment = new LogSegment(dir = dir, 
-                                       startOffset = start,
-                                       indexIntervalBytes = indexIntervalBytes, 
-                                       maxIndexSize = maxIndexSize)
-          if(!hasIndex) {
-            // this can only happen if someone manually deletes the index file
-            error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
-            segment.recover(maxMessageSize)
-          }
-          logSegments.put(start, segment)
+
+    // create the log directory if it doesn't exist
+    dir.mkdirs()
+    
+    // first do a pass through the files in the log directory and remove any temporary files 
+    // and complete any interrupted swap operations
+    for(file <- dir.listFiles if file.isFile) {
+      if(!file.canRead)
+        throw new IOException("Could not read file " + file)
+      val filename = file.getName
+      if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
+        // if the file ends in .deleted or .cleaned, delete it
+        file.delete()
+      } else if(filename.endsWith(SwapFileSuffix)) {
+        // we crashed in the middle of a swap operation, to recover:
+        // if a log, swap it in and delete the .index file
+        // if an index just delete it, it will be rebuilt
+        val baseName = new File(Utils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
+        if(baseName.getPath.endsWith(IndexFileSuffix)) {
+          file.delete()
+        } else if(baseName.getPath.endsWith(LogFileSuffix)){
+          // delete the index
+          val index = new File(Utils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
+          index.delete()
+          // complete the swap operation
+          val renamed = file.renameTo(baseName)
+          if(renamed)
+            info("Found log file %s from interrupted swap operation, repairing.".format(file.getPath))
+          else
+            throw new KafkaException("Failed to rename file %s.".format(file.getPath))
         }
       }
     }
 
+    // now do a second pass and load all the .log and .index files
+    for(file <- dir.listFiles if file.isFile) {
+      val filename = file.getName
+      if(filename.endsWith(IndexFileSuffix)) {
+        // if it is an index file, make sure it has a corresponding .log file
+        val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
+        if(!logFile.exists) {
+          warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
+          file.delete()
+        }
+      } else if(filename.endsWith(LogFileSuffix)) {
+        // if it's a log file, load the corresponding log segment
+        val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
+        val hasIndex = Log.indexFilename(dir, start).exists
+        val segment = new LogSegment(dir = dir, 
+                                     startOffset = start,
+                                     indexIntervalBytes = config.indexInterval, 
+                                     maxIndexSize = config.maxIndexSize)
+        if(!hasIndex) {
+          error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
+          segment.recover(config.maxMessageSize)
+        }
+        logSegments.put(start, segment)
+      }
+    }
+
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment beginning at offset 0
       logSegments.put(0,
                       new LogSegment(dir = dir, 
                                      startOffset = 0,
-                                     indexIntervalBytes = indexIntervalBytes, 
-                                     maxIndexSize = maxIndexSize))
+                                     indexIntervalBytes = config.indexInterval, 
+                                     maxIndexSize = config.maxIndexSize))
     } else {
       // reset the index size of the currently active log segment to allow more entries
       val active = logSegments.lastEntry.getValue
-      active.index.resize(maxIndexSize)
+      active.index.resize(config.maxIndexSize)
 
       // run recovery on the active segment if necessary
       if(needsRecovery) {
         info("Recovering active segment of %s.".format(name))
-        active.recover(maxMessageSize)
+        active.recover(config.maxMessageSize)
       }
     }
     logSegments
@@ -152,6 +173,11 @@ class Log(val dir: File,
    * Take care! this is an O(n) operation.
    */
   def numberOfSegments: Int = segments.size
+  
+  /**
+   * The number of truncates that have occurred since the log was opened.
+   */
+  def numberOfTruncates: Int = truncates.get
 
   /**
    * Close this log
@@ -194,22 +220,24 @@ class Log(val dir: File,
         val segment = maybeRoll()
           
         if(assignOffsets) {
-           // assign offsets to the messageset
+          // assign offsets to the messageset
           appendInfo.firstOffset = nextOffset.get
-          val offsetCounter = new AtomicLong(nextOffset.get)
-          validMessages = validMessages.assignOffsets(offsetCounter, appendInfo.codec)
-          appendInfo.lastOffset = offsetCounter.get - 1
+          val offset = new AtomicLong(nextOffset.get)
+          validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
+          appendInfo.lastOffset = offset.get - 1
         } else {
           // we are taking the offsets we are given
           if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get)
-              throw new IllegalArgumentException("Out of order offsets found in " + messages)
+            throw new IllegalArgumentException("Out of order offsets found in " + messages)
         }
 
         // now append to the log
         trace("Appending message set to %s with offsets %d to %d.".format(name, appendInfo.firstOffset, appendInfo.lastOffset))
         segment.append(appendInfo.firstOffset, validMessages)
+        
+        // increment the log end offset
         nextOffset.set(appendInfo.lastOffset + 1)
-
+          
         // maybe flush the log and index
         maybeFlush(appendInfo.count)
         
@@ -263,8 +291,8 @@ class Log(val dir: File,
       // check the validity of the message by checking CRC and message size
       val m = messageAndOffset.message
       m.ensureValid()
-      if(MessageSet.entrySize(m) > maxMessageSize)
-        throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d.".format(MessageSet.entrySize(m), maxMessageSize))
+      if(MessageSet.entrySize(m) > config.maxMessageSize)
+        throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d.".format(MessageSet.entrySize(m), config.maxMessageSize))
       
       messageCount += 1;
       
@@ -372,18 +400,21 @@ class Log(val dir: File,
    */
   private def maybeRoll(): LogSegment = {
     val segment = activeSegment
-    if (segment.size > maxSegmentSize) {
-      info("Rolling %s due to full data log".format(name))
-      roll()
-    } else if (segment.size > 0 && time.milliseconds - segment.created > rollIntervalMs) {
-      info("Rolling %s due to time based rolling".format(name))
-      roll()
-    } else if (segment.index.isFull) {
-      info("Rolling %s due to full index maxIndexSize = %d, entries = %d, maxEntries = %d"
-        .format(name, segment.index.maxIndexSize, segment.index.entries(), segment.index.maxEntries))
+    if (segment.size > config.segmentSize || 
+        segment.size > 0 && time.milliseconds - segment.created > config.segmentMs ||
+        segment.index.isFull) {
+      debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
+            .format(name,
+                    segment.size,
+                    config.segmentSize,
+                    segment.index.entries,
+                    segment.index.maxEntries,
+                    time.milliseconds - segment.created,
+                    config.segmentMs))
       roll()
-    } else
+    } else {
       segment
+    }
   }
   
   /**
@@ -412,11 +443,11 @@ class Log(val dir: File,
       }
       val segment = new LogSegment(dir, 
                                    startOffset = newOffset,
-                                   indexIntervalBytes = indexIntervalBytes, 
-                                   maxIndexSize = maxIndexSize)
-      val prev = segments.put(segment.baseOffset, segment)
+                                   indexIntervalBytes = config.indexInterval, 
+                                   maxIndexSize = config.maxIndexSize)
+      val prev = addSegment(segment)
       if(prev != null)
-        throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(dir.getName, newOffset))
+        throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
       segment
     }
   }
@@ -426,12 +457,12 @@ class Log(val dir: File,
    * @param numberOfMessages The number of messages that are being appended
    */
   private def maybeFlush(numberOfMessages : Int) {
-    if(unflushed.addAndGet(numberOfMessages) >= flushInterval)
+    if(unflushed.addAndGet(numberOfMessages) >= config.flushInterval)
       flush()
   }
 
   /**
-   * Flush this log file and assoicated index to the physical disk
+   * Flush this log file and associated index to the physical disk
    */
   def flush() : Unit = {
     if (unflushed.get == 0)
@@ -475,6 +506,7 @@ class Log(val dir: File,
         activeSegment.truncateTo(targetOffset)
         this.nextOffset.set(targetOffset)
       }
+      truncates.getAndIncrement
     }
   }
     
@@ -487,12 +519,12 @@ class Log(val dir: File,
     lock synchronized {
       val segmentsToDelete = logSegments.toList
       segmentsToDelete.foreach(deleteSegment(_))
-      segments.put(newOffset, 
-                   new LogSegment(dir, 
-                                  newOffset,
-                                  indexIntervalBytes = indexIntervalBytes, 
-                                  maxIndexSize = maxIndexSize))
+      addSegment(new LogSegment(dir, 
+                                newOffset,
+                                indexIntervalBytes = config.indexInterval, 
+                                maxIndexSize = config.maxIndexSize))
       this.nextOffset.set(newOffset)
+      truncates.getAndIncrement
     }
   }
 
@@ -511,7 +543,13 @@ class Log(val dir: File,
    */
   def logSegments: Iterable[LogSegment] = asIterable(segments.values)
   
-  override def toString() = "Log(" + this.dir + ")"
+  /**
+   * Get all segments beginning with the segment that includes "from" and ending with the segment
+   * that includes up to "to-1" or the end of the log (if to > logEndOffset)
+   */
+  def logSegments(from: Long, to: Long) = asIterable(segments.subMap(from, true, to, false).values)
+  
+  override def toString() = "Log(" + dir + ")"
   
   /**
    * This method performs an asynchronous log segment delete by doing the following:
@@ -526,37 +564,82 @@ class Log(val dir: File,
    * @param segment The log segment to schedule for deletion
    */
   private def deleteSegment(segment: LogSegment) {
-    info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, dir.getName))
+    info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, name))
     lock synchronized {
       segments.remove(segment.baseOffset)
-      val deletedLog = new File(segment.log.file.getPath + Log.DeletedFileSuffix)
-      val deletedIndex = new File(segment.index.file.getPath + Log.DeletedFileSuffix)
-      val renamedLog = segment.log.file.renameTo(deletedLog)
-      val renamedIndex = segment.index.file.renameTo(deletedIndex)
-      if(!renamedLog && segment.log.file.exists)
-        throw new KafkaStorageException("Failed to rename file %s to %s for log %s.".format(segment.log.file.getPath, deletedLog.getPath, name))
-      if(!renamedIndex && segment.index.file.exists)
-        throw new KafkaStorageException("Failed to rename file %s to %s for log %s.".format(segment.index.file.getPath, deletedIndex.getPath, name))
-      def asyncDeleteFiles() {
-        info("Deleting log segment %s for log %s.".format(segment.baseOffset, name))
-        if(!deletedLog.delete())
-          warn("Failed to delete log segment file %s for log %s.".format(deletedLog.getPath, name))
-        if(!deletedIndex.delete())
-          warn("Failed to delete index segment file %s for log %s.".format(deletedLog.getPath, name))
-      }
-      scheduler.schedule("delete-log-segment", asyncDeleteFiles, delay = segmentDeleteDelayMs)
+      asyncDeleteSegment(segment)
+    }
+  }
+  
+  /**
+   * Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
+   * @throws KafkaStorageException if the file can't be renamed and still exists 
+   */
+  private def asyncDeleteSegment(segment: LogSegment) {
+    segment.changeFileSuffixes("", Log.DeletedFileSuffix)
+    def deleteSeg() {
+      info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
+      segment.delete()
     }
+    scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
+  }
+  
+  /**
+   * Swap a new segment in place and delete one or more existing segments in a crash-safe manner. The old segments will
+   * be asynchronously deleted.
+   * 
+   * @param newSegment The new log segment to add to the log
+   * @param oldSegments The old log segments to delete from the log
+   */
+  private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], expectedTruncates: Int) {
+    lock synchronized {
+      if(expectedTruncates != numberOfTruncates)
+        throw new OptimisticLockFailureException("The log has been truncated, expected %d but found %d.".format(expectedTruncates, numberOfTruncates))
+      // need to do this in two phases to be crash safe AND do the delete asynchronously
+      // if we crash in the middle of this we complete the swap in loadSegments()
+      newSegment.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)
+      addSegment(newSegment)
+        
+      // delete the old files
+      for(seg <- oldSegments) {
+        // remove the index entry
+        if(seg.baseOffset != newSegment.baseOffset)
+          segments.remove(seg.baseOffset)
+        // delete segment
+        asyncDeleteSegment(seg)
+      }
+      // okay we are safe now, remove the swap suffix
+      newSegment.changeFileSuffixes(Log.SwapFileSuffix, "")
+    }  
   }
   
+  /**
+   * Add the given segment to the segments in this log. If this segment replaces an existing segment, delete it.
+   * @param segment The segment to add
+   */
+  def addSegment(segment: LogSegment) = this.segments.put(segment.baseOffset, segment)
+  
 }
 
 /**
  * Helper functions for logs
  */
 object Log {
+  
+  /** a log file */
   val LogFileSuffix = ".log"
+    
+  /** an index file */
   val IndexFileSuffix = ".index"
+    
+  /** a file that is scheduled to be deleted */
   val DeletedFileSuffix = ".deleted"
+    
+  /** A temporary file that is being used for log cleaning */
+  val CleanedFileSuffix = ".cleaned"
+    
+  /** A temporary file used when swapping files into the log */
+  val SwapFileSuffix = ".swap"
 
   /**
    * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros

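The helper described in the (truncated) comment above just zero-pads the base offset, so segment, index, and the new .cleaned/.swap/.deleted files all share the padded-offset naming scheme. A standalone illustration (the helper name here is made up; it is not the method in Log):

    import kafka.log.Log

    // illustrative only: Kafka segment file names are the base offset zero-padded to 20 digits plus a suffix
    def segmentFilename(offset: Long, suffix: String): String =
      "%020d".format(offset) + suffix

    segmentFilename(42L, Log.LogFileSuffix)   // "00000000000000000042.log"
    segmentFilename(42L, Log.IndexFileSuffix) // "00000000000000000042.index"
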
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
new file mode 100644
index 0000000..368a12b
--- /dev/null
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -0,0 +1,557 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import scala.collection._
+import scala.math
+import java.nio._
+import java.util.concurrent.Semaphore
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic._
+import java.io.File
+import kafka.common._
+import kafka.message._
+import kafka.server.OffsetCheckpoint
+import kafka.utils._
+
+/**
+ * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy.
+ * A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'.
+ * 
+ * Each log can be thought of as being split into two sections of segments: a "clean" section which has previously been cleaned followed by a
+ * "dirty" section that has not yet been cleaned. The active log segment is always excluded from cleaning.
+ *
+ * The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "dedupe" retention policy 
+ * and cleans it. The dirtiness of the log is estimated by taking the ratio of bytes in the dirty section of the log to the total bytes in the log. 
+ * 
+ * To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. For memory efficiency this mapping
+ * is approximate. That is, it may drop some key=>offset pairs, but it never returns a wrong answer. See kafka.log.OffsetMap for details of
+ * the implementation of the mapping. 
+ * 
+ * Once the key=>offset map is built, the log is cleaned by recopying each log segment but omitting any message whose key appears in the offset map with a 
+ * higher offset than that message's own offset (i.e. messages that have been superseded by a later message with the same key in the dirty section of the log).
+ * 
+ * To avoid segments shrinking to very small sizes with repeated cleanings, successive segments are merged into a single cleaned segment when
+ * doing a cleaning, provided their combined log and index sizes are less than the maximum log and index sizes in effect before the cleaning began.
+ * 
+ * Cleaned segments are swapped into the log as they become available.
+ * 
+ * One nuance that the cleaner must handle is log truncation: if a log is truncated while it is being cleaned, the cleaning of that log is aborted.
+ * 
+ * @param config Configuration parameters for the cleaner
+ * @param logDirs The directories where offset checkpoints reside
+ * @param logs The pool of logs
+ * @param time A way to control the passage of time
+ */
+class LogCleaner(val config: CleanerConfig,
+                 val logDirs: Array[File],
+                 val logs: Pool[TopicAndPartition, Log], 
+                 time: Time = SystemTime) extends Logging {
+  
+  /* the offset checkpoints holding the last cleaned point for each log */
+  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
+  
+  /* the set of logs currently being cleaned */
+  private val inProgress = mutable.HashSet[TopicAndPartition]()
+  
+  /* a global lock used to control all access to the in-progress set and the offset checkpoints */
+  private val lock = new Object
+    
+  /* a counter for creating unique thread names*/
+  private val threadId = new AtomicInteger(0)
+  
+  /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
+  private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, 
+                                        checkIntervalMs = 300, 
+                                        throttleDown = true, 
+                                        time = time)
+  
+  /* the threads */
+  private val cleaners = (0 until config.numThreads).map(_ => new CleanerThread())
+  
+  /* a hook for testing to synchronize on log cleaning completions */
+  private val cleaned = new Semaphore(0)
+  
+  /**
+   * Start the background cleaning
+   */
+  def startup() {
+    info("Starting the log cleaner")
+    cleaners.foreach(_.start())
+  }
+  
+  /**
+   * Stop the background cleaning
+   */
+  def shutdown() {
+    info("Shutting down the log cleaner.")
+    cleaners.foreach(_.interrupt())
+    cleaners.foreach(_.join())
+  }
+  
+  /**
+   * For testing, a way to know when work has completed. This method blocks until the 
+   * cleaner has processed up to the given offset on the specified topic/partition
+   */
+  def awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit = {
+    while(!allCleanerCheckpoints.contains(TopicAndPartition(topic, part)))
+      cleaned.tryAcquire(timeout, TimeUnit.MILLISECONDS)
+  }
+  
+  /**
+   * @return the position processed for all logs.
+   */
+  def allCleanerCheckpoints(): Map[TopicAndPartition, Long] = 
+    checkpoints.values.flatMap(_.read()).toMap
+  
+   /**
+    * Choose the log to clean next and add it to the in-progress set. We recompute this
+    * every time from the full set of logs to allow logs to be dynamically added to the pool of logs
+    * the log manager maintains.
+    */
+  private def grabFilthiestLog(): Option[LogToClean] = {
+    lock synchronized {
+      val lastClean = allCleanerCheckpoints()      
+      val cleanableLogs = logs.filter(l => l._2.config.dedupe)                                     // skip any logs marked for delete rather than dedupe
+                              .filterNot(l => inProgress.contains(l._1))                           // skip any logs already in-progress
+                              .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0)))      // create a LogToClean instance for each
+      val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0)                                  // must have some bytes
+                                   .filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
+      if(dirtyLogs.isEmpty) {
+        None
+      } else {
+        val filthiest = dirtyLogs.max
+        inProgress += filthiest.topicPartition
+        Some(filthiest)
+      }
+    }
+  }
+  
+  /**
+   * Save out the endOffset and remove the given log from the in-progress set.
+   */
+  private def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) {
+    lock synchronized {
+      val checkpoint = checkpoints(dataDir)
+      val offsets = checkpoint.read() + ((topicAndPartition, endOffset))
+      checkpoint.write(offsets)
+      inProgress -= topicAndPartition
+    }
+    cleaned.release()
+  }
+
+  /**
+   * The cleaner threads do the actual log cleaning. Each thread does its cleaning repeatedly by
+   * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
+   */
+  private class CleanerThread extends Thread {
+    val cleaner = new Cleaner(id = threadId.getAndIncrement(),
+                              offsetMap = new SkimpyOffsetMap(memory = config.dedupeBufferSize / config.numThreads, 
+                                                              maxLoadFactor = config.dedupeBufferLoadFactor, 
+                                                              hashAlgorithm = config.hashAlgorithm),
+                              ioBufferSize = config.ioBufferSize / config.numThreads / 2,
+                              maxIoBufferSize = config.maxMessageSize,
+                              throttler = throttler,
+                              time = time)
+    
+    setName("kafka-log-cleaner-thread-" + cleaner.id)
+    setDaemon(false)
+
+    /**
+     * The main loop for the cleaner thread
+     */
+    override def run() {
+      info("Starting cleaner thread %d...".format(cleaner.id))
+      try {
+        while(!isInterrupted) {
+          cleanOrSleep()
+        }
+      } catch {
+        case e: InterruptedException => // all done
+        case e: Exception =>
+          error("Error in cleaner thread %d:".format(cleaner.id), e)
+      }
+      info("Shutting down cleaner thread %d.".format(cleaner.id))
+    }
+    
+    /**
+     * Clean a log if there is a dirty log available, otherwise sleep for a bit
+     */
+    private def cleanOrSleep() {
+      grabFilthiestLog() match {
+        case None =>
+          // there are no cleanable logs, sleep a while
+          time.sleep(config.backOffMs)
+        case Some(cleanable) =>
+          // there's a log, clean it
+          var endOffset = cleanable.firstDirtyOffset
+          try {
+            endOffset = cleaner.clean(cleanable)
+            logStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleaner.stats)
+          } catch {
+            case e: OptimisticLockFailureException => 
+              info("Cleaning of log was aborted due to colliding truncate operation.")
+          } finally {
+            doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
+          }
+      }
+    }
+    
+    /**
+     * Log out statistics on a single run of the cleaner.
+     */
+    def logStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
+      def mb(bytes: Double) = bytes / (1024*1024)
+      val message = 
+        "%n\tLog cleaner %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) + 
+        "\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead), 
+                                                                                stats.elapsedSecs, 
+                                                                                mb(stats.bytesRead/stats.elapsedSecs)) + 
+        "\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead), 
+                                                                                           stats.elapsedIndexSecs, 
+                                                                                           mb(stats.mapBytesRead)/stats.elapsedIndexSecs, 
+                                                                                           100 * stats.elapsedIndexSecs.toDouble/stats.elapsedSecs) + 
+        "\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead), 
+                                                                                           stats.elapsedSecs - stats.elapsedIndexSecs, 
+                                                                                           mb(stats.bytesRead)/(stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs).toDouble/stats.elapsedSecs) + 
+        "\tStart size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesRead), stats.messagesRead) +
+        "\tEnd size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesWritten), stats.messagesWritten) + 
+        "\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead), 
+                                                                   100.0 * (1.0 - stats.messagesWritten.toDouble/stats.messagesRead))
+      info(message)
+    }
+   
+  }
+}
+
+/**
+ * This class holds the actual logic for cleaning a log
+ * @param id An identifier used for logging
+ * @param offsetMap The map used for deduplication
+ * @param ioBufferSize The size of the buffers to use. Memory usage will be 2x this number as there is a read and a write buffer.
+ * @param throttler The throttler instance to use for limiting I/O rate.
+ * @param time The time instance
+ */
+private[log] class Cleaner(val id: Int,
+                           offsetMap: OffsetMap,
+                           ioBufferSize: Int,
+                           maxIoBufferSize: Int,
+                           throttler: Throttler,
+                           time: Time) extends Logging {
+
+  this.logIdent = "Cleaner " + id + ":"
+  val stats = new CleanerStats(time)
+  private var readBuffer = ByteBuffer.allocate(ioBufferSize) // buffer for disk read I/O
+  private var writeBuffer = ByteBuffer.allocate(ioBufferSize) // buffer for disk write I/O
+
+  /**
+   * Clean the given log
+   *
+   * @param cleanable The log to be cleaned
+   *
+   * @return The first offset not cleaned
+   */
+  private[log] def clean(cleanable: LogToClean): Long = {
+    stats.clear()
+    val topic = cleanable.topicPartition.topic
+    val part = cleanable.topicPartition.partition
+    info("Beginning cleaning of %s-%d.".format(topic, part))
+    val log = cleanable.log
+    val truncateCount = log.numberOfTruncates
+    
+    // build the offset map
+    val upperBoundOffset = math.min(log.activeSegment.baseOffset, cleanable.firstDirtyOffset + offsetMap.capacity)
+    val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap) + 1
+    stats.indexDone()
+    
+    // group the segments and clean the groups
+    for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) {
+      info("Cleaning segments %s for log %s...".format(group.map(_.baseOffset).mkString(","), log.name))
+      cleanSegments(log, group, offsetMap, truncateCount)
+    }
+    stats.allDone()
+    endOffset
+  }
+
+  /**
+   * Clean a group of segments into a single replacement segment
+   *
+   * @param log The log being cleaned
+   * @param segments The group of segments being cleaned
+   * @param map The offset map to use for cleaning segments
+   * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet
+   */
+  private[log] def cleanSegments(log: Log, segments: Seq[LogSegment], map: OffsetMap, expectedTruncateCount: Int) {
+    // create a new segment with the suffix .cleaned appended to both the log and index name
+    val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
+    logFile.delete()
+    val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
+    indexFile.delete()
+    val messages = new FileMessageSet(logFile)
+    val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
+    val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, time)
+
+    // clean segments into the new destination segment
+    for (old <- segments)
+      cleanInto(old, cleaned, map)
+      
+    // trim excess index
+    index.trimToValidSize()
+    
+    // flush new segment to disk before swap
+    cleaned.flush()
+
+    // swap in new segment  
+    info("Swapping in cleaned segment %d for %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name))
+    try {
+      log.replaceSegments(cleaned, segments, expectedTruncateCount)
+    } catch {
+      case e: OptimisticLockFailureException =>
+        cleaned.delete()
+        throw e  
+    }
+  }
+
+  /**
+   * Clean the given source log segment into the destination segment using the key=>offset mapping
+   * provided
+   *
+   * @param source The dirty log segment
+   * @param dest The cleaned log segment
+   * @param map The key=>offset mapping
+   *
+   * TODO: Implement proper compression support
+   */
+  private[log] def cleanInto(source: LogSegment, dest: LogSegment, map: OffsetMap) {
+    var position = 0
+    while (position < source.log.sizeInBytes) {
+      checkDone()
+      // read a chunk of messages and copy any that are to be retained to the write buffer to be written out
+      readBuffer.clear()
+      writeBuffer.clear()
+      val messages = new ByteBufferMessageSet(source.log.readInto(readBuffer, position))
+      throttler.maybeThrottle(messages.sizeInBytes)
+      // check each message to see if it is to be retained
+      var messagesRead = 0
+      for (entry <- messages) {
+        messagesRead += 1
+        val size = MessageSet.entrySize(entry.message)
+        position += size
+        stats.readMessage(size)
+        val key = entry.message.key
+        require(key != null, "Found null key in log segment %s which is marked as dedupe.".format(source.log.file.getAbsolutePath))
+        val lastOffset = map.get(key)
+        /* retain the record if it isn't present in the map OR it is present but this offset is the highest (and it's not a delete) */
+        val retainRecord = lastOffset < 0 || (entry.offset >= lastOffset && entry.message.payload != null)
+        if (retainRecord) {
+          ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
+          stats.recopyMessage(size)
+        }
+      }
+      // if any messages are to be retained, write them out
+      if(writeBuffer.position > 0) {
+        writeBuffer.flip()
+        val retained = new ByteBufferMessageSet(writeBuffer)
+        dest.append(retained.head.offset, retained)
+        throttler.maybeThrottle(writeBuffer.limit)
+      }
+      
+      // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
+      if(readBuffer.limit > 0 && messagesRead == 0)
+        growBuffers()
+    }
+    restoreBuffers()
+  }
+  
+  /**
+   * Double the I/O buffer capacity
+   */
+  def growBuffers() {
+    if(readBuffer.capacity >= maxIoBufferSize || writeBuffer.capacity >= maxIoBufferSize)
+      throw new IllegalStateException("This log contains a message larger than maximum allowable size of %s.".format(maxIoBufferSize))
+    val newSize = math.min(this.readBuffer.capacity * 2, maxIoBufferSize)
+    info("Growing cleaner I/O buffers from " + readBuffer.capacity + "bytes to " + newSize + " bytes.")
+    this.readBuffer = ByteBuffer.allocate(newSize)
+    this.writeBuffer = ByteBuffer.allocate(newSize)
+  }
+  
+  /**
+   * Restore the I/O buffer capacity to its original size
+   */
+  def restoreBuffers() {
+    if(this.readBuffer.capacity > this.ioBufferSize)
+      this.readBuffer = ByteBuffer.allocate(this.ioBufferSize)
+    if(this.writeBuffer.capacity > this.ioBufferSize)
+      this.writeBuffer = ByteBuffer.allocate(this.ioBufferSize)
+  }
+
+  /**
+   * Group the segments in a log into groups totaling less than a given size. The size is enforced separately for the log data and the index data.
+   * Each such group of segments is then cleaned together into a single
+   * destination segment. This prevents segment sizes from shrinking too much.
+   *
+   * @param segments The log segments to group
+   * @param maxSize the maximum size in bytes for the total of all log data in a group
+   * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
+   *
+   * @return A list of grouped segments
+   */
+  private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = {
+    var grouped = List[List[LogSegment]]()
+    var segs = segments.toList
+    while(!segs.isEmpty) {
+      var group = List(segs.head)
+      var logSize = segs.head.size
+      var indexSize = segs.head.index.sizeInBytes
+      segs = segs.tail
+      while(!segs.isEmpty &&
+            logSize + segs.head.size < maxSize &&
+            indexSize + segs.head.index.sizeInBytes < maxIndexSize) {
+        group = segs.head :: group
+        logSize += segs.head.size
+        indexSize += segs.head.index.sizeInBytes
+        segs = segs.tail
+      }
+      grouped ::= group.reverse
+    }
+    grouped.reverse
+  }
+
+  /**
+   * Build a map of key_hash => offset for the keys in the dirty portion of the log to use in cleaning.
+   * @param log The log to use
+   * @param start The offset at which dirty messages begin
+   * @param end The ending offset for the map that is being built
+   * @param map The map in which to store the mappings
+   *
+   * @return The final offset the map covers
+   */
+  private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap): Long = {
+    map.clear()
+    val segments = log.logSegments(start, end)
+    info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, segments.size, start, end))
+    var offset = segments.head.baseOffset
+    require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
+    for (segment <- segments) {
+      checkDone()
+      offset = buildOffsetMap(segment, map)
+    }
+    info("Offset map for log %s complete.".format(log.name))
+    offset
+  }
+
+  /**
+   * Add the messages in the given segment to the offset map
+   *
+   * @param segment The segment to index
+   * @param map The map in which to store the key=>offset mapping
+   *
+   * @return The final offset covered by the map
+   */
+  private def buildOffsetMap(segment: LogSegment, map: OffsetMap): Long = {
+    var position = 0
+    var offset = segment.baseOffset
+    while (position < segment.log.sizeInBytes) {
+      checkDone()
+      readBuffer.clear()
+      val messages = new ByteBufferMessageSet(segment.log.readInto(readBuffer, position))
+      throttler.maybeThrottle(messages.sizeInBytes)
+      val startPosition = position
+      for (entry <- messages) {
+        val message = entry.message
+        require(message.hasKey)
+        val size = MessageSet.entrySize(message)
+        position += size
+        map.put(message.key, entry.offset)
+        offset = entry.offset
+        stats.indexMessage(size)
+      }
+      // if we didn't read even one complete message, our read buffer may be too small
+      if(position == startPosition)
+        growBuffers()
+    }
+    restoreBuffers()
+    offset
+  }
+
+  /**
+   * If the cleaner thread has been interrupted, abort the cleaning by throwing an InterruptedException
+   */
+  private def checkDone() {
+    if (Thread.currentThread.isInterrupted)
+      throw new InterruptedException
+  }
+}
+
+/**
+ * A simple struct for collecting stats about log cleaning
+ */
+private case class CleanerStats(time: Time = SystemTime) {
+  var startTime, mapCompleteTime, endTime, bytesRead, bytesWritten, mapBytesRead, mapMessagesRead, messagesRead, messagesWritten = 0L
+  clear()
+  
+  def readMessage(size: Int) {
+    messagesRead += 1
+    bytesRead += size
+  }
+  
+  def recopyMessage(size: Int) {
+    messagesWritten += 1
+    bytesWritten += size
+  }
+  
+  def indexMessage(size: Int) {
+    mapMessagesRead += 1
+    mapBytesRead += size
+  }
+  
+  def indexDone() {
+    mapCompleteTime = time.milliseconds
+  }
+  
+  def allDone() {
+    endTime = time.milliseconds
+  }
+  
+  def elapsedSecs = (endTime - startTime)/1000.0
+  
+  def elapsedIndexSecs = (mapCompleteTime - startTime)/1000.0
+  
+  def clear() {
+    startTime = time.milliseconds
+    mapCompleteTime = -1L
+    endTime = -1L
+    bytesRead = 0L
+    bytesWritten = 0L
+    mapBytesRead = 0L
+    mapMessagesRead = 0L
+    messagesRead = 0L
+    messagesWritten = 0L
+  }
+}
+
+/**
+ * Helper class for a log, its topic/partition, and the first dirty offset at which cleaning should begin
+ */
+private case class LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long) extends Ordered[LogToClean] {
+  val cleanBytes = log.logSegments(-1, firstDirtyOffset-1).map(_.size).sum
+  val dirtyBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, log.activeSegment.baseOffset)).map(_.size).sum
+  val cleanableRatio = dirtyBytes / totalBytes.toDouble
+  def totalBytes = cleanBytes + dirtyBytes
+  override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
+}
\ No newline at end of file

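As a rough illustration of how the cleanableRatio above prioritizes logs, here is a small standalone Scala sketch (not part of this patch); CandidateLog and the byte counts are simplified stand-ins for LogToClean and its segment size sums, and minCleanableRatio mirrors the LogConfig setting:

    // Simplified stand-in for LogToClean: same ratio, invented byte counts.
    case class CandidateLog(name: String, cleanBytes: Long, dirtyBytes: Long) {
      def totalBytes: Long = cleanBytes + dirtyBytes
      def cleanableRatio: Double = dirtyBytes / totalBytes.toDouble
    }

    object CleanableRatioSketch extends App {
      val candidates = Seq(
        CandidateLog("topic-a-0", cleanBytes = 900, dirtyBytes = 100),  // 0.10
        CandidateLog("topic-b-0", cleanBytes = 200, dirtyBytes = 800),  // 0.80
        CandidateLog("topic-c-0", cleanBytes = 500, dirtyBytes = 500))  // 0.50
      val minCleanableRatio = 0.5
      // keep only logs that are dirty enough to be worth cleaning, dirtiest first
      val toClean = candidates.filter(_.cleanableRatio >= minCleanableRatio)
                              .sortBy(-_.cleanableRatio)
      toClean.foreach(c => println("%s ratio=%.2f".format(c.name, c.cleanableRatio)))
    }
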
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
new file mode 100644
index 0000000..5a10bef
--- /dev/null
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import scala.collection._
+import kafka.common._
+
+/**
+ * Configuration settings for a log
+ * @param segmentSize The soft maximum for the size of a segment file in the log
+ * @param segmentMs The soft maximum on the amount of time before a new log segment is rolled
+ * @param flushInterval The number of messages that can be written to the log before a flush is forced
+ * @param flushMs The amount of time the log can have dirty data before a flush is forced
+ * @param retentionSize The approximate total number of bytes this log can use
+ * @param retentionMs The approximate maximum age of the last segment that is retained
+ * @param maxMessageSize The maximum size of a message that can be appended to the log
+ * @param maxIndexSize The maximum size of an index file
+ * @param indexInterval The approximate number of bytes between index entries
+ * @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
+ * @param minCleanableRatio The minimum ratio of dirty (uncleaned) bytes to total bytes a log must reach before it is eligible for cleaning
+ * @param dedupe If true, old segments in this log are deduplicated by key rather than deleted
+ */
+case class LogConfig(val segmentSize: Int = 1024*1024, 
+                     val segmentMs: Long = Long.MaxValue,
+                     val flushInterval: Long = Long.MaxValue, 
+                     val flushMs: Long = Long.MaxValue,
+                     val retentionSize: Long = Long.MaxValue,
+                     val retentionMs: Long = Long.MaxValue,
+                     val maxMessageSize: Int = Int.MaxValue,
+                     val maxIndexSize: Int = 1024*1024,
+                     val indexInterval: Int = 4096,
+                     val fileDeleteDelayMs: Long = 60*1000,
+                     val minCleanableRatio: Double = 0.5,
+                     val dedupe: Boolean = false)
+                      
+                     
\ No newline at end of file

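For illustration only, a sketch of how a per-topic override might be built from this LogConfig (assuming the class above is on the classpath); the topic name and sizes here are invented. A map like this is what the topicConfigs parameter of LogManager expects in the change below:

    import kafka.log.LogConfig

    object LogConfigSketch extends App {
      // broker-wide defaults for this sketch
      val default = LogConfig(segmentSize = 512 * 1024 * 1024,
                              retentionMs = 7L * 24 * 60 * 60 * 1000)
      // copy() keeps the other defaults and enables deduplication for one topic
      val compacted = default.copy(dedupe = true, minCleanableRatio = 0.5)
      val topicConfigs = Map("user-profiles" -> compacted)  // hypothetical topic
      println(topicConfigs("user-profiles").dedupe)         // true
    }
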
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index c5ab8a2..438d802 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -36,32 +36,31 @@ import kafka.server.KafkaConfig
  * A background thread handles log retention by periodically truncating excess log segments.
  */
 @threadsafe
-class LogManager(val config: KafkaConfig,
+class LogManager(val logDirs: Array[File],
+                 val topicConfigs: Map[String, LogConfig],
+                 val defaultConfig: LogConfig,
+                 val cleanerConfig: CleanerConfig,
+                 val flushCheckMs: Long,
+                 val retentionCheckMs: Long,
                  scheduler: Scheduler,
                  private val time: Time) extends Logging {
 
   val CleanShutdownFile = ".kafka_cleanshutdown"
   val LockFile = ".lock"
   val InitialTaskDelayMs = 30*1000
-  val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray
-  private val logFileSizeMap = config.logSegmentBytesPerTopicMap
-  private val logFlushInterval = config.logFlushIntervalMessages
-  private val logFlushIntervals = config.logFlushIntervalMsPerTopicMap
   private val logCreationLock = new Object
-  private val logRetentionSizeMap = config.logRetentionBytesPerTopicMap
-  private val logRetentionMsMap = config.logRetentionHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
-  private val logRollMsMap = config.logRollHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
-  private val logRollDefaultIntervalMs = 1000L * 60 * 60 * config.logRollHours
-  private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMins
-  private val logCleanupDefaultAgeMs = 1000L * 60 * 60 * config.logRetentionHours
-
-  this.logIdent = "[Log Manager on Broker " + config.brokerId + "] "
   private val logs = new Pool[TopicAndPartition, Log]()
   
   createAndValidateLogDirs(logDirs)
   private var dirLocks = lockLogDirs(logDirs)
   loadLogs(logDirs)
   
+  private val cleaner: LogCleaner = 
+    if(cleanerConfig.enableCleaner)
+      new LogCleaner(cleanerConfig, logDirs, logs, time = time)
+    else
+      null
+  
   /**
    * Create and check validity of the given directories, specifically:
    * <ol>
@@ -114,18 +113,11 @@ class LogManager(val config: KafkaConfig,
           if(dir.isDirectory){
             info("Loading log '" + dir.getName + "'")
             val topicPartition = parseTopicPartitionName(dir.getName)
-            val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
-            val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logSegmentBytes)
-            val log = new Log(dir,
+            val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
+            val log = new Log(dir, 
+                              config,
+                              needsRecovery,
                               scheduler,
-                              maxLogFileSize,
-                              config.messageMaxBytes,
-                              logFlushInterval, 
-                              rollIntervalMs, 
-                              needsRecovery, 
-                              config.logIndexSizeMaxBytes,
-                              config.logIndexIntervalBytes,
-                              config.logDeleteDelayMs,
                               time)
             val previous = this.logs.put(topicPartition, log)
             if(previous != null)
@@ -142,20 +134,41 @@ class LogManager(val config: KafkaConfig,
   def startup() {
     /* Schedule the cleanup task to delete old logs */
     if(scheduler != null) {
-      info("Starting log cleanup with a period of %d ms.".format(logCleanupIntervalMs))
-      scheduler.schedule("kafka-log-cleaner", 
+      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
+      scheduler.schedule("kafka-log-retention", 
                          cleanupLogs, 
                          delay = InitialTaskDelayMs, 
-                         period = logCleanupIntervalMs, 
+                         period = retentionCheckMs, 
                          TimeUnit.MILLISECONDS)
-      info("Starting log flusher with a default period of %d ms with the following overrides: %s."
-          .format(config.logFlushIntervalMs, logFlushIntervals.map(e => e._1.toString + "=" + e._2.toString).mkString(", ")))
+      info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
       scheduler.schedule("kafka-log-flusher", 
                          flushDirtyLogs, 
                          delay = InitialTaskDelayMs, 
-                         period = config.logFlushSchedulerIntervalMs,
+                         period = flushCheckMs, 
                          TimeUnit.MILLISECONDS)
     }
+    if(cleanerConfig.enableCleaner)
+      cleaner.startup()
+  }
+  
+  /**
+   * Close all the logs, stopping the cleaner first and releasing the data directory locks when done
+   */
+  def shutdown() {
+    debug("Shutting down.")
+    try {
+      // stop the cleaner first
+      if(cleaner != null)
+        Utils.swallow(cleaner.shutdown())
+      // close the logs
+      allLogs.foreach(_.close())
+      // mark that the shutdown was clean by creating the clean shutdown marker file
+      logDirs.foreach(dir => Utils.swallow(new File(dir, CleanShutdownFile).createNewFile()))
+    } finally {
+      // regardless of whether the close succeeded, we need to unlock the data directories
+      dirLocks.foreach(_.destroy())
+    }
+    debug("Shutdown complete.")
   }
   
   /**
@@ -197,18 +210,10 @@ class LogManager(val config: KafkaConfig,
       val dataDir = nextLogDir()
       val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
       dir.mkdirs()
-      val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
-      val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logSegmentBytes)
       log = new Log(dir, 
+                    defaultConfig,
+                    needsRecovery = false,
                     scheduler,
-                    maxLogFileSize, 
-                    config.messageMaxBytes,
-                    logFlushInterval, 
-                    rollIntervalMs, 
-                    needsRecovery = false, 
-                    config.logIndexSizeMaxBytes,
-                    config.logIndexIntervalBytes, 
-                    config.logDeleteDelayMs,
                     time)
       info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath))
       logs.put(topicAndPartition, log)
@@ -242,8 +247,7 @@ class LogManager(val config: KafkaConfig,
   private def cleanupExpiredSegments(log: Log): Int = {
     val startMs = time.milliseconds
     val topic = parseTopicPartitionName(log.name).topic
-    val logCleanupThresholdMs = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
-    log.deleteOldSegments(startMs - _.lastModified > logCleanupThresholdMs)
+    log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
   }
 
   /**
@@ -252,10 +256,9 @@ class LogManager(val config: KafkaConfig,
    */
   private def cleanupSegmentsToMaintainSize(log: Log): Int = {
     val topic = parseTopicPartitionName(log.dir.getName).topic
-    val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionBytes)
-    if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize)
+    if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
       return 0
-    var diff = log.size - maxLogRetentionSize
+    var diff = log.size - log.config.retentionSize
     def shouldDelete(segment: LogSegment) = {
       if(diff - segment.size >= 0) {
         diff -= segment.size
@@ -274,7 +277,7 @@ class LogManager(val config: KafkaConfig,
     debug("Beginning log cleanup...")
     var total = 0
     val startMs = time.milliseconds
-    for(log <- allLogs) {
+    for(log <- allLogs; if !log.config.dedupe) {
       debug("Garbage collecting '" + log.name + "'")
       total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
     }
@@ -283,23 +286,6 @@ class LogManager(val config: KafkaConfig,
   }
 
   /**
-   * Close all the logs
-   */
-  def shutdown() {
-    debug("Shutting down.")
-    try {
-      // close the logs
-      allLogs.foreach(_.close())
-      // mark that the shutdown was clean by creating the clean shutdown marker file
-      logDirs.foreach(dir => Utils.swallow(new File(dir, CleanShutdownFile).createNewFile()))
-    } finally {
-      // regardless of whether the close succeeded, we need to unlock the data directories
-      dirLocks.foreach(_.destroy())
-    }
-    debug("Shutdown complete.")
-  }
-
-  /**
    * Get all the partition logs
    */
   def allLogs(): Iterable[Log] = logs.values
@@ -312,13 +298,9 @@ class LogManager(val config: KafkaConfig,
     for ((topicAndPartition, log) <- logs) {
       try {
         val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
-        
-        var logFlushInterval = config.logFlushIntervalMs
-        if(logFlushIntervals.contains(topicAndPartition.topic))
-          logFlushInterval = logFlushIntervals(topicAndPartition.topic)
-        debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval  " + logFlushInterval +
+        debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval  " + log.config.flushMs +
               " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
-        if(timeSinceLastFlush >= logFlushInterval)
+        if(timeSinceLastFlush >= log.config.flushMs)
           log.flush
       } catch {
         case e =>

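A standalone sketch (not this patch's code) of the walk cleanupSegmentsToMaintainSize performs above: delete leading segments only as long as doing so does not take the log below its retention size. The segment sizes and retention limit are made-up numbers:

    object SizeRetentionSketch extends App {
      val retentionSize = 1000L
      val segmentSizes = List(400L, 400L, 400L, 300L)   // oldest first, 1500 bytes total
      var diff = segmentSizes.sum - retentionSize       // 500 bytes over the limit
      // mirror of shouldDelete: drop a segment only if we are still over afterwards
      val deletable = segmentSizes.takeWhile { size =>
        val delete = diff - size >= 0
        if (delete) diff -= size
        delete
      }
      println(deletable)  // List(400): deleting more would undershoot the limit
    }
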
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 39dd9c2..120ebeb 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -79,7 +79,7 @@ class LogSegment(val log: FileMessageSet,
    * @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
    */
   @threadsafe
-  private def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
+  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
     val mapping = index.lookup(offset)
     log.searchFor(offset, max(mapping.position, startingFilePosition))
   }
@@ -168,18 +168,20 @@ class LogSegment(val log: FileMessageSet,
    * Truncate off all index and log entries with offsets >= the given offset.
    * If the given offset is larger than the largest message in this segment, do nothing.
    * @param offset The offset to truncate to
+   * @return The number of log bytes truncated
    */
   @nonthreadsafe
-  def truncateTo(offset: Long) {
+  def truncateTo(offset: Long): Int = {
     val mapping = translateOffset(offset)
     if(mapping == null)
-      return
+      return 0
     index.truncateTo(offset)
     // after truncation, reset and allocate more space for the (new currently  active) index
     index.resize(index.maxIndexSize)
-    log.truncateTo(mapping.position)
-    if (log.sizeInBytes == 0)
+    val bytesTruncated = log.truncateTo(mapping.position)
+    if(log.sizeInBytes == 0)
       created = time.milliseconds
+    bytesTruncated
   }
   
   /**
@@ -211,6 +213,18 @@ class LogSegment(val log: FileMessageSet,
   }
   
   /**
+   * Change the suffix for the index and log file for this log segment
+   */
+  def changeFileSuffixes(oldSuffix: String, newSuffix: String) {
+    val logRenamed = log.renameTo(new File(Utils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
+    if(!logRenamed)
+      throw new KafkaStorageException("Failed to change the log file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset))
+    val indexRenamed = index.renameTo(new File(Utils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
+    if(!indexRenamed)
+      throw new KafkaStorageException("Failed to change the index file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset))
+  }
+  
+  /**
    * Close this log segment
    */
   def close() {

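The new changeFileSuffixes above leans on Utils.replaceSuffix. As a rough sketch of the kind of rename involved, here is a standalone version; the replaceSuffix below is my own guess at that helper's behaviour, and the ".cleaned"/".deleted" suffixes are illustrative only:

    object SuffixSketch extends App {
      // assumed behaviour of Utils.replaceSuffix: swap one trailing suffix for another
      def replaceSuffix(path: String, oldSuffix: String, newSuffix: String): String = {
        require(path.endsWith(oldSuffix), "Expected path to end with " + oldSuffix)
        path.substring(0, path.length - oldSuffix.length) + newSuffix
      }
      // e.g. tagging a segment's files for delayed deletion, then a hypothetical
      // ".cleaned" working file being promoted back to a plain log file
      println(replaceSuffix("/data/topic-0/00000000000000000000.log", "", ".deleted"))
      println(replaceSuffix("/data/topic-0/00000000000000000000.log.cleaned", ".cleaned", ""))
    }
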
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 1662f10..23e659f 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -49,7 +49,7 @@ import kafka.utils._
  * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal 
  * storage format.
  */
-class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
+class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
   
   /* initialize the memory mapping for this index */
   private var mmap: MappedByteBuffer = 
@@ -83,20 +83,15 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
         Utils.swallow(raf.close())
       }
     }
-
-  info("Created index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d"
-    .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset))
-
-  /**
-   * The maximum number of eight-byte entries this index can hold
-   */
-  def maxEntries = mmap.limit / 8
   
   /* the number of eight-byte entries currently in the index */
   private var size = new AtomicInteger(mmap.position / 8)
   
   /* the last offset in the index */
   var lastOffset = readLastOffset()
+  
+  info("Created index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d"
+       .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset))
 
   /**
    * The last offset written to the index
@@ -109,6 +104,11 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
       }
     baseOffset + offset
   }
+  
+  /**
+   * The maximum number of eight-byte entries this index can hold
+   */
+  def maxEntries = mmap.limit / 8
 
   /**
    * Find the largest offset less than or equal to the given targetOffset 
@@ -284,12 +284,27 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
   /** The number of entries in this index */
   def entries() = size.get
   
+  /**
+   * The number of bytes actually used by this index
+   */
+  def sizeInBytes() = 8 * entries
+  
   /** Close the index */
   def close() {
     trimToValidSize()
   }
   
   /**
+   * Rename the file that backs this offset index
+   * @return true iff the rename was successful
+   */
+  def renameTo(f: File): Boolean = {
+    val success = this.file.renameTo(f)
+    this.file = f
+    success
+  }
+  
+  /**
    * Round a number to the greatest exact multiple of the given factor less than the given number.
    * E.g. roundToExactMultiple(67, 8) == 64
    */

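As a quick sanity check on the eight-bytes-per-entry layout behind maxEntries and the new sizeInBytes above (the numbers below are examples, not defaults):

    object IndexSizingSketch extends App {
      val maxIndexSize = 10 * 1024 * 1024        // a 10 MB index file
      val maxEntries   = maxIndexSize / 8        // 1310720 eight-byte entries
      val entries      = 250000                  // entries written so far
      val sizeInBytes  = 8 * entries             // 2000000 bytes actually used
      println("maxEntries=" + maxEntries + " sizeInBytes=" + sizeInBytes)
    }
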
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/OffsetMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala
new file mode 100644
index 0000000..6236813
--- /dev/null
+++ b/core/src/main/scala/kafka/log/OffsetMap.scala
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.util.Arrays
+import java.security.MessageDigest
+import java.nio.ByteBuffer
+import kafka.utils._
+
+trait OffsetMap {
+  def capacity: Int
+  def put(key: ByteBuffer, offset: Long)
+  def get(key: ByteBuffer): Long
+  def clear()
+  def size: Int
+  def utilization: Double = size.toDouble / capacity
+}
+
+/**
+ * An approximate map used for deduplicating the log.
+ * @param memory The amount of memory this map can use, in bytes
+ * @param maxLoadFactor The maximum fraction of the map that may be filled before it is considered full
+ * @param hashAlgorithm The hash algorithm instance to use: MD2, MD5, SHA-1, SHA-256, SHA-384, SHA-512
+ */
+@nonthreadsafe
+class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgorithm: String = "MD5") extends OffsetMap {
+  private val bytes = ByteBuffer.allocate(memory)
+  
+  /* the hash algorithm instance to use, default is MD5 */
+  private val digest = MessageDigest.getInstance(hashAlgorithm)
+  
+  /* the number of bytes for this hash algorithm */
+  private val hashSize = digest.getDigestLength
+  
+  /* create some hash buffers to avoid reallocating each time */
+  private val hash1 = new Array[Byte](hashSize)
+  private val hash2 = new Array[Byte](hashSize)
+  
+  /* number of entries put into the map */
+  private var entries = 0
+  
+  /* a byte added as a prefix to all keys to make collisions non-static in repeated uses. Changed in clear(). */
+  private var salt: Byte = 0
+  
+  /**
+   * The number of bytes of space each entry uses (the number of bytes in the hash plus an 8-byte offset)
+   */
+  val bytesPerEntry = hashSize + 8
+  
+  /**
+   * The maximum number of entries this map can contain before it exceeds the max load factor
+   */
+  override val capacity: Int = (maxLoadFactor * memory / bytesPerEntry).toInt
+  
+  /**
+   * Associate an offset with a key.
+   * @param key The key
+   * @param offset The offset
+   */
+  override def put(key: ByteBuffer, offset: Long) {
+    if(size + 1 > capacity)
+      throw new IllegalStateException("Attempt to add to a full offset map with a maximum capacity of %d.".format(capacity))
+    hash(key, hash1)
+    bytes.position(offsetFor(hash1))
+    bytes.put(hash1)
+    bytes.putLong(offset)
+    entries += 1
+  }
+  
+  /**
+   * Get the offset associated with this key. This method is approximate:
+   * it may fail to find an offset that was previously stored, but it will never return a wrong offset.
+   * @param key The key
+   * @return The offset associated with this key or -1 if the key is not found
+   */
+  override def get(key: ByteBuffer): Long = {
+    hash(key, hash1)
+    bytes.position(offsetFor(hash1))
+    bytes.get(hash2)
+    // if the computed hash equals the stored hash return the stored offset
+    if(Arrays.equals(hash1, hash2))
+      bytes.getLong()
+    else
+      -1L
+  }
+  
+  /**
+   * Change the salt used for key hashing, making all existing keys unfindable,
+   * and zero out the backing byte array.
+   */
+  override def clear() {
+    this.entries = 0
+    this.salt = (this.salt + 1).toByte
+    Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit, 0.toByte)
+  }
+  
+  /**
+   * The number of entries put into the map (note that not all may remain)
+   */
+  override def size: Int = entries
+  
+  /**
+   * Choose a slot in the array for this hash
+   */
+  private def offsetFor(hash: Array[Byte]): Int = 
+    bytesPerEntry * (Utils.abs(Utils.readInt(hash, 0)) % capacity)
+  
+  /**
+   * Compute the hash of the given key and store it in the given buffer
+   * @param key The key to hash
+   * @param buffer The buffer to store the hash into
+   */
+  private def hash(key: ByteBuffer, buffer: Array[Byte]) {
+    key.mark()
+    digest.update(salt)
+    digest.update(key)
+    key.reset()
+    digest.digest(buffer, 0, hashSize)
+  }
+  
+}
\ No newline at end of file

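A usage sketch for SkimpyOffsetMap, assuming the class above is on the classpath; the keys and offsets are invented. It shows the approximate-get contract (a miss returns -1) and that a later put for the same key overwrites the earlier offset, which is what the cleaner needs for deduplication:

    import java.nio.ByteBuffer
    import kafka.log.SkimpyOffsetMap

    object OffsetMapSketch extends App {
      // 10 MB of map memory at 75% max load; MD5 hashes are 16 bytes,
      // so each entry costs 16 + 8 = 24 bytes
      val map = new SkimpyOffsetMap(memory = 10 * 1024 * 1024, maxLoadFactor = 0.75)
      def key(s: String) = ByteBuffer.wrap(s.getBytes("UTF-8"))

      map.put(key("user-42"), 100L)
      map.put(key("user-42"), 250L)        // the later offset for this key wins
      println(map.get(key("user-42")))     // 250
      println(map.get(key("user-99")))     // -1: never stored
      println(map.size + "/" + map.capacity + " utilization=" + map.utilization)
      map.clear()                          // new salt; old keys become unfindable
    }
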
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 873699f..1b9c8f8 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -73,7 +73,7 @@ object ByteBufferMessageSet {
     new ByteBufferMessageSet(outputBuffer)
   }
     
-  private def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
+  private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
     buffer.putLong(offset)
     buffer.putInt(message.size)
     buffer.put(message.buffer)
@@ -150,7 +150,7 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message
           return allDone()
         val offset = topIter.getLong()
         val size = topIter.getInt()
-        if(size < 0)
+        if(size < 0 || size < Message.MinHeaderSize)
           throw new InvalidMessageException("Message found with corrupt size (" + size + ")")
         
         // we have an incomplete message

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
deleted file mode 100644
index 5aa0141..0000000
--- a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import kafka.utils.Logging
-import kafka.common._
-import java.util.concurrent.locks.ReentrantLock
-import java.io._
-
-/**
- * This class handles the read/write to the highwaterMark checkpoint file. The file stores the high watermark value for
- * all topics and partitions that this broker hosts. The format of this file is as follows -
- * version
- * number of entries
- * topic partition highwatermark
- */
-
-object HighwaterMarkCheckpoint {
-  val highWatermarkFileName = "replication-offset-checkpoint"
-  val currentHighwaterMarkFileVersion = 0
-}
-
-class HighwaterMarkCheckpoint(val path: String) extends Logging {
-  /* create the highwatermark file handle for all partitions */
-  val name = path + "/" + HighwaterMarkCheckpoint.highWatermarkFileName
-  private val hwFile = new File(name)
-  private val hwFileLock = new ReentrantLock()
-  // recover from previous tmp file, if required
-
-  def write(highwaterMarksPerPartition: Map[TopicAndPartition, Long]) {
-    hwFileLock.lock()
-    try {
-      // write to temp file and then swap with the highwatermark file
-      val tempHwFile = new File(hwFile + ".tmp")
-
-      val hwFileWriter = new BufferedWriter(new FileWriter(tempHwFile))
-      // checkpoint highwatermark for all partitions
-      // write the current version
-      hwFileWriter.write(HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion.toString)
-      hwFileWriter.newLine()
-      // write the number of entries in the highwatermark file
-      hwFileWriter.write(highwaterMarksPerPartition.size.toString)
-      hwFileWriter.newLine()
-
-      highwaterMarksPerPartition.foreach { partitionAndHw =>
-        hwFileWriter.write("%s %s %s".format(partitionAndHw._1.topic, partitionAndHw._1.partition, partitionAndHw._2))
-        hwFileWriter.newLine()
-      }
-      hwFileWriter.flush()
-      hwFileWriter.close()
-      // swap new high watermark file with previous one
-      if(!tempHwFile.renameTo(hwFile)) {
-        fatal("Attempt to swap the new high watermark file with the old one failed")
-        System.exit(1)
-      }
-    }finally {
-      hwFileLock.unlock()
-    }
-  }
-
-  def read(topic: String, partition: Int): Long = {
-    hwFileLock.lock()
-    try {
-      hwFile.length() match {
-        case 0 => 
-          warn("No highwatermark file is found. Returning 0 as the highwatermark for topic %s partition %d."
-            .format(topic, partition))
-          0L
-        case _ =>
-          val hwFileReader = new BufferedReader(new FileReader(hwFile))
-          val version = hwFileReader.readLine().toShort
-          version match {
-            case HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion =>
-              val numberOfHighWatermarks = hwFileReader.readLine().toInt
-              val partitionHighWatermarks =
-                for(i <- 0 until numberOfHighWatermarks) yield {
-                  val nextHwEntry = hwFileReader.readLine()
-                  val partitionHwInfo = nextHwEntry.split(" ")
-                  val topic = partitionHwInfo(0)
-                  val partitionId = partitionHwInfo(1).toInt
-                  val highWatermark = partitionHwInfo(2).toLong
-                  (TopicAndPartition(topic, partitionId) -> highWatermark)
-                }
-              hwFileReader.close()
-              val hwOpt = partitionHighWatermarks.toMap.get(TopicAndPartition(topic, partition))
-              hwOpt match {
-                case Some(hw) => 
-                  debug("Read hw %d for topic %s partition %d from highwatermark checkpoint file".format(hw, topic, partition))
-                  hw
-                case None => 
-                  warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
-                       "partition %d. Returning 0 as the highwatermark".format(partition))
-                  0L
-              }
-            case _ => fatal("Unrecognized version of the highwatermark checkpoint file " + version)
-            System.exit(1)
-            -1L
-          }
-      }
-    }finally {
-      hwFileLock.unlock()
-    }
-  }
-}
\ No newline at end of file
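
For reference, the on-disk format the removed checkpoint file used (described in the doc comment above: a version line, an entry-count line, then one "topic partition highwatermark" line per entry) can be parsed with a few lines of standalone Scala; the sample text below is made up:

    object HwCheckpointFormatSketch extends App {
      val sample = "0\n2\nmy-topic 0 42\nmy-topic 1 17\n"
      val lines = sample.split("\n")
      val version = lines(0).toShort
      val count = lines(1).toInt
      val highWatermarks = lines.slice(2, 2 + count).map { line =>
        val Array(topic, partition, hw) = line.split(" ")
        (topic, partition.toInt) -> hw.toLong
      }.toMap
      println("version=" + version + " hw=" + highWatermarks(("my-topic", 1)))  // 0, 17
    }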