You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2015/03/03 19:43:50 UTC

kafka git commit: KAFKA-1755; Reject compressed and unkeyed messages sent to compacted topics; reviewed by Mayuresh Gharat, Neha Narkhede and Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk 0636928d9 -> 1cd6ed9e2


KAFKA-1755; Reject compressed and unkeyed messages sent to compacted topics; reviewed by Mayuresh Gharat, Neha Narkhede and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cd6ed9e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cd6ed9e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cd6ed9e

Branch: refs/heads/trunk
Commit: 1cd6ed9e2c07a63474ed80a8224bd431d5d4243c
Parents: 0636928
Author: Joel Koshy <jj...@gmail.com>
Authored: Tue Mar 3 10:44:04 2015 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Mar 3 10:44:04 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         |   2 +-
 core/src/main/scala/kafka/log/LogCleaner.scala  |  49 +++--
 .../scala/kafka/log/LogCleanerManager.scala     |  12 +-
 .../kafka/message/ByteBufferMessageSet.scala    |  44 ++++-
 .../main/scala/kafka/server/OffsetManager.scala |   1 +
 .../test/scala/unit/kafka/log/CleanerTest.scala |  43 ++++-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 186 ++++++++++++-------
 .../message/ByteBufferMessageSetTest.scala      |   4 +-
 8 files changed, 235 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0c4efa8..06b8ecc 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -288,7 +288,7 @@ class Log(val dir: File,
           // assign offsets to the message set
           val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
           try {
-            validMessages = validMessages.assignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec)
+            validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact)
           } catch {
             case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index f8e7cd5..5991428 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -133,7 +133,7 @@ class LogCleaner(val config: CleanerConfig,
    * Update checkpoint file, removing topics and partitions that no longer exist
    */
   def updateCheckpoints(dataDir: File) {
-    cleanerManager.updateCheckpoints(dataDir, update=None);
+    cleanerManager.updateCheckpoints(dataDir, update=None)
   }
 
   /**
@@ -152,8 +152,7 @@ class LogCleaner(val config: CleanerConfig,
   }
 
   /**
-   * TODO:
-   * For testing, a way to know when work has completed. This method blocks until the 
+   * For testing, a way to know when work has completed. This method blocks until the
    * cleaner has processed up to the given offset on the specified topic/partition
    */
   def awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit = {
@@ -243,7 +242,7 @@ class LogCleaner(val config: CleanerConfig,
         "\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead), 
                                                                                            stats.elapsedIndexSecs, 
                                                                                            mb(stats.mapBytesRead)/stats.elapsedIndexSecs, 
-                                                                                           100 * stats.elapsedIndexSecs.toDouble/stats.elapsedSecs) + 
+                                                                                           100 * stats.elapsedIndexSecs/stats.elapsedSecs) +
         "\tBuffer utilization: %.1f%%%n".format(100 * stats.bufferUtilization) +
         "\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead), 
                                                                                            stats.elapsedSecs - stats.elapsedIndexSecs, 
@@ -253,6 +252,9 @@ class LogCleaner(val config: CleanerConfig,
         "\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead), 
                                                                    100.0 * (1.0 - stats.messagesWritten.toDouble/stats.messagesRead))
       info(message)
+      if (stats.invalidMessagesRead > 0) {
+        warn("\tFound %d invalid messages during compaction.".format(stats.invalidMessagesRead))
+      }
     }
    
   }
@@ -374,7 +376,7 @@ private[log] class Cleaner(val id: Int,
     } catch {
       case e: LogCleaningAbortedException =>
         cleaned.delete()
-        throw  e
+        throw e
     }
   }
 
@@ -407,17 +409,20 @@ private[log] class Cleaner(val id: Int,
         position += size
         stats.readMessage(size)
         val key = entry.message.key
-        require(key != null, "Found null key in log segment %s which is marked as dedupe.".format(source.log.file.getAbsolutePath))
-        val foundOffset = map.get(key)
-        /* two cases in which we can get rid of a message:
-         *   1) if there exists a message with the same key but higher offset
-         *   2) if the message is a delete "tombstone" marker and enough time has passed
-         */
-        val redundant = foundOffset >= 0 && entry.offset < foundOffset
-        val obsoleteDelete = !retainDeletes && entry.message.isNull
-        if (!redundant && !obsoleteDelete) {
-          ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
-          stats.recopyMessage(size)
+        if (key != null) {
+          val foundOffset = map.get(key)
+          /* two cases in which we can get rid of a message:
+           *   1) if there exists a message with the same key but higher offset
+           *   2) if the message is a delete "tombstone" marker and enough time has passed
+           */
+          val redundant = foundOffset >= 0 && entry.offset < foundOffset
+          val obsoleteDelete = !retainDeletes && entry.message.isNull
+          if (!redundant && !obsoleteDelete) {
+            ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
+            stats.recopyMessage(size)
+          }
+        } else {
+          stats.invalidMessage()
         }
       }
       // if any messages are to be retained, write them out
@@ -536,10 +541,10 @@ private[log] class Cleaner(val id: Int,
       val startPosition = position
       for (entry <- messages) {
         val message = entry.message
-        require(message.hasKey)
         val size = MessageSet.entrySize(message)
         position += size
-        map.put(message.key, entry.offset)
+        if (message.hasKey)
+          map.put(message.key, entry.offset)
         offset = entry.offset
         stats.indexMessage(size)
       }
@@ -556,7 +561,8 @@ private[log] class Cleaner(val id: Int,
  * A simple struct for collecting stats about log cleaning
  */
 private case class CleanerStats(time: Time = SystemTime) {
-  var startTime, mapCompleteTime, endTime, bytesRead, bytesWritten, mapBytesRead, mapMessagesRead, messagesRead, messagesWritten = 0L
+  var startTime, mapCompleteTime, endTime, bytesRead, bytesWritten, mapBytesRead, mapMessagesRead, messagesRead,
+      messagesWritten, invalidMessagesRead = 0L
   var bufferUtilization = 0.0d
   clear()
   
@@ -564,6 +570,10 @@ private case class CleanerStats(time: Time = SystemTime) {
     messagesRead += 1
     bytesRead += size
   }
+
+  def invalidMessage() {
+    invalidMessagesRead += 1
+  }
   
   def recopyMessage(size: Int) {
     messagesWritten += 1
@@ -596,6 +606,7 @@ private case class CleanerStats(time: Time = SystemTime) {
     mapBytesRead = 0L
     mapMessagesRead = 0L
     messagesRead = 0L
+    invalidMessagesRead = 0L
     messagesWritten = 0L
     bufferUtilization = 0.0d
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index fd87d90..351824b 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -122,8 +122,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
     inLock(lock) {
       abortAndPauseCleaning(topicAndPartition)
       resumeCleaning(topicAndPartition)
-      info("The cleaning for partition %s is aborted".format(topicAndPartition))
     }
+    info("The cleaning for partition %s is aborted".format(topicAndPartition))
   }
 
   /**
@@ -152,8 +152,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
       }
       while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
         pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
-      info("The cleaning for partition %s is aborted and paused".format(topicAndPartition))
     }
+    info("The cleaning for partition %s is aborted and paused".format(topicAndPartition))
   }
 
   /**
@@ -181,14 +181,14 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   /**
    *  Check if the cleaning for a partition is in a particular state. The caller is expected to hold lock while making the call.
    */
-  def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean = {
+  private def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean = {
     inProgress.get(topicAndPartition) match {
-      case None => return false
+      case None => false
       case Some(state) =>
         if (state == expectedState)
-          return true
+          true
         else
-          return false
+          false
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index f46ad5c..9c69471 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -199,24 +199,48 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
   }
 
   /**
-   * Update the offsets for this message set. This method attempts to do an in-place conversion
-   * if there is no compression, but otherwise recopies the messages
+   * Update the offsets for this message set and do further validation on messages. This method attempts to do an
+   * in-place conversion if there is no compression, but otherwise recopies the messages
    */
-  private[kafka] def assignOffsets(offsetCounter: AtomicLong, sourceCodec: CompressionCodec, targetCodec: CompressionCodec): ByteBufferMessageSet = {
+  private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: AtomicLong,
+                                                      sourceCodec: CompressionCodec,
+                                                      targetCodec: CompressionCodec,
+                                                      compactedTopic: Boolean = false): ByteBufferMessageSet = {
     if(sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
-      // do an in-place conversion
-      var position = 0
+      // do in-place validation and offset assignment
+      var messagePosition = 0
       buffer.mark()
-      while(position < sizeInBytes - MessageSet.LogOverhead) {
-        buffer.position(position)
+      while(messagePosition < sizeInBytes - MessageSet.LogOverhead) {
+        buffer.position(messagePosition)
         buffer.putLong(offsetCounter.getAndIncrement())
-        position += MessageSet.LogOverhead + buffer.getInt()
+        val messageSize = buffer.getInt()
+        val positionAfterKeySize = buffer.position + Message.KeySizeOffset + Message.KeySizeLength
+        if (compactedTopic && positionAfterKeySize < sizeInBytes) {
+          buffer.position(buffer.position() + Message.KeySizeOffset)
+          val keySize = buffer.getInt()
+          if (keySize <= 0) {
+            buffer.reset()
+            throw new InvalidMessageException("Compacted topic cannot accept message without key.")
+          }
+        }
+        messagePosition += MessageSet.LogOverhead + messageSize
       }
       buffer.reset()
       this
     } else {
-      // messages are compressed, crack open the messageset and recompress with correct offset
-      val messages = this.internalIterator(isShallow = false).map(_.message)
+      if (compactedTopic && targetCodec != NoCompressionCodec)
+        throw new InvalidMessageException("Compacted topic cannot accept compressed messages. " +
+          "Either the producer sent a compressed message or the topic has been configured with a broker-side compression codec.")
+      // We need to crack open the message-set if any of these are true:
+      // (i) messages are compressed,
+      // (ii) this message-set is sent to a compacted topic (and so we need to verify that each message has a key)
+      // If the broker is configured with a target compression codec then we need to recompress regardless of original codec
+      val messages = this.internalIterator(isShallow = false).map(messageAndOffset => {
+        if (compactedTopic && !messageAndOffset.message.hasKey)
+          throw new InvalidMessageException("Compacted topic cannot accept message without key.")
+
+        messageAndOffset.message
+      })
       new ByteBufferMessageSet(compressionCodec = targetCodec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 83d5264..c602a80 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -168,6 +168,7 @@ class OffsetManager(val config: OffsetManagerConfig,
     val props = new Properties
     props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString)
     props.put(LogConfig.CleanupPolicyProp, "compact")
+    props.put(LogConfig.CompressionTypeProp, "none")
     props
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index d10e4f4..70f0488 100644
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -90,10 +90,42 @@ class CleanerTest extends JUnitSuite {
     assertTrue("None of the keys we deleted should still exist.", 
                (0 until leo.toInt by 2).forall(!keys.contains(_)))
   }
+
+  @Test
+  def testCleaningWithUnkeyedMessages {
+    val cleaner = makeCleaner(Int.MaxValue)
+
+    // create a log with compaction turned off so we can append unkeyed messages
+    val log = makeLog(config = logConfig.copy(segmentSize = 1024, compact = false))
+
+    // append messages with unkeyed messages
+    while(log.numberOfSegments < 2)
+      log.append(unkeyedMessage(log.logEndOffset.toInt))
+    val numInvalidMessages = unkeyedMessageCountInLog(log)
+
+    val sizeWithUnkeyedMessages = log.size
+
+    // append messages with unkeyed messages
+    while(log.numberOfSegments < 3)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
+    val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
+
+    // turn on compaction and compact the log
+    val compactedLog = makeLog(config = logConfig.copy(segmentSize = 1024))
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0))
+
+    assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log))
+    assertEquals("Log should only contain keyed messages after cleaning.", expectedSizeAfterCleaning, log.size)
+    assertEquals("Cleaner should have seen %d invalid messages.", numInvalidMessages, cleaner.stats.invalidMessagesRead)
+  }
   
   /* extract all the keys from a log */
-  def keysInLog(log: Log): Iterable[Int] = 
-    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).map(m => Utils.readString(m.message.key).toInt))
+  def keysInLog(log: Log): Iterable[Int] =
+    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => Utils.readString(m.message.key).toInt))
+
+  def unkeyedMessageCountInLog(log: Log) =
+    log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum
 
   def abortCheckDone(topicAndPartition: TopicAndPartition) {
     throw new LogCleaningAbortedException()
@@ -130,7 +162,7 @@ class CleanerTest extends JUnitSuite {
     // append some messages to the log
     var i = 0
     while(log.numberOfSegments < 10) {
-      log.append(TestUtils.singleMessageSet("hello".getBytes))
+      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
       i += 1
     }
     
@@ -220,7 +252,10 @@ class CleanerTest extends JUnitSuite {
   
   def message(key: Int, value: Int) = 
     new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
-  
+
+  def unkeyedMessage(value: Int) =
+    new ByteBufferMessageSet(new Message(bytes=value.toString.getBytes))
+
   def deleteMessage(key: Int) =
     new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=null))
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index c2dd8eb..1a4be70 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -62,10 +62,10 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
 
     // create a log
-    val log = new Log(logDir, 
-                      logConfig.copy(segmentMs = 1 * 60 * 60L), 
-                      recoveryPoint = 0L, 
-                      scheduler = time.scheduler, 
+    val log = new Log(logDir,
+                      logConfig.copy(segmentMs = 1 * 60 * 60L),
+                      recoveryPoint = 0L,
+                      scheduler = time.scheduler,
                       time = time)
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
     time.sleep(log.config.segmentMs + 1)
@@ -151,7 +151,7 @@ class LogTest extends JUnitSuite {
   def testAppendAndReadWithSequentialOffsets() {
     val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
     val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
-    
+
     for(i <- 0 until messages.length)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i)))
     for(i <- 0 until messages.length) {
@@ -161,7 +161,7 @@ class LogTest extends JUnitSuite {
     }
     assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).messageSet.size)
   }
-  
+
   /**
    * This test appends a bunch of messages with non-sequential offsets and checks that we can read the correct message
    * from any offset less than the logEndOffset including offsets not appended.
@@ -171,7 +171,7 @@ class LogTest extends JUnitSuite {
     val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val messages = messageIds.map(id => new Message(id.toString.getBytes))
-    
+
     // now test the case that we give the offsets and use non-sequential offsets
     for(i <- 0 until messages.length)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false)
@@ -182,27 +182,27 @@ class LogTest extends JUnitSuite {
       assertEquals("Message should match appended.", messages(idx), read.message)
     }
   }
-  
+
   /**
    * This test covers an odd case where we have a gap in the offsets that falls at the end of a log segment.
    * Specifically we create a log where the last message in the first segment has offset 0. If we
-   * then read offset 1, we should expect this read to come from the second segment, even though the 
+   * then read offset 1, we should expect this read to come from the second segment, even though the
    * first segment has the greatest lower bound on the offset.
    */
   @Test
   def testReadAtLogGap() {
     val log = new Log(logDir, logConfig.copy(segmentSize = 300), recoveryPoint = 0L, time.scheduler, time = time)
-    
+
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes))) 
-    
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
+
     // now manually truncate off all but one message from the first segment to create a gap in the messages
     log.logSegments.head.truncateTo(1)
-    
+
     assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).messageSet.head.offset)
   }
-  
+
   /**
    * Test reading at the boundary of the log, specifically
    * - reading from the logEndOffset should give an empty message set
@@ -250,13 +250,13 @@ class LogTest extends JUnitSuite {
     }
     val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).messageSet
     assertEquals("Should be no more messages", 0, lastRead.size)
-    
+
     // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure
     TestUtils.retry(1000L){
       assertTrue("Log role should have forced flush", log.recoveryPoint >= log.activeSegment.baseOffset)
     }
   }
-  
+
   /**
    * Test reads at offsets that fall within compressed message set boundaries.
    */
@@ -264,20 +264,20 @@ class LogTest extends JUnitSuite {
   def testCompressedMessages() {
     /* this log should roll after every messageset */
     val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
-    
+
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
-    
+
     def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).messageSet.head.message)
-    
+
     /* we should always get the first message in the compressed set when reading any offset in the set */
     assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset)
     assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset)
     assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset)
     assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset)
   }
-  
+
   /**
    * Test garbage collecting old segments
    */
@@ -289,7 +289,7 @@ class LogTest extends JUnitSuite {
       val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
       for(i <- 0 until messagesToAppend)
         log.append(TestUtils.singleMessageSet(i.toString.getBytes))
-      
+
       var currOffset = log.logEndOffset
       assertEquals(currOffset, messagesToAppend)
 
@@ -300,10 +300,10 @@ class LogTest extends JUnitSuite {
       assertEquals("We should still have one segment left", 1, log.numberOfSegments)
       assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments(_ => true))
       assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
-      assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append", 
+      assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
                    currOffset,
                    log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset)
-      
+
       // cleanup the log
       log.delete()
     }
@@ -328,6 +328,64 @@ class LogTest extends JUnitSuite {
     }
   }
 
+  @Test
+  def testCompactedTopicConstraints() {
+    val keyedMessage = new Message(bytes = "this message has a key".getBytes, key = "and here it is".getBytes)
+    val anotherKeyedMessage = new Message(bytes = "this message also has a key".getBytes, key ="another key".getBytes)
+    val unkeyedMessage = new Message(bytes = "this message does not have a key".getBytes)
+
+    val messageSetWithUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage, keyedMessage)
+    val messageSetWithOneUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage)
+    val messageSetWithCompressedKeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage)
+
+    val messageSetWithKeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage)
+    val messageSetWithKeyedMessages = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage, anotherKeyedMessage)
+
+    val log = new Log(logDir, logConfig.copy(compact = true), recoveryPoint = 0L, time.scheduler, time)
+
+    try {
+      log.append(messageSetWithUnkeyedMessage)
+      fail("Compacted topics cannot accept a message without a key.")
+    } catch {
+      case e: InvalidMessageException => // this is good
+    }
+    try {
+      log.append(messageSetWithOneUnkeyedMessage)
+      fail("Compacted topics cannot accept a message without a key.")
+    } catch {
+      case e: InvalidMessageException => // this is good
+    }
+    try {
+      log.append(messageSetWithCompressedKeyedMessage)
+      fail("Compacted topics cannot accept compressed messages.")
+    } catch {
+      case e: InvalidMessageException => // this is good
+    }
+
+    // the following should succeed without any InvalidMessageException
+    log.append(messageSetWithKeyedMessage)
+    log.append(messageSetWithKeyedMessages)
+
+    // test that a compacted topic with broker-side compression type set to uncompressed can accept compressed messages
+    val uncompressedLog = new Log(logDir, logConfig.copy(compact = true, compressionType = "uncompressed"),
+                                  recoveryPoint = 0L, time.scheduler, time)
+    uncompressedLog.append(messageSetWithCompressedKeyedMessage)
+    uncompressedLog.append(messageSetWithKeyedMessage)
+    uncompressedLog.append(messageSetWithKeyedMessages)
+    try {
+      uncompressedLog.append(messageSetWithUnkeyedMessage)
+      fail("Compacted topics cannot accept a message without a key.")
+    } catch {
+      case e: InvalidMessageException => // this is good
+    }
+    try {
+      uncompressedLog.append(messageSetWithOneUnkeyedMessage)
+      fail("Compacted topics cannot accept a message without a key.")
+    } catch {
+      case e: InvalidMessageException => // this is good
+    }
+  }
+
   /**
    * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
    * setting and checking that an exception is thrown.
@@ -369,13 +427,13 @@ class LogTest extends JUnitSuite {
     val numIndexEntries = log.activeSegment.index.entries
     val lastOffset = log.logEndOffset
     log.close()
-    
+
     log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
     assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
     assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
     log.close()
-    
+
     // test recovery case
     log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
@@ -383,7 +441,7 @@ class LogTest extends JUnitSuite {
     assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
     log.close()
   }
-  
+
   /**
    * Test that if we manually delete an index segment it is rebuilt when the log is re-opened
    */
@@ -397,12 +455,12 @@ class LogTest extends JUnitSuite {
       log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
     val indexFiles = log.logSegments.map(_.index.file)
     log.close()
-    
+
     // delete all the index files
     indexFiles.foreach(_.delete())
-    
+
     // reopen the log
-    log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)    
+    log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages)
       assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
@@ -425,10 +483,10 @@ class LogTest extends JUnitSuite {
 
     for (i<- 1 to msgPerSeg)
       log.append(set)
-    
+
     assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset)
-    
+
     val lastOffset = log.logEndOffset
     val size = log.size
     log.truncateTo(log.logEndOffset) // keep the entire log
@@ -446,7 +504,7 @@ class LogTest extends JUnitSuite {
 
     for (i<- 1 to msgPerSeg)
       log.append(set)
-    
+
     assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
     assertEquals("Should be back to original size", log.size, size)
     log.truncateFullyAndStartAt(log.logEndOffset - (msgPerSeg - 1))
@@ -497,23 +555,23 @@ class LogTest extends JUnitSuite {
   def testBogusIndexSegmentsAreRemoved() {
     val bogusIndex1 = Log.indexFilename(logDir, 0)
     val bogusIndex2 = Log.indexFilename(logDir, 5)
-    
+
     val set = TestUtils.singleMessageSet("test".getBytes())
-    val log = new Log(logDir, 
-                      logConfig.copy(segmentSize = set.sizeInBytes * 5, 
-                                     maxIndexSize = 1000, 
+    val log = new Log(logDir,
+                      logConfig.copy(segmentSize = set.sizeInBytes * 5,
+                                     maxIndexSize = 1000,
                                      indexInterval = 1),
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
-    
+
     assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
     assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
-    
+
     // check that we can append to the log
     for(i <- 0 until 10)
       log.append(set)
-      
+
     log.delete()
   }
 
@@ -523,22 +581,22 @@ class LogTest extends JUnitSuite {
   @Test
   def testReopenThenTruncate() {
     val set = TestUtils.singleMessageSet("test".getBytes())
-    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, 
-                                maxIndexSize = 1000, 
+    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
+                                maxIndexSize = 1000,
                                 indexInterval = 10000)
 
     // create a log
-    var log = new Log(logDir, 
+    var log = new Log(logDir,
                       config,
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
-    
+
     // add enough messages to roll over several segments then close and re-open and attempt to truncate
     for(i <- 0 until 100)
       log.append(set)
     log.close()
-    log = new Log(logDir, 
+    log = new Log(logDir,
                   config,
                   recoveryPoint = 0L,
                   time.scheduler,
@@ -547,7 +605,7 @@ class LogTest extends JUnitSuite {
     assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
   }
-  
+
   /**
    * Test that deleted files are deleted after the appropriate time.
    */
@@ -555,38 +613,38 @@ class LogTest extends JUnitSuite {
   def testAsyncDelete() {
     val set = TestUtils.singleMessageSet("test".getBytes())
     val asyncDeleteMs = 1000
-    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, 
-                                fileDeleteDelayMs = asyncDeleteMs, 
-                                maxIndexSize = 1000, 
+    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
+                                fileDeleteDelayMs = asyncDeleteMs,
+                                maxIndexSize = 1000,
                                 indexInterval = 10000)
     val log = new Log(logDir,
                       config,
-                      recoveryPoint = 0L,                      
+                      recoveryPoint = 0L,
                       time.scheduler,
                       time)
-    
+
     // append some messages to create some segments
     for(i <- 0 until 100)
       log.append(set)
-    
+
     // files should be renamed
     val segments = log.logSegments.toArray
     val oldFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
     log.deleteOldSegments((s) => true)
-    
+
     assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
-    assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) && 
+    assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) &&
                                                                  segments.forall(_.index.file.getName.endsWith(Log.DeletedFileSuffix)))
     assertTrue("The .deleted files should still be there.", segments.forall(_.log.file.exists) &&
                                                             segments.forall(_.index.file.exists))
     assertTrue("The original file should be gone.", oldFiles.forall(!_.exists))
-    
+
     // when enough time passes the files should be deleted
     val deletedFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
     time.sleep(asyncDeleteMs + 1)
     assertTrue("Files should all be gone.", deletedFiles.forall(!_.exists))
   }
-  
+
   /**
    * Any files ending in .deleted should be removed when the log is re-opened.
    */
@@ -599,22 +657,22 @@ class LogTest extends JUnitSuite {
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
-    
+
     // append some messages to create some segments
     for(i <- 0 until 100)
       log.append(set)
-    
+
     log.deleteOldSegments((s) => true)
     log.close()
-    
-    log = new Log(logDir, 
+
+    log = new Log(logDir,
                   config,
                   recoveryPoint = 0L,
                   time.scheduler,
                   time)
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
   }
-  
+
   @Test
   def testAppendMessageWithNullPayload() {
     val log = new Log(logDir,
@@ -627,9 +685,9 @@ class LogTest extends JUnitSuite {
     assertEquals(0, messageSet.head.offset)
     assertTrue("Message payload should be null.", messageSet.head.message.isNull)
   }
-  
+
   @Test
-  def testCorruptLog() {    
+  def testCorruptLog() {
     // append some messages to create some segments
     val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
     val set = TestUtils.singleMessageSet("test".getBytes())
@@ -647,11 +705,11 @@ class LogTest extends JUnitSuite {
         log.append(set)
       val messages = log.logSegments.flatMap(_.log.iterator.toList)
       log.close()
-      
+
       // corrupt index and log by appending random bytes
       TestUtils.appendNonsenseToFile(log.activeSegment.index.file, TestUtils.random.nextInt(1024) + 1)
       TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
-      
+
       // attempt recovery
       log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
       assertEquals(numMessages, log.logEndOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 73a2637..07bc317 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -147,11 +147,11 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     // check uncompressed offsets 
     checkOffsets(messages, 0)
     var offset = 1234567
-    checkOffsets(messages.assignOffsets(new AtomicLong(offset), NoCompressionCodec, NoCompressionCodec), offset)
+    checkOffsets(messages.validateMessagesAndAssignOffsets(new AtomicLong(offset), NoCompressionCodec, NoCompressionCodec), offset)
 
     // check compressed messages
     checkOffsets(compressedMessages, 0)
-    checkOffsets(compressedMessages.assignOffsets(new AtomicLong(offset), DefaultCompressionCodec, DefaultCompressionCodec), offset)
+    checkOffsets(compressedMessages.validateMessagesAndAssignOffsets(new AtomicLong(offset), DefaultCompressionCodec, DefaultCompressionCodec), offset)
   }
   
   /* check that offsets are assigned based on byte offset from the given base offset */