Posted to commits@kafka.apache.org by jk...@apache.org on 2013/03/12 19:18:35 UTC

git commit: KAFKA-739 Handle null message payloads in messages and in the log cleaner. Reviewed by Jun and Neha.

Updated Branches:
  refs/heads/trunk c1ed12e44 -> 9ff4e8eb1


KAFKA-739 Handle null message payloads in messages and in the log cleaner. Reviewed by Jun and Neha.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9ff4e8eb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9ff4e8eb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9ff4e8eb

Branch: refs/heads/trunk
Commit: 9ff4e8eb10e0ddd86f257e99d55971a132426605
Parents: c1ed12e
Author: Jay Kreps <ja...@gmail.com>
Authored: Tue Mar 12 11:17:12 2013 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Mar 12 11:17:12 2013 -0700

----------------------------------------------------------------------
 .../java/kafka/etl/impl/SimpleKafkaETLMapper.java  |    2 +
 .../scala/kafka/consumer/ConsumerIterator.scala    |    2 +-
 core/src/main/scala/kafka/log/CleanerConfig.scala  |    4 +-
 core/src/main/scala/kafka/log/FileMessageSet.scala |    3 +-
 core/src/main/scala/kafka/log/LogCleaner.scala     |  104 ++++++---
 core/src/main/scala/kafka/log/LogConfig.scala      |    6 +
 core/src/main/scala/kafka/log/LogSegment.scala     |    9 +-
 core/src/main/scala/kafka/log/OffsetMap.scala      |  103 ++++++---
 core/src/main/scala/kafka/message/Message.scala    |   16 +-
 .../kafka/producer/async/DefaultEventHandler.scala |    5 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   14 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |    2 +
 .../main/scala/kafka/tools/DumpLogSegments.scala   |    5 +-
 .../scala/kafka/tools/SimpleConsumerShell.scala    |    2 +-
 .../main/scala/kafka/utils/IteratorTemplate.scala  |   17 +-
 .../scala/kafka/utils/VerifiableProperties.scala   |    6 +-
 .../test/scala/other/kafka/TestLogCleaning.scala   |  198 +++++++++++----
 .../test/scala/unit/kafka/log/CleanerTest.scala    |   66 +++--
 core/src/test/scala/unit/kafka/log/LogTest.scala   |   13 +
 .../test/scala/unit/kafka/log/OffsetMapTest.scala  |   43 ++--
 .../scala/unit/kafka/message/MessageTest.scala     |    9 +-
 .../scala/unit/kafka/producer/ProducerTest.scala   |   25 ++-
 .../unit/kafka/utils/IteratorTemplateTest.scala    |   41 +++
 23 files changed, 504 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
index b0aadff..51b6adf 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
@@ -41,6 +41,8 @@ Mapper<KafkaETLKey, BytesWritable, LongWritable, Text> {
     
 	protected Text getData(Message message) throws IOException {
 		ByteBuffer buf = message.payload();
+		if(buf == null)
+		  return new Text();
 		
 		byte[] array = new byte[buf.limit()];
 		buf.get(array);
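
Because a message payload may now be null (a delete marker), every reader of a payload needs a guard like the one added above; the same pattern shows up in the Scala changes below. A minimal standalone sketch (hypothetical helper name, using the existing kafka.message.Message and kafka.utils.Utils):

    import kafka.message.Message
    import kafka.utils.Utils

    object PayloadGuard {
      // Return the payload bytes, or null when the message is a delete marker (null payload).
      def valueBytes(message: Message): Array[Byte] =
        if (message.isNull) null else Utils.readBytes(message.payload)
    }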

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 746a4bd..963a3a9 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -108,7 +108,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
 
     val keyBuffer = item.message.key
     val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer))
-    val value = valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
+    val value = if(item.message.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
     new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/log/CleanerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/CleanerConfig.scala b/core/src/main/scala/kafka/log/CleanerConfig.scala
index 999fee8..fa946ad 100644
--- a/core/src/main/scala/kafka/log/CleanerConfig.scala
+++ b/core/src/main/scala/kafka/log/CleanerConfig.scala
@@ -30,8 +30,8 @@ package kafka.log
  * @param hashAlgorithm The hash algorithm to use in key comparison.
  */
 case class CleanerConfig(val numThreads: Int = 1, 
-                         val dedupeBufferSize: Int = 4*1024*1024,
-                         val dedupeBufferLoadFactor: Double = 0.75,
+                         val dedupeBufferSize: Long = 4*1024*1024L,
+                         val dedupeBufferLoadFactor: Double = 0.9d,
                          val ioBufferSize: Int = 1024*1024,
                          val maxMessageSize: Int = 32*1024*1024,
                          val maxIoBytesPerSecond: Double = Double.MaxValue,
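
With dedupeBufferSize widened to a Long, more than 2GB of total dedupe buffer can be configured (each thread is still capped at 2GB, as the LogCleaner change below warns). An illustrative construction, assuming the remaining constructor parameters keep their defaults:

    import kafka.log.CleanerConfig

    object CleanerConfigExample {
      // Two cleaner threads sharing a 1 GB dedupe buffer; other fields keep their defaults.
      val config = CleanerConfig(numThreads = 2,
                                 dedupeBufferSize = 1024L * 1024 * 1024,
                                 dedupeBufferLoadFactor = 0.9d)
    }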

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 0eef33e..abb160c 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -159,13 +159,14 @@ class FileMessageSet private[kafka](@volatile var file: File,
   def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = {
     new IteratorTemplate[MessageAndOffset] {
       var location = start
+      val sizeOffsetBuffer = ByteBuffer.allocate(12)
       
       override def makeNext(): MessageAndOffset = {
         if(location >= end)
           return allDone()
           
         // read the size of the item
-        val sizeOffsetBuffer = ByteBuffer.allocate(12)
+        sizeOffsetBuffer.rewind()
         channel.read(sizeOffsetBuffer, location)
         if(sizeOffsetBuffer.hasRemaining)
           return allDone()
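
The change above replaces a per-message ByteBuffer.allocate(12) with a single buffer that is rewound on every iteration. A standalone sketch of the same allocate-once pattern (hypothetical helper, not the Kafka iterator itself):

    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel

    object HeaderReader {
      // Read the 12-byte [8-byte offset][4-byte size] header at `position`, reusing the
      // caller-supplied 12-byte buffer across calls instead of allocating a new one per message.
      def readHeader(channel: FileChannel, position: Long, header: ByteBuffer): Option[(Long, Int)] = {
        header.rewind()
        channel.read(header, position)
        if (header.hasRemaining)
          None                              // fewer than 12 bytes remain in the file
        else {
          header.rewind()
          Some((header.getLong(), header.getInt()))
        }
      }
    }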

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 368a12b..ccde2ab 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -23,6 +23,7 @@ import java.nio._
 import java.util.concurrent.Semaphore
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
+import java.util.Date
 import java.io.File
 import kafka.common._
 import kafka.message._
@@ -39,8 +40,7 @@ import kafka.utils._
  * The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "dedupe" retention policy 
  * and cleans that. The dirtiness of the log is guessed by taking the ratio of bytes in the dirty section of the log to the total bytes in the log. 
  * 
- * To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. For memory efficiency this mapping
- * is approximate. That is allowed to lose some key=>offset pairs, but never to return a wrong answer. See kafka.log.OffsetMap for details of
+ * To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. See kafka.log.OffsetMap for details of
  * the implementation of the mapping. 
  * 
  * Once the key=>offset map is built, the log is cleaned by recopying each log segment but omitting any key that appears in the offset map with a 
@@ -53,6 +53,11 @@ import kafka.utils._
  * 
  * One nuance that the cleaner must handle is log truncation. If a log is truncated while it is being cleaned the cleaning of that log is aborted.
  * 
+ * Messages with null payload are treated as deletes for the purpose of log compaction. This means that they receive special treatment by the cleaner. 
+ * The cleaner will only retain delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic
+ * basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed).
+ * Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning.
+ * 
  * @param config Configuration parameters for the cleaner
  * @param logDirs The directories where offset checkpoints reside
  * @param logs The pool of logs
@@ -62,7 +67,7 @@ class LogCleaner(val config: CleanerConfig,
                  val logDirs: Array[File],
                  val logs: Pool[TopicAndPartition, Log], 
                  time: Time = SystemTime) extends Logging {
-  
+    
   /* the offset checkpoints holding the last cleaned point for each log */
   private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
   
@@ -160,12 +165,14 @@ class LogCleaner(val config: CleanerConfig,
    * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
    */
   private class CleanerThread extends Thread {
+    if(config.dedupeBufferSize / config.numThreads > Int.MaxValue)
+      warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...")
     val cleaner = new Cleaner(id = threadId.getAndIncrement(),
-                              offsetMap = new SkimpyOffsetMap(memory = config.dedupeBufferSize / config.numThreads, 
-                                                              maxLoadFactor = config.dedupeBufferLoadFactor, 
+                              offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt, 
                                                               hashAlgorithm = config.hashAlgorithm),
                               ioBufferSize = config.ioBufferSize / config.numThreads / 2,
                               maxIoBufferSize = config.maxMessageSize,
+                              dupBufferLoadFactor = config.dedupeBufferLoadFactor,
                               throttler = throttler,
                               time = time)
     
@@ -251,13 +258,20 @@ private[log] class Cleaner(val id: Int,
                            offsetMap: OffsetMap,
                            ioBufferSize: Int,
                            maxIoBufferSize: Int,
+                           dupBufferLoadFactor: Double,
                            throttler: Throttler,
                            time: Time) extends Logging {
 
-  this.logIdent = "Cleaner " + id + ":"
+  this.logIdent = "Cleaner " + id + ": "
+  
+  /* stats on this cleaning */
   val stats = new CleanerStats(time)
-  private var readBuffer = ByteBuffer.allocate(ioBufferSize) // buffer for disk read I/O
-  private var writeBuffer = ByteBuffer.allocate(ioBufferSize) // buffer for disk write I/O
+  
+  /* buffer used for read i/o */
+  private var readBuffer = ByteBuffer.allocate(ioBufferSize)
+  
+  /* buffer used for write i/o */
+  private var writeBuffer = ByteBuffer.allocate(ioBufferSize)
 
   /**
    * Clean the given log
@@ -268,22 +282,29 @@ private[log] class Cleaner(val id: Int,
    */
   private[log] def clean(cleanable: LogToClean): Long = {
     stats.clear()
-    val topic = cleanable.topicPartition.topic
-    val part = cleanable.topicPartition.partition
-    info("Beginning cleaning of %s-%d.".format(topic, part))
+    info("Beginning cleaning of log %s.".format(cleanable.log.name))
     val log = cleanable.log
     val truncateCount = log.numberOfTruncates
     
     // build the offset map
-    val upperBoundOffset = math.min(log.activeSegment.baseOffset, cleanable.firstDirtyOffset + offsetMap.capacity)
+    info("Building offset map for %s...".format(cleanable.log.name))
+    val upperBoundOffset = log.activeSegment.baseOffset
     val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap) + 1
     stats.indexDone()
     
-    // group the segments and clean the groups
-    for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) {
-      info("Cleaning segments %s for log %s...".format(group.map(_.baseOffset).mkString(","), log.name))
-      cleanSegments(log, group, offsetMap, truncateCount)
+    // figure out the timestamp below which it is safe to remove delete tombstones
+    // this position is defined to be a configurable time beneath the last modified time of the last clean segment
+    val deleteHorizonMs = 
+      log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
+        case None => 0L
+        case Some(seg) => seg.lastModified - log.config.deleteRetentionMs
     }
+        
+    // group the segments and clean the groups
+    info("Cleaning log %s (discarding tombstones prior to %s)...".format(log.name, new Date(deleteHorizonMs)))
+    for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
+      cleanSegments(log, group, offsetMap, truncateCount, deleteHorizonMs)
+    
     stats.allDone()
     endOffset
   }
@@ -295,8 +316,13 @@ private[log] class Cleaner(val id: Int,
    * @param segments The group of segments being cleaned
    * @param map The offset map to use for cleaning segments
    * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet
+   * @param deleteHorizonMs The time to retain delete tombstones
    */
-  private[log] def cleanSegments(log: Log, segments: Seq[LogSegment], map: OffsetMap, expectedTruncateCount: Int) {
+  private[log] def cleanSegments(log: Log, 
+                                 segments: Seq[LogSegment], 
+                                 map: OffsetMap, 
+                                 expectedTruncateCount: Int, 
+                                 deleteHorizonMs: Long) {
     // create a new segment with the suffix .cleaned appended to both the log and index name
     val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
     logFile.delete()
@@ -307,17 +333,25 @@ private[log] class Cleaner(val id: Int,
     val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, time)
 
     // clean segments into the new destination segment
-    for (old <- segments)
-      cleanInto(old, cleaned, map)
+    for (old <- segments) {
+      val retainDeletes = old.lastModified > deleteHorizonMs
+      info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
+          .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
+      cleanInto(old, cleaned, map, retainDeletes)
+    }
       
     // trim excess index
     index.trimToValidSize()
     
     // flush new segment to disk before swap
     cleaned.flush()
+    
+    // update the modification date to retain the last modified date of the original files
+    val modified = segments.last.lastModified
+    cleaned.lastModified = modified
 
     // swap in new segment  
-    info("Swapping in cleaned segment %d for %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name))
+    info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name))
     try {
       log.replaceSegments(cleaned, segments, expectedTruncateCount)
     } catch {
@@ -334,10 +368,11 @@ private[log] class Cleaner(val id: Int,
    * @param source The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
+   * @param retainDeletes Should delete tombstones be retained while cleaning this segment
    *
    * TODO: Implement proper compression support
    */
-  private[log] def cleanInto(source: LogSegment, dest: LogSegment, map: OffsetMap) {
+  private[log] def cleanInto(source: LogSegment, dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) {
     var position = 0
     while (position < source.log.sizeInBytes) {
       checkDone()
@@ -355,10 +390,14 @@ private[log] class Cleaner(val id: Int,
         stats.readMessage(size)
         val key = entry.message.key
         require(key != null, "Found null key in log segment %s which is marked as dedupe.".format(source.log.file.getAbsolutePath))
-        val lastOffset = map.get(key)
-        /* retain the record if it isn't present in the map OR it is present but this offset is the highest (and it's not a delete) */
-        val retainRecord = lastOffset < 0 || (entry.offset >= lastOffset && entry.message.payload != null)
-        if (retainRecord) {
+        val foundOffset = map.get(key)
+        /* two cases in which we can get rid of a message:
+         *   1) if there exists a message with the same key but higher offset
+         *   2) if the message is a delete "tombstone" marker and enough time has passed
+         */
+        val redundant = foundOffset >= 0 && entry.offset < foundOffset
+        val obsoleteDelete = !retainDeletes && entry.message.isNull
+        if (!redundant && !obsoleteDelete) {
           ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
           stats.recopyMessage(size)
         }
@@ -443,13 +482,18 @@ private[log] class Cleaner(val id: Int,
    */
   private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap): Long = {
     map.clear()
-    val segments = log.logSegments(start, end)
-    info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, segments.size, start, end))
-    var offset = segments.head.baseOffset
+    val dirty = log.logSegments(start, end).toSeq
+    info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))
+    
+    // Add all the dirty segments. We must take at least map.slots * load_factor,
+    // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
+    var offset = dirty.head.baseOffset
     require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
-    for (segment <- segments) {
+    val minStopOffset = (start + map.slots * this.dupBufferLoadFactor).toLong
+    for (segment <- dirty) {
       checkDone()
-      offset = buildOffsetMap(segment, map)
+      if(segment.baseOffset <= minStopOffset || map.utilization < this.dupBufferLoadFactor)
+        offset = buildOffsetMap(segment, map)
     }
     info("Offset map for log %s complete.".format(log.name))
     offset
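
Putting the new rules together: a record is dropped either because the offset map contains a later record with the same key, or because it is a tombstone in a segment older than the delete horizon. A condensed sketch of that decision (standalone, with hypothetical parameter names):

    object RetentionRule {
      // deleteHorizonMs = last-modified time of the last already-clean segment minus delete.retention.ms
      def shouldRetain(entryOffset: Long,
                       mappedOffset: Long,     // offset found in the key=>offset map, or -1 if absent
                       isTombstone: Boolean,   // message.isNull
                       retainDeletes: Boolean  // segment.lastModified > deleteHorizonMs
                      ): Boolean = {
        val redundant = mappedOffset >= 0 && entryOffset < mappedOffset
        val obsoleteDelete = !retainDeletes && isTombstone
        !redundant && !obsoleteDelete
      }
    }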

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index dc42a74..48660bc 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -32,6 +32,7 @@ import kafka.common._
  * @param maxIndexSize The maximum size of an index file
  * @param indexInterval The approximate number of bytes between index entries
  * @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
+ * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
  * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
  * @param dedupe Should old segments in this log be deleted or deduplicated?
  */
@@ -45,6 +46,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
                      val maxIndexSize: Int = 1024*1024,
                      val indexInterval: Int = 4096,
                      val fileDeleteDelayMs: Long = 60*1000,
+                     val deleteRetentionMs: Long = 24 * 60 * 60 * 1000L,
                      val minCleanableRatio: Double = 0.5,
                      val dedupe: Boolean = false) {
   
@@ -60,6 +62,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
     props.put(RententionMsProp, retentionMs.toString)
     props.put(MaxMessageBytesProp, maxMessageSize.toString)
     props.put(IndexIntervalBytesProp, indexInterval.toString)
+    props.put(DeleteRetentionMsProp, deleteRetentionMs.toString)
     props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
     props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
     props.put(CleanupPolicyProp, if(dedupe) "dedupe" else "delete")
@@ -78,6 +81,7 @@ object LogConfig {
   val RententionMsProp = "retention.ms"
   val MaxMessageBytesProp = "max.message.bytes"
   val IndexIntervalBytesProp = "index.interval.bytes"
+  val DeleteRetentionMsProp = "delete.retention.ms"
   val FileDeleteDelayMsProp = "file.delete.delay.ms"
   val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
   val CleanupPolicyProp = "cleanup.policy"
@@ -92,6 +96,7 @@ object LogConfig {
                         MaxMessageBytesProp,
                         IndexIntervalBytesProp,
                         FileDeleteDelayMsProp,
+                        DeleteRetentionMsProp,
                         MinCleanableDirtyRatioProp,
                         CleanupPolicyProp)
     
@@ -110,6 +115,7 @@ object LogConfig {
                   maxMessageSize = props.getProperty(MaxMessageBytesProp).toInt,
                   indexInterval = props.getProperty(IndexIntervalBytesProp).toInt,
                   fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt,
+                  deleteRetentionMs = props.getProperty(DeleteRetentionMsProp).toLong,
                   minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble,
                   dedupe = props.getProperty(CleanupPolicyProp).trim.toLowerCase == "dedupe")
   }
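
With the new field, a compacted topic can carry its own tombstone retention. An illustrative one-hour setting (assuming the remaining LogConfig parameters keep their defaults):

    import kafka.log.LogConfig

    object CompactedTopicConfig {
      // Keep delete tombstones for one hour on this compacted ("dedupe") topic.
      val config = LogConfig(dedupe = true, deleteRetentionMs = 60 * 60 * 1000L)
    }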

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 120ebeb..30d2e91 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -249,5 +249,12 @@ class LogSegment(val log: FileMessageSet,
    * The last modified time of this log segment as a unix time stamp
    */
   def lastModified = log.file.lastModified
-
+  
+  /**
+   * Change the last modified time for this log segment
+   */
+  def lastModified_=(ms: Long) = {
+    log.file.setLastModified(ms)
+    index.file.setLastModified(ms)
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/log/OffsetMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala
index 6236813..42cdfbb 100644
--- a/core/src/main/scala/kafka/log/OffsetMap.scala
+++ b/core/src/main/scala/kafka/log/OffsetMap.scala
@@ -23,22 +23,22 @@ import java.nio.ByteBuffer
 import kafka.utils._
 
 trait OffsetMap {
-  def capacity: Int
+  def slots: Int
   def put(key: ByteBuffer, offset: Long)
   def get(key: ByteBuffer): Long
   def clear()
   def size: Int
-  def utilization: Double = size.toDouble / capacity
+  def utilization: Double = size.toDouble / slots
 }
 
 /**
- * An approximate map used for deduplicating the log.
+ * A hash table used for deduplicating the log. This hash table uses a cryptographically secure hash of the key as a proxy for the key
+ * for comparisons and to save space on object overhead. Collisions are resolved by probing. This hash table does not support deletes.
  * @param memory The amount of memory this map can use
- * @param maxLoadFactor The maximum percent full this offset map can be
  * @param hashAlgorithm The hash algorithm instance to use: MD2, MD5, SHA-1, SHA-256, SHA-384, SHA-512
  */
 @nonthreadsafe
-class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgorithm: String = "MD5") extends OffsetMap {
+class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extends OffsetMap {
   private val bytes = ByteBuffer.allocate(memory)
   
   /* the hash algorithm instance to use, default is MD5 */
@@ -54,8 +54,11 @@ class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgori
   /* number of entries put into the map */
   private var entries = 0
   
-  /* a byte added as a prefix to all keys to make collisions non-static in repeated uses. Changed in clear(). */
-  private var salt: Byte = 0
+  /* number of lookups on the map */
+  private var lookups = 0L
+  
+  /* the number of probes for all lookups */
+  private var probes = 0L
   
   /**
    * The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset)
@@ -63,40 +66,66 @@ class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgori
   val bytesPerEntry = hashSize + 8
   
   /**
-   * The maximum number of entries this map can contain before it exceeds the max load factor
+   * The maximum number of entries this map can contain
    */
-  override val capacity: Int = (maxLoadFactor * memory / bytesPerEntry).toInt
+  val slots: Int = (memory / bytesPerEntry).toInt
   
   /**
-   * Associate a offset with a key.
+   * Associate this offset to the given key.
    * @param key The key
    * @param offset The offset
    */
   override def put(key: ByteBuffer, offset: Long) {
-    if(size + 1 > capacity)
-      throw new IllegalStateException("Attempt to add to a full offset map with a maximum capacity of %d.".format(capacity))
-    hash(key, hash1)
-    bytes.position(offsetFor(hash1))
+    require(entries < slots, "Attempt to add a new entry to a full offset map.")
+    lookups += 1
+    hashInto(key, hash1)
+    // probe until we find the first empty slot
+    var attempt = 0
+    var pos = positionOf(hash1, attempt)  
+    while(!isEmpty(pos)) {
+      bytes.position(pos)
+      bytes.get(hash2)
+      if(Arrays.equals(hash1, hash2)) {
+        // we found an existing entry, overwrite it and return (size does not change)
+        bytes.putLong(offset)
+        return
+      }
+      attempt += 1
+      pos = positionOf(hash1, attempt)
+    }
+    // found an empty slot, update it--size grows by 1
+    bytes.position(pos)
     bytes.put(hash1)
     bytes.putLong(offset)
     entries += 1
   }
   
   /**
-   * Get the offset associated with this key. This method is approximate,
-   * it may not find an offset previously stored, but cannot give a wrong offset.
+   * Check that there is no entry at the given position
+   */
+  private def isEmpty(position: Int): Boolean = 
+    bytes.getLong(position) == 0 && bytes.getLong(position + 8) == 0 && bytes.getLong(position + 16) == 0
+  
+  /**
+   * Get the offset associated with this key.
    * @param key The key
    * @return The offset associated with this key or -1 if the key is not found
    */
   override def get(key: ByteBuffer): Long = {
-    hash(key, hash1)
-    bytes.position(offsetFor(hash1))
-    bytes.get(hash2)
-    // if the computed hash equals the stored hash return the stored offset
-    if(Arrays.equals(hash1, hash2))
-      bytes.getLong()
-    else
-      -1L
+    lookups += 1
+    hashInto(key, hash1)
+    // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot
+    var attempt = 0
+    var pos = 0
+    do {
+      pos = positionOf(hash1, attempt)
+      bytes.position(pos)
+      if(isEmpty(pos))
+        return -1L
+      bytes.get(hash2)
+      attempt += 1
+    } while(!Arrays.equals(hash1, hash2))
+    bytes.getLong()
   }
   
   /**
@@ -105,7 +134,8 @@ class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgori
    */
   override def clear() {
     this.entries = 0
-    this.salt = (this.salt + 1).toByte
+    this.lookups = 0L
+    this.probes = 0L
     Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit, 0.toByte)
   }
   
@@ -115,19 +145,32 @@ class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgori
   override def size: Int = entries
   
   /**
-   * Choose a slot in the array for this hash
+   * The rate of collisions in the lookups
    */
-  private def offsetFor(hash: Array[Byte]): Int = 
-    bytesPerEntry * (Utils.abs(Utils.readInt(hash, 0)) % capacity)
+  def collisionRate: Double = 
+    (this.probes - this.lookups) / this.lookups.toDouble
+  
+  /**
+   * Calculate the ith probe position. We first try reading successive integers from the hash itself
+   * then if all of those fail we degrade to linear probing.
+   * @param hash The hash of the key to find the position for
+   * @param attempt The ith probe
+   * @return The byte offset in the buffer at which the ith probing for the given hash would reside
+   */
+  private def positionOf(hash: Array[Byte], attempt: Int): Int = {
+    val probe = Utils.readInt(hash, math.min(attempt, hashSize - 4)) + math.max(0, attempt - hashSize + 4)
+    val slot = Utils.abs(probe) % slots
+    this.probes += 1
+    slot * bytesPerEntry
+  }
   
   /**
    * The offset at which we have stored the given key
    * @param key The key to hash
    * @param buffer The buffer to store the hash into
    */
-  private def hash(key: ByteBuffer, buffer: Array[Byte]) {
+  private def hashInto(key: ByteBuffer, buffer: Array[Byte]) {
     key.mark()
-    digest.update(salt)
     digest.update(key)
     key.reset()
     digest.digest(buffer, 0, hashSize)
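
To make the probe sequence concrete, here is a self-contained sketch of the positionOf idea (not the Kafka class itself): early attempts take successive 4-byte windows of the key's hash, and once those are exhausted the probe steps linearly.

    object ProbeSketch {
      // Big-endian int from four bytes of the hash.
      private def readInt(b: Array[Byte], off: Int): Int =
        ((b(off) & 0xff) << 24) | ((b(off + 1) & 0xff) << 16) | ((b(off + 2) & 0xff) << 8) | (b(off + 3) & 0xff)

      // Byte position in the backing buffer for the attempt-th probe of this hash.
      def positionOf(hash: Array[Byte], attempt: Int, slots: Int, bytesPerEntry: Int): Int = {
        val hashSize = hash.length
        // windows at offsets 0 .. hashSize-4, then a linear step for later attempts
        val probe = readInt(hash, math.min(attempt, hashSize - 4)) + math.max(0, attempt - hashSize + 4)
        val slot = (probe & 0x7fffffff) % slots   // keep the slot index non-negative
        slot * bytesPerEntry
      }
    }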

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index 12a8368..1a43fdf 100644
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -101,7 +101,9 @@ class Message(val buffer: ByteBuffer) {
                              Message.KeySizeLength + 
                              (if(key == null) 0 else key.length) + 
                              Message.ValueSizeLength + 
-                             (if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset)))
+                             (if(bytes == null) 0 
+                              else if(payloadSize >= 0) payloadSize 
+                              else bytes.length - payloadOffset)))
     // skip crc, we will fill that in at the end
     buffer.position(MagicOffset)
     buffer.put(CurrentMagicValue)
@@ -115,9 +117,12 @@ class Message(val buffer: ByteBuffer) {
       buffer.putInt(key.length)
       buffer.put(key, 0, key.length)
     }
-    val size = if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset
+    val size = if(bytes == null) -1
+               else if(payloadSize >= 0) payloadSize 
+               else bytes.length - payloadOffset
     buffer.putInt(size)
-    buffer.put(bytes, payloadOffset, size)
+    if(bytes != null)
+      buffer.put(bytes, payloadOffset, size)
     buffer.rewind()
     
     // now compute the checksum and fill it in
@@ -186,6 +191,11 @@ class Message(val buffer: ByteBuffer) {
   def payloadSize: Int = buffer.getInt(payloadSizeOffset)
   
   /**
+   * Is the payload of this message null
+   */
+  def isNull(): Boolean = payloadSize < 0
+  
+  /**
    * The magic version of this message
    */
   def magic: Byte = buffer.get(MagicOffset)
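
For reference, a delete marker is now just a keyed message whose payload is null; it is written with a value-size field of -1 and detected via isNull. A small sketch (assuming the two-argument payload/key constructor present in trunk at this time):

    import kafka.message.Message

    object TombstoneExample {
      // A delete marker for key "user-17": null payload, value size written as -1.
      val tombstone = new Message(null: Array[Byte], "user-17".getBytes("UTF-8"))

      require(tombstone.isNull)             // payloadSize < 0
      require(tombstone.payloadSize == -1)
    }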

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 27b16e3..2e3e383 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -249,7 +249,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           if (logger.isTraceEnabled) {
             val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
             successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
-              trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
+              trace("Successfully sent message: %s".format(if(message.message.isNull) null else Utils.readString(message.message.payload)))))
           }
           failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
             .map(partitionStatus => partitionStatus._1)
@@ -257,8 +257,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
             error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s"
               .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(",")))
           failedTopicPartitions
-        } else
+        } else {
           Seq.empty[TopicAndPartition]
+        }
       } catch {
         case t: Throwable =>
           warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 006caa7..5e4c9ca 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -109,9 +109,6 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "dedupe" */
   val logCleanupPolicy = props.getString("log.cleanup.policy", "delete")
   
-  /* a per-topic override for the cleanup policy for segments beyond the retention window */
-  val logCleanupPolicyMap = props.getMap("topic.log.cleanup.policy")
-  
   /* the number of background threads to use for log cleaning */
   val logCleanerThreads = props.getIntInRange("log.cleaner.threads", 1, (0, Int.MaxValue))
   
@@ -119,11 +116,15 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val logCleanerIoMaxBytesPerSecond = props.getDouble("log.cleaner.io.max.bytes.per.second", Double.MaxValue)
   
   /* the total memory used for log deduplication across all cleaner threads */
-  val logCleanerDedupeBufferSize = props.getIntInRange("log.cleaner.dedupe.buffer.size", 500*1024*1024, (0, Int.MaxValue))
+  val logCleanerDedupeBufferSize = props.getLongInRange("log.cleaner.dedupe.buffer.size", 500*1024*1024L, (0, Long.MaxValue))
   require(logCleanerDedupeBufferSize / logCleanerThreads > 1024*1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
   
   /* the total memory used for log cleaner I/O buffers across all cleaner threads */
-  val logCleanerIoBufferSize = props.getIntInRange("log.cleaner.io.buffer.size", 4*1024*1024, (0, Int.MaxValue))
+  val logCleanerIoBufferSize = props.getIntInRange("log.cleaner.io.buffer.size", 512*1024, (0, Int.MaxValue))
+  
+  /* log cleaner dedupe buffer load factor. The percentage full the dedupe buffer can become. A higher value
+   * will allow more log to be cleaned at once but will lead to more hash collisions */
+  val logCleanerDedupeBufferLoadFactor = props.getDouble("log.cleaner.io.buffer.load.factor", 0.9d)
   
   /* the amount of time to sleep when there are no logs to clean */
   val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 30*1000, (0L, Long.MaxValue))
@@ -134,6 +135,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* should we enable log cleaning? */
   val logCleanerEnable = props.getBoolean("log.cleaner.enable", false)
   
+  /* how long are delete records retained? */
+  val logCleanerDeleteRetentionMs = props.getLong("log.cleaner.delete.retention.ms", 24 * 60 * 60 * 1000L)
+  
   /* the maximum size in bytes of the offset index */
   val logIndexSizeMaxBytes = props.getIntInRange("log.index.size.max.bytes", 10*1024*1024, (4, Int.MaxValue))
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 9fa432d..e2f4e91 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -163,6 +163,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                      maxMessageSize = config.messageMaxBytes,
                                      maxIndexSize = config.logIndexSizeMaxBytes,
                                      indexInterval = config.logIndexIntervalBytes,
+                                     deleteRetentionMs = config.logCleanerDeleteRetentionMs,
                                      fileDeleteDelayMs = config.logDeleteDelayMs,
                                      minCleanableRatio = config.logCleanerMinCleanRatio,
                                      dedupe = config.logCleanupPolicy.trim.toLowerCase == "dedupe")
@@ -171,6 +172,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     // read the log configurations from zookeeper
     val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
                                       dedupeBufferSize = config.logCleanerDedupeBufferSize,
+                                      dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
                                       ioBufferSize = config.logCleanerIoBufferSize,
                                       maxMessageSize = config.messageMaxBytes,
                                       maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
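
Taken together, the broker settings introduced or reworked by this patch can be supplied like any other server properties. A hedged sketch with illustrative values (the names match the keys read in KafkaConfig above):

    import java.util.Properties

    object CleanerBrokerProps {
      val props = new Properties()
      props.put("log.cleaner.enable", "true")
      props.put("log.cleanup.policy", "dedupe")                                    // default policy for logs
      props.put("log.cleaner.dedupe.buffer.size", (500L * 1024 * 1024).toString)   // now a Long-valued setting
      props.put("log.cleaner.io.buffer.load.factor", "0.9")
      props.put("log.cleaner.delete.retention.ms", (24L * 60 * 60 * 1000).toString)
    }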

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 31333e7..d9546ca 100644
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -140,8 +140,9 @@ object DumpLogSegments {
         print(" keysize: " + msg.keySize)
       if(printContents) {
         if(msg.hasKey)
-          print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
-        print(" payload: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
+          print(" key: " + Utils.readString(messageAndOffset.message.key, "UTF-8"))
+        val payload = if(messageAndOffset.message.isNull) null else Utils.readString(messageAndOffset.message.payload, "UTF-8")
+        print(" payload: " + payload)
       }
       println()
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index d8127a8..aa5e661 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -196,7 +196,7 @@ object SimpleConsumerShell extends Logging {
                   System.out.println("next offset = " + offset)
                 val message = messageAndOffset.message
                 val key = if(message.hasKey) Utils.readBytes(message.key) else null
-                formatter.writeTo(key, Utils.readBytes(message.payload), System.out)
+                formatter.writeTo(key, if(message.isNull) null else Utils.readBytes(message.payload), System.out)
               } catch {
                 case e =>
                   if (skipMessageOnError)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/utils/IteratorTemplate.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/IteratorTemplate.scala b/core/src/main/scala/kafka/utils/IteratorTemplate.scala
index 301f934..fd952f3 100644
--- a/core/src/main/scala/kafka/utils/IteratorTemplate.scala
+++ b/core/src/main/scala/kafka/utils/IteratorTemplate.scala
@@ -32,16 +32,21 @@ object FAILED extends State
 abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T] {
   
   private var state: State = NOT_READY
-  private var nextItem: Option[T] = None
+  private var nextItem = null.asInstanceOf[T]
 
   def next(): T = {
     if(!hasNext())
       throw new NoSuchElementException()
     state = NOT_READY
-    nextItem match {
-      case Some(item) => item
-      case None => throw new IllegalStateException("Expected item but none found.")
-    }
+    if(nextItem == null)
+      throw new IllegalStateException("Expected item but none found.")
+    nextItem
+  }
+  
+  def peek(): T = {
+    if(!hasNext())
+      throw new NoSuchElementException()
+    nextItem
   }
   
   def hasNext(): Boolean = {
@@ -58,7 +63,7 @@ abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T
   
   def maybeComputeNext(): Boolean = {
     state = FAILED
-    nextItem = Some(makeNext())
+    nextItem = makeNext()
     if(state == DONE) {
       false
     } else {
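
A short usage sketch of the template after this change, showing the new peek() next to next(); note that with the Option wrapper gone, makeNext() must never produce null as a real element:

    import kafka.utils.IteratorTemplate

    object CountingIteratorExample {
      val iter = new IteratorTemplate[Int] {
        private var i = 0
        override def makeNext(): Int = {
          i += 1
          if (i > 3) allDone() else i
        }
      }

      def demo() {
        println(iter.peek())   // 1 -- looks at the next element without consuming it
        println(iter.next())   // 1
        println(iter.toList)   // List(2, 3)
      }
    }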

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/utils/VerifiableProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index a2ac55c..9009a9d 100644
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -18,6 +18,7 @@
 package kafka.utils
 
 import java.util.Properties
+import java.util.Collections
 import scala.collection._
 
 class VerifiableProperties(val props: Properties) extends Logging {
@@ -194,9 +195,8 @@ class VerifiableProperties(val props: Properties) extends Logging {
 
   def verify() {
     info("Verifying properties")
-    val specifiedProperties = props.propertyNames()
-    while (specifiedProperties.hasMoreElements) {
-      val key = specifiedProperties.nextElement().asInstanceOf[String]
+    val propNames = JavaConversions.asBuffer(Collections.list(props.propertyNames)).map(_.toString).sorted
+    for(key <- propNames) {
       if (!referenceSet.contains(key))
         warn("Property %s is not valid".format(key))
       else

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/test/scala/other/kafka/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLogCleaning.scala b/core/src/test/scala/other/kafka/TestLogCleaning.scala
index d9c721b..0bef218 100644
--- a/core/src/test/scala/other/kafka/TestLogCleaning.scala
+++ b/core/src/test/scala/other/kafka/TestLogCleaning.scala
@@ -27,6 +27,8 @@ import kafka.producer._
 import kafka.consumer._
 import kafka.serializer._
 import kafka.utils._
+import kafka.log.FileMessageSet
+import kafka.log.Log
 
 /**
  * This is a torture test that runs against an existing broker. Here is how it works:
@@ -66,6 +68,11 @@ object TestLogCleaning {
                           .describedAs("count")
                           .ofType(classOf[java.lang.Integer])
                           .defaultsTo(1)
+    val percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.")
+    		                      .withRequiredArg
+    		                      .describedAs("percent")
+    		                      .ofType(classOf[java.lang.Integer])
+    		                      .defaultsTo(0)
     val zkConnectOpt = parser.accepts("zk", "Zk url.")
                              .withRequiredArg
                              .describedAs("url")
@@ -75,10 +82,18 @@ object TestLogCleaning {
                              .describedAs("ms")
                              .ofType(classOf[java.lang.Integer])
                              .defaultsTo(0)
-    val cleanupOpt = parser.accepts("cleanup", "Delete temp files when done.")
+    val dumpOpt = parser.accepts("dump", "Dump the message contents of a topic partition that contains test data from this test to standard out.")
+                        .withRequiredArg
+                        .describedAs("directory")
+                        .ofType(classOf[String])
     
     val options = parser.parse(args:_*)
     
+    if(options.has(dumpOpt)) {
+      dumpLog(new File(options.valueOf(dumpOpt)))
+      System.exit(0)
+    }
+    
     if(!options.has(brokerOpt) || !options.has(zkConnectOpt) || !options.has(numMessagesOpt)) {
       parser.printHelpOn(System.err)
       System.exit(1)
@@ -86,83 +101,146 @@ object TestLogCleaning {
     
     // parse options
     val messages = options.valueOf(numMessagesOpt).longValue
+    val percentDeletes = options.valueOf(percentDeletesOpt).intValue
     val dups = options.valueOf(numDupsOpt).intValue
     val brokerUrl = options.valueOf(brokerOpt)
     val topicCount = options.valueOf(topicsOpt).intValue
     val zkUrl = options.valueOf(zkConnectOpt)
     val sleepSecs = options.valueOf(sleepSecsOpt).intValue
-    val cleanup = options.has(cleanupOpt)
     
     val testId = new Random().nextInt(Int.MaxValue)
     val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray
     
     println("Producing %d messages...".format(messages))
-    val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, cleanup)
+    val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, percentDeletes)
     println("Sleeping for %d seconds...".format(sleepSecs))
     Thread.sleep(sleepSecs * 1000)
     println("Consuming messages...")
-    val consumedDataFile = consumeMessages(zkUrl, topics, cleanup)
+    val consumedDataFile = consumeMessages(zkUrl, topics)
     
     val producedLines = lineCount(producedDataFile)
     val consumedLines = lineCount(consumedDataFile)
     val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble
     println("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).".format(producedLines, consumedLines, 100 * reduction))
     
-    println("Validating output files...")
-    validateOutput(externalSort(producedDataFile), externalSort(consumedDataFile))
-    println("All done.")
+    println("Deduplicating and validating output files...")
+    validateOutput(producedDataFile, consumedDataFile)
+    producedDataFile.delete()
+    consumedDataFile.delete()
+  }
+  
+  def dumpLog(dir: File) {
+    require(dir.exists, "Non-existent directory: " + dir.getAbsolutePath)
+    for(file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) {
+      val ms = new FileMessageSet(new File(dir, file))
+      for(entry <- ms) {
+        val key = Utils.readString(entry.message.key)
+        val content = 
+          if(entry.message.isNull)
+            null
+          else
+            Utils.readString(entry.message.payload)
+        println("offset = %s, key = %s, content = %s".format(entry.offset, key, content))
+      }
+    }
   }
   
   def lineCount(file: File): Int = io.Source.fromFile(file).getLines.size
   
-  def validateOutput(produced: BufferedReader, consumed: BufferedReader) {
-    while(true) {
-      val prod = readFinalValue(produced)
-      val cons = readFinalValue(consumed)
-      if(prod == null && cons == null) {
-        return
-      } else if(prod != cons) {
-        System.err.println("Validation failed prod = %s, cons = %s!".format(prod, cons))
-        System.exit(1)
-      }
+  def validateOutput(producedDataFile: File, consumedDataFile: File) {
+    val producedReader = externalSort(producedDataFile)
+    val consumedReader = externalSort(consumedDataFile)
+    val produced = valuesIterator(producedReader)
+    val consumed = valuesIterator(consumedReader)
+    val producedDedupedFile = new File(producedDataFile.getAbsolutePath + ".deduped")
+    val producedDeduped = new BufferedWriter(new FileWriter(producedDedupedFile), 1024*1024)
+    val consumedDedupedFile = new File(consumedDataFile.getAbsolutePath + ".deduped")
+    val consumedDeduped = new BufferedWriter(new FileWriter(consumedDedupedFile), 1024*1024)
+    var total = 0
+    var mismatched = 0
+    while(produced.hasNext && consumed.hasNext) {
+      val p = produced.next()
+      producedDeduped.write(p.toString)
+      producedDeduped.newLine()
+      val c = consumed.next()
+      consumedDeduped.write(c.toString)
+      consumedDeduped.newLine()
+      if(p != c)
+        mismatched += 1
+      total += 1
     }
+    producedDeduped.close()
+    consumedDeduped.close()
+    require(!produced.hasNext, "Additional values produced not found in consumer log.")
+    require(!consumed.hasNext, "Additional values consumed not found in producer log.")
+    println("Validated " + total + " values, " + mismatched + " mismatches.")
+    require(mismatched == 0, "Non-zero number of row mismatches.")
+    // if all the checks worked out we can delete the deduped files
+    producedDedupedFile.delete()
+    consumedDedupedFile.delete()
   }
   
-  def readFinalValue(reader: BufferedReader): (String, Int, Int) = {
-    def readTuple() = {
-      val line = reader.readLine
-      if(line == null)
-        null
-      else
-        line.split("\t")
+  def valuesIterator(reader: BufferedReader) = {
+    new IteratorTemplate[TestRecord] {
+      def makeNext(): TestRecord = {
+        var next = readNext(reader)
+        while(next != null && next.delete)
+          next = readNext(reader)
+        if(next == null)
+          allDone()
+        else
+          next
+      }
     }
-    var prev = readTuple()
-    if(prev == null)
+  }
+  
+  def readNext(reader: BufferedReader): TestRecord = {
+    var line = reader.readLine()
+    if(line == null)
       return null
+    var curr = new TestRecord(line)
     while(true) {
-      reader.mark(1024)
-      val curr = readTuple()
-      if(curr == null || curr(0) != prev(0) || curr(1) != prev(1)) {
-        reader.reset()
-        return (prev(0), prev(1).toInt, prev(2).toInt)
-      } else {
-        prev = curr
-      }
+      line = peekLine(reader)
+      if(line == null)
+        return curr
+      val next = new TestRecord(line)
+      if(next == null || next.topicAndKey != curr.topicAndKey)
+        return curr
+      curr = next
+      reader.readLine()
     }
-    return null
+    null
+  }
+  
+  def peekLine(reader: BufferedReader) = {
+    reader.mark(4096)
+    val line = reader.readLine
+    reader.reset()
+    line
   }
   
   def externalSort(file: File): BufferedReader = {
-    val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", file.getAbsolutePath)
+    val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", "--temporary-directory=" + System.getProperty("java.io.tmpdir"), file.getAbsolutePath)
     val process = builder.start()
-    new BufferedReader(new InputStreamReader(process.getInputStream()))
+    new Thread() {
+      override def run() {
+        val exitCode = process.waitFor()
+        if(exitCode != 0) {
+          System.err.println("Process exited abnormally.")
+          while(process.getErrorStream.available > 0) {
+            System.err.write(process.getErrorStream().read())
+          }
+        }
+      }
+    }.start()
+    new BufferedReader(new InputStreamReader(process.getInputStream()), 10*1024*1024)
   }
   
   def produceMessages(brokerUrl: String, 
                       topics: Array[String], 
                       messages: Long, 
-                      dups: Int, 
-                      cleanup: Boolean): File = {
+                      dups: Int,
+                      percentDeletes: Int): File = {
     val producerProps = new Properties
     producerProps.setProperty("producer.type", "async")
     producerProps.setProperty("broker.list", brokerUrl)
@@ -174,36 +252,49 @@ object TestLogCleaning {
     val rand = new Random(1)
     val keyCount = (messages / dups).toInt
     val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt")
-    if(cleanup)
-      producedFile.deleteOnExit()
+    println("Logging produce requests to " + producedFile.getAbsolutePath)
     val producedWriter = new BufferedWriter(new FileWriter(producedFile), 1024*1024)
     for(i <- 0L until (messages * topics.length)) {
       val topic = topics((i % topics.length).toInt)
       val key = rand.nextInt(keyCount)
-      producer.send(KeyedMessage(topic = topic, key = key.toString, message = i.toString))
-      producedWriter.write("%s\t%s\t%s\n".format(topic, key, i))
+      val delete = i % 100 < percentDeletes
+      val msg = 
+        if(delete)
+          KeyedMessage[String, String](topic = topic, key = key.toString, message = null)
+        else
+          KeyedMessage[String, String](topic = topic, key = key.toString, message = i.toString)
+      producer.send(msg)
+      producedWriter.write(TestRecord(topic, key, i, delete).toString)
+      producedWriter.newLine()
     }
     producedWriter.close()
     producer.close()
     producedFile
   }
   
-  def consumeMessages(zkUrl: String, topics: Array[String], cleanup: Boolean): File = {
+  def makeConsumer(zkUrl: String, topics: Array[String]): ZookeeperConsumerConnector = {
     val consumerProps = new Properties
     consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
     consumerProps.setProperty("zk.connect", zkUrl)
-    consumerProps.setProperty("consumer.timeout.ms", (5*1000).toString)
-    val connector = new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
+    consumerProps.setProperty("consumer.timeout.ms", (10*1000).toString)
+    new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
+  }
+  
+  def consumeMessages(zkUrl: String, topics: Array[String]): File = {
+    val connector = makeConsumer(zkUrl, topics)
     val streams = connector.createMessageStreams(topics.map(topic => (topic, 1)).toMap, new StringDecoder, new StringDecoder)
     val consumedFile = File.createTempFile("kafka-log-cleaner-consumed-", ".txt")
-    if(cleanup)
-      consumedFile.deleteOnExit()
+    println("Logging consumed messages to " + consumedFile.getAbsolutePath)
     val consumedWriter = new BufferedWriter(new FileWriter(consumedFile))
     for(topic <- topics) {
       val stream = streams(topic).head
       try {
-        for(item <- stream)
-          consumedWriter.write("%s\t%s\t%s\n".format(topic, item.key, item.message))
+        for(item <- stream) {
+          val delete = item.message == null
+          val value = if(delete) -1L else item.message.toLong
+          consumedWriter.write(TestRecord(topic, item.key.toInt, value, delete).toString)
+          consumedWriter.newLine()
+        }
       } catch {
         case e: ConsumerTimeoutException => 
       }
@@ -213,4 +304,11 @@ object TestLogCleaning {
     consumedFile
   }
   
+}
+
+case class TestRecord(val topic: String, val key: Int, val value: Long, val delete: Boolean) {
+  def this(pieces: Array[String]) = this(pieces(0), pieces(1).toInt, pieces(2).toLong, pieces(3) == "d")
+  def this(line: String) = this(line.split("\t"))
+  override def toString() = topic + "\t" +  key + "\t" + value + "\t" + (if(delete) "d" else "u")
+  def topicAndKey = topic + key
 }
\ No newline at end of file

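For reference, the TestRecord line format added above is four tab-separated fields ending in "d" for a delete (a message produced with a null payload) or "u" for a normal update; on the consume side a delete is recorded with the value -1. A small standalone sketch of the round trip (the names below are illustrative only, not part of the patch):

    // Standalone illustration of TestLogCleaning's tab-separated record format.
    object TestRecordRoundTrip {
      case class TestRecord(topic: String, key: Int, value: Long, delete: Boolean) {
        override def toString = topic + "\t" + key + "\t" + value + "\t" + (if(delete) "d" else "u")
      }

      def parse(line: String): TestRecord = {
        val pieces = line.split("\t")
        TestRecord(pieces(0), pieces(1).toInt, pieces(2).toLong, pieces(3) == "d")
      }

      def main(args: Array[String]) {
        val update    = TestRecord("test-topic", 42, 17L, delete = false)
        val tombstone = TestRecord("test-topic", 42, -1L, delete = true)
        for(r <- Seq(update, tombstone)) {
          val line = r.toString
          assert(parse(line) == r)  // the format round-trips
          println(line)
        }
      }
    }
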
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index cce2319..4619d86 100644
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -52,7 +52,7 @@ class CleanerTest extends JUnitSuite {
     
     // append messages to the log until we have four segments
     while(log.numberOfSegments < 4)
-      log.append(messages(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
     val keysFound = keysInLog(log)
     assertEquals((0L until log.logEndOffset), keysFound)
     
@@ -62,14 +62,38 @@ class CleanerTest extends JUnitSuite {
     keys.foreach(k => map.put(key(k), Long.MaxValue))
     
     // clean the log
-    cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0)
+    cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0, 0L)
     val shouldRemain = keysInLog(log).filter(!keys.contains(_))
     assertEquals(shouldRemain, keysInLog(log))
   }
   
+  @Test
+  def testCleaningWithDeletes() {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val log = makeLog(config = logConfig.copy(segmentSize = 1024))
+    
+    // append messages with the keys 0 through N
+    while(log.numberOfSegments < 2)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      
+    // delete all even keys between 0 and N
+    val leo = log.logEndOffset
+    for(key <- 0 until leo.toInt by 2)
+      log.append(deleteMessage(key))
+      
+    // append some new unique keys to pad out to a new active segment
+    while(log.numberOfSegments < 4)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0))
+    val keys = keysInLog(log).toSet
+    assertTrue("None of the keys we deleted should still exist.", 
+               (0 until leo.toInt by 2).forall(!keys.contains(_)))
+  }
+  
   /* extract all the keys from a log */
   def keysInLog(log: Log): Iterable[Int] = 
-    log.logSegments.flatMap(s => s.log.map(m => Utils.readString(m.message.key).toInt))
+    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).map(m => Utils.readString(m.message.key).toInt))
   
   
   /**
@@ -82,14 +106,14 @@ class CleanerTest extends JUnitSuite {
     
     // append messages to the log until we have two segments
     while(log.numberOfSegments < 2)
-      log.append(messages(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
       
     log.truncateTo(log.logEndOffset-2)
     val keys = keysInLog(log)
     val map = new FakeOffsetMap(Int.MaxValue)
     keys.foreach(k => map.put(key(k), Long.MaxValue))
     intercept[OptimisticLockFailureException] {
-      cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0)
+      cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0, 0L)
     }
   }
   
@@ -170,40 +194,34 @@ class CleanerTest extends JUnitSuite {
     checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt)
   }
   
-  /**
-   * Test that we don't exceed the maximum capacity of the offset map, that is that an offset map
-   * with a max size of 1000 will only clean 1000 new entries even if more than that are available.
-   */
-  @Test
-  def testBuildOffsetMapOverCapacity() {
-    val map = new FakeOffsetMap(1000)
-    val log = makeLog()
-    val cleaner = makeCleaner(Int.MaxValue)
-    val vals = 0 until 1001
-    val offsets = writeToLog(log, vals zip vals)
-    val lastOffset = cleaner.buildOffsetMap(log, vals.start, vals.end, map)
-    assertEquals("Shouldn't go beyond the capacity of the offset map.", 1000, lastOffset)
-  }
-  
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
     new Log(dir = dir, config = config, needsRecovery = false, scheduler = time.scheduler, time = time)
   
   def makeCleaner(capacity: Int) = 
-    new Cleaner(id = 0, new FakeOffsetMap(capacity), ioBufferSize = 64*1024, maxIoBufferSize = 64*1024, throttler = throttler, time = time)
+    new Cleaner(id = 0, 
+                offsetMap = new FakeOffsetMap(capacity), 
+                ioBufferSize = 64*1024, 
+                maxIoBufferSize = 64*1024,
+                dupBufferLoadFactor = 0.75,                
+                throttler = throttler, 
+                time = time)
   
   def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
     for((key, value) <- seq)
-      yield log.append(messages(key, value)).firstOffset
+      yield log.append(message(key, value)).firstOffset
   }
   
   def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
   
-  def messages(key: Int, value: Int) = 
+  def message(key: Int, value: Int) = 
     new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
   
+  def deleteMessage(key: Int) =
+    new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=null))
+  
 }
 
-class FakeOffsetMap(val capacity: Int) extends OffsetMap {
+class FakeOffsetMap(val slots: Int) extends OffsetMap {
   val map = new java.util.HashMap[String, Long]()
   
   private def keyFor(key: ByteBuffer) = 

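The new testCleaningWithDeletes case boils down to two rules: the latest record for a key wins, and a record whose payload is null (a delete marker) means the key should not survive cleaning. A toy in-memory model of that behavior, for illustration only (the real work happens in Cleaner and OffsetMap):

    // Toy model of compaction with deletes: None stands in for a null payload.
    object CompactionModel {
      def compact(log: Seq[(Int, Option[Int])]): Map[Int, Int] =
        log.foldLeft(Map.empty[Int, Int]) {
          case (state, (key, Some(value))) => state + (key -> value)  // latest value wins
          case (state, (key, None))        => state - key             // delete marker drops the key
        }

      def main(args: Array[String]) {
        val log = Seq(0 -> Some(0), 1 -> Some(1), 2 -> Some(2),  // initial writes
                      0 -> None, 2 -> None,                      // deletes for the even keys
                      3 -> Some(3))                              // later unique key
        val survivors = compact(log)
        assert(!survivors.contains(0) && !survivors.contains(2))
        println(survivors)  // Map(1 -> 1, 3 -> 3)
      }
    }
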
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 23e0e65..5658ed4 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -569,4 +569,17 @@ class LogTest extends JUnitSuite {
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
   }
   
+  @Test
+  def testAppendMessageWithNullPayload() {
+    var log = new Log(logDir,
+                      LogConfig(),
+                      needsRecovery = false,
+                      time.scheduler,
+                      time)
+    log.append(new ByteBufferMessageSet(new Message(bytes = null)))
+    val ms = log.read(0, 4096, None)
+    assertEquals(0, ms.head.offset)
+    assertTrue("Message payload should be null.", ms.head.message.isNull)
+  }
+  
 }

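testAppendMessageWithNullPayload pins down the read path: a message appended with bytes = null comes back from the log with isNull set and a null payload, so anything that decodes payloads now needs a null check. A hedged sketch of such a guard (the helper below is hypothetical, not part of the patch):

    import java.nio.ByteBuffer

    // Hypothetical helper: decode a possibly-null payload defensively.
    object NullPayloadGuard {
      def readPayload(payload: ByteBuffer): Option[String] =
        Option(payload).map { buf =>
          val bytes = new Array[Byte](buf.remaining)
          buf.duplicate.get(bytes)
          new String(bytes, "UTF-8")
        }

      def main(args: Array[String]) {
        println(readPayload(null))                                        // None, i.e. a delete
        println(readPayload(ByteBuffer.wrap("hello".getBytes("UTF-8"))))  // Some(hello)
      }
    }
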
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
index 99a0c4b..12ce39e 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
@@ -29,11 +29,12 @@ class OffsetMapTest extends JUnitSuite {
     validateMap(10)
     validateMap(100)
     validateMap(1000)
+    validateMap(5000)
   }
   
   @Test
   def testClear() {
-    val map = new SkimpyOffsetMap(4000, 0.75)
+    val map = new SkimpyOffsetMap(4000)
     for(i <- 0 until 10)
       map.put(key(i), i)
     for(i <- 0 until 10)
@@ -43,45 +44,33 @@ class OffsetMapTest extends JUnitSuite {
       assertEquals(map.get(key(i)), -1L)
   }
   
-  @Test
-  def testCapacity() {
-    val map = new SkimpyOffsetMap(1024, 0.75)
-    var i = 0
-    while(map.size < map.capacity) {
-      map.put(key(i), i)
-      i += 1
-    }
-    // now the map is full, it should throw an exception
-    intercept[IllegalStateException] {
-      map.put(key(i), i)
-    }
-  }
-  
   def key(key: Int) = ByteBuffer.wrap(key.toString.getBytes)
   
-  def validateMap(items: Int) {
-    val map = new SkimpyOffsetMap(items * 2 * 24, 0.75)
+  def validateMap(items: Int, loadFactor: Double = 0.5): SkimpyOffsetMap = {
+    val map = new SkimpyOffsetMap((items/loadFactor * 24).toInt)
     for(i <- 0 until items)
       map.put(key(i), i)
     var misses = 0
-    for(i <- 0 until items) {
-      map.get(key(i)) match {
-        case -1L => misses += 1
-        case offset => assertEquals(i.toLong, offset) 
-      }
-    }
-    println("Miss rate: " + (misses.toDouble / items))
+    for(i <- 0 until items)
+      assertEquals(map.get(key(i)), i.toLong)
+    map
   }
   
 }
 
 object OffsetMapTest {
   def main(args: Array[String]) {
-    if(args.length != 1) {
-      System.err.println("USAGE: java OffsetMapTest size")
+    if(args.length != 2) {
+      System.err.println("USAGE: java OffsetMapTest size load")
       System.exit(1)
     }
     val test = new OffsetMapTest()
-    test.validateMap(args(0).toInt)
+    val size = args(0).toInt
+    val load = args(1).toDouble
+    val start = System.nanoTime
+    val map = test.validateMap(size, load)
+    val elapsedMs = (System.nanoTime - start) / 1000.0 / 1000.0
+    println(map.size + " entries in map of size " + map.slots + " in " + elapsedMs + " ms")
+    println("Collision rate: %.1f%%".format(100*map.collisionRate))
   }
 }
\ No newline at end of file

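With the load factor moved out of the SkimpyOffsetMap constructor, the test sizes the map in bytes from the desired entry count as (items / loadFactor * 24), i.e. roughly 24 bytes per slot. A back-of-the-envelope sizing helper built on that same assumption (illustrative only, not an API of the map):

    // Sizing estimate assuming ~24 bytes per slot, as in validateMap above.
    object OffsetMapSizing {
      val bytesPerSlot = 24

      def memoryFor(entries: Int, loadFactor: Double): Long =
        math.ceil(entries / loadFactor).toLong * bytesPerSlot

      def main(args: Array[String]) {
        println(memoryFor(5000, 0.5))      // 240000 bytes for the largest case validateMap runs
        println(memoryFor(1000000, 0.75))  // ~32 MB for a million entries at a 0.75 load factor
      }
    }
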
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/test/scala/unit/kafka/message/MessageTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index 883442d..4837585 100644
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -38,7 +38,7 @@ class MessageTest extends JUnitSuite {
   @Before
   def setUp(): Unit = {
     val keys = Array(null, "key".getBytes, "".getBytes)
-    val vals = Array("value".getBytes, "".getBytes)
+    val vals = Array("value".getBytes, "".getBytes, null)
     val codecs = Array(NoCompressionCodec, GZIPCompressionCodec)
     for(k <- keys; v <- vals; codec <- codecs)
       messages += new MessageTestVal(k, v, codec, new Message(v, k, codec))
@@ -47,7 +47,12 @@ class MessageTest extends JUnitSuite {
   @Test
   def testFieldValues {
     for(v <- messages) {
-      TestUtils.checkEquals(ByteBuffer.wrap(v.payload), v.message.payload)
+      if(v.payload == null) {
+        assertTrue(v.message.isNull)
+        assertEquals("Payload should be null", null, v.message.payload)
+      } else {
+        TestUtils.checkEquals(ByteBuffer.wrap(v.payload), v.message.payload)
+      }
       assertEquals(Message.CurrentMagicValue, v.message.magic)
       if(v.message.hasKey)
         TestUtils.checkEquals(ByteBuffer.wrap(v.key), v.message.key)

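With null values added to the test matrix, a message now falls into one of a few shapes depending on whether its key and payload are present; the tombstone case (key present, payload null) is the one log cleaning relies on. A small illustrative classifier (not part of the patch):

    // Illustration of the message shapes the expanded test matrix covers.
    object MessageKinds {
      def classify(key: Array[Byte], value: Array[Byte]): String = (key, value) match {
        case (null, null) => "no key, no value"
        case (_, null)    => "tombstone: the cleaner may drop this key"
        case (null, _)    => "unkeyed data message"
        case _            => "keyed data message"
      }

      def main(args: Array[String]) {
        println(classify("key".getBytes, null))              // tombstone
        println(classify("key".getBytes, "value".getBytes))  // keyed data message
      }
    }
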
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index e4b057e..507e6a8 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -288,7 +288,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
       case e: FailedToSendMessageException => /* success */
       case e: Exception => fail("Not expected", e)
     } finally {
-      producer.close
+      producer.close()
     }
     val t2 = SystemTime.milliseconds
 
@@ -296,5 +296,28 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     // we do this because the DefaultEventHandler retries a number of times
     assertTrue((t2-t1) >= timeoutMs*config.messageSendMaxRetries)
   }
+  
+  @Test
+  def testSendNullMessage() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
+    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    
+    val config = new ProducerConfig(props)
+    val producer = new Producer[String, String](config)
+    try {
+
+      // create topic
+      AdminUtils.createTopic(zkClient, "new-topic", 2, 1)
+      assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
+        AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
+    
+      producer.send(new KeyedMessage[String, String]("new-topic", "key", null))
+    } finally {
+      producer.close()
+    }
+  }
 }
 

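testSendNullMessage verifies that the producer path accepts a null message value end to end. In application code, issuing a delete for a key is just that: send a KeyedMessage whose message is null. A minimal sketch, with a placeholder broker address and topic:

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    // Sketch: produce a tombstone (null payload) for a key.
    object SendTombstone {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        props.put("broker.list", "localhost:9092")  // placeholder
        val producer = new Producer[String, String](new ProducerConfig(props))
        try {
          producer.send(new KeyedMessage[String, String]("my-topic", "some-key", null))
        } finally {
          producer.close()
        }
      }
    }
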
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
new file mode 100644
index 0000000..02054c6
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
@@ -0,0 +1,41 @@
+package kafka.utils
+
+import junit.framework.Assert._
+import org.scalatest.Assertions
+import org.junit.{Test, After, Before}
+
+class IteratorTemplateTest extends Assertions {
+  
+  val lst = (0 until 10).toSeq
+  val iterator = new IteratorTemplate[Int]() {
+    var i = 0
+    override def makeNext() = {
+      if(i >= lst.size) {
+        allDone()
+      } else {
+        val item = lst(i)
+        i += 1
+        item
+      }
+    }
+  }
+
+  @Test
+  def testIterator() {
+    for(i <- 0 until 10) {
+      assertEquals("We should have an item to read.", true, iterator.hasNext)
+      assertEquals("Checking again shouldn't change anything.", true, iterator.hasNext)
+      assertEquals("Peeking at the item should show the right thing.", i, iterator.peek)
+      assertEquals("Peeking again shouldn't change anything", i, iterator.peek)
+      assertEquals("Getting the item should give the right thing.", i, iterator.next)
+    }
+    assertEquals("All gone!", false, iterator.hasNext)
+    intercept[NoSuchElementException] {
+      iterator.peek
+    }
+    intercept[NoSuchElementException] {
+      iterator.next
+    }
+  }
+  
+}
\ No newline at end of file
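
The new test pins down the IteratorTemplate contract: hasNext and peek are idempotent, next advances, and both throw NoSuchElementException once makeNext signals completion via allDone. A simplified standalone rendition of that pattern, for illustration only (it is not the kafka.utils.IteratorTemplate implementation):

    // Simplified iterator template: subclasses implement makeNext() and call
    // allDone() when exhausted; hasNext/peek never consume an element.
    abstract class SimpleIteratorTemplate[T] extends Iterator[T] {
      private var ready = false
      private var done = false
      private var nextItem: T = _

      protected def makeNext(): T
      protected def allDone(): T = { done = true; nextItem }

      def hasNext: Boolean = {
        if(!ready && !done) {
          nextItem = makeNext()  // may call allDone(), which flips `done`
          ready = !done
        }
        ready
      }

      def peek: T = {
        if(!hasNext) throw new NoSuchElementException
        nextItem
      }

      def next(): T = {
        val item = peek
        ready = false            // force a fresh makeNext() on the next hasNext
        item
      }
    }

    object SimpleIteratorTemplateDemo {
      def main(args: Array[String]) {
        val lst = (0 until 10).toSeq
        val it = new SimpleIteratorTemplate[Int] {
          var i = 0
          override def makeNext() = if(i >= lst.size) allDone() else { val item = lst(i); i += 1; item }
        }
        println(it.mkString(","))  // 0,1,2,3,4,5,6,7,8,9
      }
    }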