You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2015/03/03 19:43:50 UTC
kafka git commit: KAFKA-1755;
Reject compressed and unkeyed messages sent to compacted topics;
reviewed by Mayuresh Gharat, Neha Narkhede and Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk 0636928d9 -> 1cd6ed9e2
KAFKA-1755; Reject compressed and unkeyed messages sent to compacted topics; reviewed by Mayuresh Gharat, Neha Narkhede and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cd6ed9e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cd6ed9e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cd6ed9e
Branch: refs/heads/trunk
Commit: 1cd6ed9e2c07a63474ed80a8224bd431d5d4243c
Parents: 0636928
Author: Joel Koshy <jj...@gmail.com>
Authored: Tue Mar 3 10:44:04 2015 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Mar 3 10:44:04 2015 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/log/Log.scala | 2 +-
core/src/main/scala/kafka/log/LogCleaner.scala | 49 +++--
.../scala/kafka/log/LogCleanerManager.scala | 12 +-
.../kafka/message/ByteBufferMessageSet.scala | 44 ++++-
.../main/scala/kafka/server/OffsetManager.scala | 1 +
.../test/scala/unit/kafka/log/CleanerTest.scala | 43 ++++-
.../src/test/scala/unit/kafka/log/LogTest.scala | 186 ++++++++++++-------
.../message/ByteBufferMessageSetTest.scala | 4 +-
8 files changed, 235 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0c4efa8..06b8ecc 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -288,7 +288,7 @@ class Log(val dir: File,
// assign offsets to the message set
val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
try {
- validMessages = validMessages.assignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec)
+ validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact)
} catch {
case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index f8e7cd5..5991428 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -133,7 +133,7 @@ class LogCleaner(val config: CleanerConfig,
* Update checkpoint file, removing topics and partitions that no longer exist
*/
def updateCheckpoints(dataDir: File) {
- cleanerManager.updateCheckpoints(dataDir, update=None);
+ cleanerManager.updateCheckpoints(dataDir, update=None)
}
/**
@@ -152,8 +152,7 @@ class LogCleaner(val config: CleanerConfig,
}
/**
- * TODO:
- * For testing, a way to know when work has completed. This method blocks until the
+ * For testing, a way to know when work has completed. This method blocks until the
* cleaner has processed up to the given offset on the specified topic/partition
*/
def awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit = {
@@ -243,7 +242,7 @@ class LogCleaner(val config: CleanerConfig,
"\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead),
stats.elapsedIndexSecs,
mb(stats.mapBytesRead)/stats.elapsedIndexSecs,
- 100 * stats.elapsedIndexSecs.toDouble/stats.elapsedSecs) +
+ 100 * stats.elapsedIndexSecs/stats.elapsedSecs) +
"\tBuffer utilization: %.1f%%%n".format(100 * stats.bufferUtilization) +
"\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead),
stats.elapsedSecs - stats.elapsedIndexSecs,
@@ -253,6 +252,9 @@ class LogCleaner(val config: CleanerConfig,
"\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead),
100.0 * (1.0 - stats.messagesWritten.toDouble/stats.messagesRead))
info(message)
+ if (stats.invalidMessagesRead > 0) {
+ warn("\tFound %d invalid messages during compaction.".format(stats.invalidMessagesRead))
+ }
}
}
@@ -374,7 +376,7 @@ private[log] class Cleaner(val id: Int,
} catch {
case e: LogCleaningAbortedException =>
cleaned.delete()
- throw e
+ throw e
}
}
@@ -407,17 +409,20 @@ private[log] class Cleaner(val id: Int,
position += size
stats.readMessage(size)
val key = entry.message.key
- require(key != null, "Found null key in log segment %s which is marked as dedupe.".format(source.log.file.getAbsolutePath))
- val foundOffset = map.get(key)
- /* two cases in which we can get rid of a message:
- * 1) if there exists a message with the same key but higher offset
- * 2) if the message is a delete "tombstone" marker and enough time has passed
- */
- val redundant = foundOffset >= 0 && entry.offset < foundOffset
- val obsoleteDelete = !retainDeletes && entry.message.isNull
- if (!redundant && !obsoleteDelete) {
- ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
- stats.recopyMessage(size)
+ if (key != null) {
+ val foundOffset = map.get(key)
+ /* two cases in which we can get rid of a message:
+ * 1) if there exists a message with the same key but higher offset
+ * 2) if the message is a delete "tombstone" marker and enough time has passed
+ */
+ val redundant = foundOffset >= 0 && entry.offset < foundOffset
+ val obsoleteDelete = !retainDeletes && entry.message.isNull
+ if (!redundant && !obsoleteDelete) {
+ ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
+ stats.recopyMessage(size)
+ }
+ } else {
+ stats.invalidMessage()
}
}
// if any messages are to be retained, write them out
@@ -536,10 +541,10 @@ private[log] class Cleaner(val id: Int,
val startPosition = position
for (entry <- messages) {
val message = entry.message
- require(message.hasKey)
val size = MessageSet.entrySize(message)
position += size
- map.put(message.key, entry.offset)
+ if (message.hasKey)
+ map.put(message.key, entry.offset)
offset = entry.offset
stats.indexMessage(size)
}
@@ -556,7 +561,8 @@ private[log] class Cleaner(val id: Int,
* A simple struct for collecting stats about log cleaning
*/
private case class CleanerStats(time: Time = SystemTime) {
- var startTime, mapCompleteTime, endTime, bytesRead, bytesWritten, mapBytesRead, mapMessagesRead, messagesRead, messagesWritten = 0L
+ var startTime, mapCompleteTime, endTime, bytesRead, bytesWritten, mapBytesRead, mapMessagesRead, messagesRead,
+ messagesWritten, invalidMessagesRead = 0L
var bufferUtilization = 0.0d
clear()
@@ -564,6 +570,10 @@ private case class CleanerStats(time: Time = SystemTime) {
messagesRead += 1
bytesRead += size
}
+
+ def invalidMessage() {
+ invalidMessagesRead += 1
+ }
def recopyMessage(size: Int) {
messagesWritten += 1
@@ -596,6 +606,7 @@ private case class CleanerStats(time: Time = SystemTime) {
mapBytesRead = 0L
mapMessagesRead = 0L
messagesRead = 0L
+ invalidMessagesRead = 0L
messagesWritten = 0L
bufferUtilization = 0.0d
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index fd87d90..351824b 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -122,8 +122,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
inLock(lock) {
abortAndPauseCleaning(topicAndPartition)
resumeCleaning(topicAndPartition)
- info("The cleaning for partition %s is aborted".format(topicAndPartition))
}
+ info("The cleaning for partition %s is aborted".format(topicAndPartition))
}
/**
@@ -152,8 +152,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
}
while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
- info("The cleaning for partition %s is aborted and paused".format(topicAndPartition))
}
+ info("The cleaning for partition %s is aborted and paused".format(topicAndPartition))
}
/**
@@ -181,14 +181,14 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
/**
* Check if the cleaning for a partition is in a particular state. The caller is expected to hold lock while making the call.
*/
- def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean = {
+ private def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean = {
inProgress.get(topicAndPartition) match {
- case None => return false
+ case None => false
case Some(state) =>
if (state == expectedState)
- return true
+ true
else
- return false
+ false
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index f46ad5c..9c69471 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -199,24 +199,48 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
}
/**
- * Update the offsets for this message set. This method attempts to do an in-place conversion
- * if there is no compression, but otherwise recopies the messages
+ * Update the offsets for this message set and do further validation on messages. This method attempts to do an
+ * in-place conversion if there is no compression, but otherwise recopies the messages
*/
- private[kafka] def assignOffsets(offsetCounter: AtomicLong, sourceCodec: CompressionCodec, targetCodec: CompressionCodec): ByteBufferMessageSet = {
+ private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: AtomicLong,
+ sourceCodec: CompressionCodec,
+ targetCodec: CompressionCodec,
+ compactedTopic: Boolean = false): ByteBufferMessageSet = {
if(sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
- // do an in-place conversion
- var position = 0
+ // do in-place validation and offset assignment
+ var messagePosition = 0
buffer.mark()
- while(position < sizeInBytes - MessageSet.LogOverhead) {
- buffer.position(position)
+ while(messagePosition < sizeInBytes - MessageSet.LogOverhead) {
+ buffer.position(messagePosition)
buffer.putLong(offsetCounter.getAndIncrement())
- position += MessageSet.LogOverhead + buffer.getInt()
+ val messageSize = buffer.getInt()
+ val positionAfterKeySize = buffer.position + Message.KeySizeOffset + Message.KeySizeLength
+ if (compactedTopic && positionAfterKeySize < sizeInBytes) {
+ buffer.position(buffer.position() + Message.KeySizeOffset)
+ val keySize = buffer.getInt()
+ if (keySize <= 0) {
+ buffer.reset()
+ throw new InvalidMessageException("Compacted topic cannot accept message without key.")
+ }
+ }
+ messagePosition += MessageSet.LogOverhead + messageSize
}
buffer.reset()
this
} else {
- // messages are compressed, crack open the messageset and recompress with correct offset
- val messages = this.internalIterator(isShallow = false).map(_.message)
+ if (compactedTopic && targetCodec != NoCompressionCodec)
+ throw new InvalidMessageException("Compacted topic cannot accept compressed messages. " +
+ "Either the producer sent a compressed message or the topic has been configured with a broker-side compression codec.")
+ // We need to crack open the message-set if any of these are true:
+ // (i) messages are compressed,
+ // (ii) this message-set is sent to a compacted topic (and so we need to verify that each message has a key)
+ // If the broker is configured with a target compression codec then we need to recompress regardless of original codec
+ val messages = this.internalIterator(isShallow = false).map(messageAndOffset => {
+ if (compactedTopic && !messageAndOffset.message.hasKey)
+ throw new InvalidMessageException("Compacted topic cannot accept message without key.")
+
+ messageAndOffset.message
+ })
new ByteBufferMessageSet(compressionCodec = targetCodec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 83d5264..c602a80 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -168,6 +168,7 @@ class OffsetManager(val config: OffsetManagerConfig,
val props = new Properties
props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString)
props.put(LogConfig.CleanupPolicyProp, "compact")
+ props.put(LogConfig.CompressionTypeProp, "none")
props
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index d10e4f4..70f0488 100644
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -90,10 +90,42 @@ class CleanerTest extends JUnitSuite {
assertTrue("None of the keys we deleted should still exist.",
(0 until leo.toInt by 2).forall(!keys.contains(_)))
}
+
+ @Test
+ def testCleaningWithUnkeyedMessages {
+ val cleaner = makeCleaner(Int.MaxValue)
+
+ // create a log with compaction turned off so we can append unkeyed messages
+ val log = makeLog(config = logConfig.copy(segmentSize = 1024, compact = false))
+
+ // append unkeyed messages
+ while(log.numberOfSegments < 2)
+ log.append(unkeyedMessage(log.logEndOffset.toInt))
+ val numInvalidMessages = unkeyedMessageCountInLog(log)
+
+ val sizeWithUnkeyedMessages = log.size
+
+ // append keyed messages
+ while(log.numberOfSegments < 3)
+ log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
+ val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
+
+ // turn on compaction and compact the log
+ val compactedLog = makeLog(config = logConfig.copy(segmentSize = 1024))
+ cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0))
+
+ assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log))
+ assertEquals("Log should only contain keyed messages after cleaning.", expectedSizeAfterCleaning, log.size)
+ assertEquals("Cleaner should have seen %d invalid messages.", numInvalidMessages, cleaner.stats.invalidMessagesRead)
+ }
/* extract all the keys from a log */
- def keysInLog(log: Log): Iterable[Int] =
- log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).map(m => Utils.readString(m.message.key).toInt))
+ def keysInLog(log: Log): Iterable[Int] =
+ log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => Utils.readString(m.message.key).toInt))
+
+ def unkeyedMessageCountInLog(log: Log) =
+ log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum
def abortCheckDone(topicAndPartition: TopicAndPartition) {
throw new LogCleaningAbortedException()
@@ -130,7 +162,7 @@ class CleanerTest extends JUnitSuite {
// append some messages to the log
var i = 0
while(log.numberOfSegments < 10) {
- log.append(TestUtils.singleMessageSet("hello".getBytes))
+ log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
i += 1
}
@@ -220,7 +252,10 @@ class CleanerTest extends JUnitSuite {
def message(key: Int, value: Int) =
new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
-
+
+ def unkeyedMessage(value: Int) =
+ new ByteBufferMessageSet(new Message(bytes=value.toString.getBytes))
+
def deleteMessage(key: Int) =
new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=null))
http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index c2dd8eb..1a4be70 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -62,10 +62,10 @@ class LogTest extends JUnitSuite {
val set = TestUtils.singleMessageSet("test".getBytes())
// create a log
- val log = new Log(logDir,
- logConfig.copy(segmentMs = 1 * 60 * 60L),
- recoveryPoint = 0L,
- scheduler = time.scheduler,
+ val log = new Log(logDir,
+ logConfig.copy(segmentMs = 1 * 60 * 60L),
+ recoveryPoint = 0L,
+ scheduler = time.scheduler,
time = time)
assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
time.sleep(log.config.segmentMs + 1)
@@ -151,7 +151,7 @@ class LogTest extends JUnitSuite {
def testAppendAndReadWithSequentialOffsets() {
val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
-
+
for(i <- 0 until messages.length)
log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i)))
for(i <- 0 until messages.length) {
@@ -161,7 +161,7 @@ class LogTest extends JUnitSuite {
}
assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).messageSet.size)
}
-
+
/**
* This test appends a bunch of messages with non-sequential offsets and checks that we can read the correct message
* from any offset less than the logEndOffset including offsets not appended.
@@ -171,7 +171,7 @@ class LogTest extends JUnitSuite {
val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val messages = messageIds.map(id => new Message(id.toString.getBytes))
-
+
// now test the case that we give the offsets and use non-sequential offsets
for(i <- 0 until messages.length)
log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false)
@@ -182,27 +182,27 @@ class LogTest extends JUnitSuite {
assertEquals("Message should match appended.", messages(idx), read.message)
}
}
-
+
/**
* This test covers an odd case where we have a gap in the offsets that falls at the end of a log segment.
* Specifically we create a log where the last message in the first segment has offset 0. If we
- * then read offset 1, we should expect this read to come from the second segment, even though the
+ * then read offset 1, we should expect this read to come from the second segment, even though the
* first segment has the greatest lower bound on the offset.
*/
@Test
def testReadAtLogGap() {
val log = new Log(logDir, logConfig.copy(segmentSize = 300), recoveryPoint = 0L, time.scheduler, time = time)
-
+
// keep appending until we have two segments with only a single message in the second segment
while(log.numberOfSegments == 1)
- log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
-
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
+
// now manually truncate off all but one message from the first segment to create a gap in the messages
log.logSegments.head.truncateTo(1)
-
+
assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).messageSet.head.offset)
}
-
+
/**
* Test reading at the boundary of the log, specifically
* - reading from the logEndOffset should give an empty message set
@@ -250,13 +250,13 @@ class LogTest extends JUnitSuite {
}
val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).messageSet
assertEquals("Should be no more messages", 0, lastRead.size)
-
+
// check that rolling the log forced a flush of the log -- the flush is async so retry in case of failure
TestUtils.retry(1000L){
assertTrue("Log role should have forced flush", log.recoveryPoint >= log.activeSegment.baseOffset)
}
}
-
+
/**
* Test reads at offsets that fall within compressed message set boundaries.
*/
@@ -264,20 +264,20 @@ class LogTest extends JUnitSuite {
def testCompressedMessages() {
/* this log should roll after every messageset */
val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
-
+
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
-
+
def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).messageSet.head.message)
-
+
/* we should always get the first message in the compressed set when reading any offset in the set */
assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset)
assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset)
assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset)
assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset)
}
-
+
/**
* Test garbage collecting old segments
*/
@@ -289,7 +289,7 @@ class LogTest extends JUnitSuite {
val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
for(i <- 0 until messagesToAppend)
log.append(TestUtils.singleMessageSet(i.toString.getBytes))
-
+
var currOffset = log.logEndOffset
assertEquals(currOffset, messagesToAppend)
@@ -300,10 +300,10 @@ class LogTest extends JUnitSuite {
assertEquals("We should still have one segment left", 1, log.numberOfSegments)
assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments(_ => true))
assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
- assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
+ assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
currOffset,
log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset)
-
+
// cleanup the log
log.delete()
}
@@ -328,6 +328,64 @@ class LogTest extends JUnitSuite {
}
}
+ @Test
+ def testCompactedTopicConstraints() {
+ val keyedMessage = new Message(bytes = "this message has a key".getBytes, key = "and here it is".getBytes)
+ val anotherKeyedMessage = new Message(bytes = "this message also has a key".getBytes, key ="another key".getBytes)
+ val unkeyedMessage = new Message(bytes = "this message does not have a key".getBytes)
+
+ val messageSetWithUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage, keyedMessage)
+ val messageSetWithOneUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage)
+ val messageSetWithCompressedKeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage)
+
+ val messageSetWithKeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage)
+ val messageSetWithKeyedMessages = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage, anotherKeyedMessage)
+
+ val log = new Log(logDir, logConfig.copy(compact = true), recoveryPoint = 0L, time.scheduler, time)
+
+ try {
+ log.append(messageSetWithUnkeyedMessage)
+ fail("Compacted topics cannot accept a message without a key.")
+ } catch {
+ case e: InvalidMessageException => // this is good
+ }
+ try {
+ log.append(messageSetWithOneUnkeyedMessage)
+ fail("Compacted topics cannot accept a message without a key.")
+ } catch {
+ case e: InvalidMessageException => // this is good
+ }
+ try {
+ log.append(messageSetWithCompressedKeyedMessage)
+ fail("Compacted topics cannot accept compressed messages.")
+ } catch {
+ case e: InvalidMessageException => // this is good
+ }
+
+ // the following should succeed without any InvalidMessageException
+ log.append(messageSetWithKeyedMessage)
+ log.append(messageSetWithKeyedMessages)
+
+ // test that a compacted topic with broker-side compression type set to uncompressed can accept compressed messages
+ val uncompressedLog = new Log(logDir, logConfig.copy(compact = true, compressionType = "uncompressed"),
+ recoveryPoint = 0L, time.scheduler, time)
+ uncompressedLog.append(messageSetWithCompressedKeyedMessage)
+ uncompressedLog.append(messageSetWithKeyedMessage)
+ uncompressedLog.append(messageSetWithKeyedMessages)
+ try {
+ uncompressedLog.append(messageSetWithUnkeyedMessage)
+ fail("Compacted topics cannot accept a message without a key.")
+ } catch {
+ case e: InvalidMessageException => // this is good
+ }
+ try {
+ uncompressedLog.append(messageSetWithOneUnkeyedMessage)
+ fail("Compacted topics cannot accept a message without a key.")
+ } catch {
+ case e: InvalidMessageException => // this is good
+ }
+ }
+
/**
* We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
* setting and checking that an exception is thrown.
@@ -369,13 +427,13 @@ class LogTest extends JUnitSuite {
val numIndexEntries = log.activeSegment.index.entries
val lastOffset = log.logEndOffset
log.close()
-
+
log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
log.close()
-
+
// test recovery case
log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
@@ -383,7 +441,7 @@ class LogTest extends JUnitSuite {
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
log.close()
}
-
+
/**
* Test that if we manually delete an index segment it is rebuilt when the log is re-opened
*/
@@ -397,12 +455,12 @@ class LogTest extends JUnitSuite {
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
val indexFiles = log.logSegments.map(_.index.file)
log.close()
-
+
// delete all the index files
indexFiles.foreach(_.delete())
-
+
// reopen the log
- log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+ log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
for(i <- 0 until numMessages)
assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
@@ -425,10 +483,10 @@ class LogTest extends JUnitSuite {
for (i<- 1 to msgPerSeg)
log.append(set)
-
+
assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset)
-
+
val lastOffset = log.logEndOffset
val size = log.size
log.truncateTo(log.logEndOffset) // keep the entire log
@@ -446,7 +504,7 @@ class LogTest extends JUnitSuite {
for (i<- 1 to msgPerSeg)
log.append(set)
-
+
assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
assertEquals("Should be back to original size", log.size, size)
log.truncateFullyAndStartAt(log.logEndOffset - (msgPerSeg - 1))
@@ -497,23 +555,23 @@ class LogTest extends JUnitSuite {
def testBogusIndexSegmentsAreRemoved() {
val bogusIndex1 = Log.indexFilename(logDir, 0)
val bogusIndex2 = Log.indexFilename(logDir, 5)
-
+
val set = TestUtils.singleMessageSet("test".getBytes())
- val log = new Log(logDir,
- logConfig.copy(segmentSize = set.sizeInBytes * 5,
- maxIndexSize = 1000,
+ val log = new Log(logDir,
+ logConfig.copy(segmentSize = set.sizeInBytes * 5,
+ maxIndexSize = 1000,
indexInterval = 1),
recoveryPoint = 0L,
time.scheduler,
time)
-
+
assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
-
+
// check that we can append to the log
for(i <- 0 until 10)
log.append(set)
-
+
log.delete()
}
@@ -523,22 +581,22 @@ class LogTest extends JUnitSuite {
@Test
def testReopenThenTruncate() {
val set = TestUtils.singleMessageSet("test".getBytes())
- val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
- maxIndexSize = 1000,
+ val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
+ maxIndexSize = 1000,
indexInterval = 10000)
// create a log
- var log = new Log(logDir,
+ var log = new Log(logDir,
config,
recoveryPoint = 0L,
time.scheduler,
time)
-
+
// add enough messages to roll over several segments then close and re-open and attempt to truncate
for(i <- 0 until 100)
log.append(set)
log.close()
- log = new Log(logDir,
+ log = new Log(logDir,
config,
recoveryPoint = 0L,
time.scheduler,
@@ -547,7 +605,7 @@ class LogTest extends JUnitSuite {
assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
}
-
+
/**
* Test that deleted files are deleted after the appropriate time.
*/
@@ -555,38 +613,38 @@ class LogTest extends JUnitSuite {
def testAsyncDelete() {
val set = TestUtils.singleMessageSet("test".getBytes())
val asyncDeleteMs = 1000
- val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
- fileDeleteDelayMs = asyncDeleteMs,
- maxIndexSize = 1000,
+ val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
+ fileDeleteDelayMs = asyncDeleteMs,
+ maxIndexSize = 1000,
indexInterval = 10000)
val log = new Log(logDir,
config,
- recoveryPoint = 0L,
+ recoveryPoint = 0L,
time.scheduler,
time)
-
+
// append some messages to create some segments
for(i <- 0 until 100)
log.append(set)
-
+
// files should be renamed
val segments = log.logSegments.toArray
val oldFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
log.deleteOldSegments((s) => true)
-
+
assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
- assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) &&
+ assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) &&
segments.forall(_.index.file.getName.endsWith(Log.DeletedFileSuffix)))
assertTrue("The .deleted files should still be there.", segments.forall(_.log.file.exists) &&
segments.forall(_.index.file.exists))
assertTrue("The original file should be gone.", oldFiles.forall(!_.exists))
-
+
// when enough time passes the files should be deleted
val deletedFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
time.sleep(asyncDeleteMs + 1)
assertTrue("Files should all be gone.", deletedFiles.forall(!_.exists))
}
-
+
/**
* Any files ending in .deleted should be removed when the log is re-opened.
*/
@@ -599,22 +657,22 @@ class LogTest extends JUnitSuite {
recoveryPoint = 0L,
time.scheduler,
time)
-
+
// append some messages to create some segments
for(i <- 0 until 100)
log.append(set)
-
+
log.deleteOldSegments((s) => true)
log.close()
-
- log = new Log(logDir,
+
+ log = new Log(logDir,
config,
recoveryPoint = 0L,
time.scheduler,
time)
assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
}
-
+
@Test
def testAppendMessageWithNullPayload() {
val log = new Log(logDir,
@@ -627,9 +685,9 @@ class LogTest extends JUnitSuite {
assertEquals(0, messageSet.head.offset)
assertTrue("Message payload should be null.", messageSet.head.message.isNull)
}
-
+
@Test
- def testCorruptLog() {
+ def testCorruptLog() {
// append some messages to create some segments
val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
val set = TestUtils.singleMessageSet("test".getBytes())
@@ -647,11 +705,11 @@ class LogTest extends JUnitSuite {
log.append(set)
val messages = log.logSegments.flatMap(_.log.iterator.toList)
log.close()
-
+
// corrupt index and log by appending random bytes
TestUtils.appendNonsenseToFile(log.activeSegment.index.file, TestUtils.random.nextInt(1024) + 1)
TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
-
+
// attempt recovery
log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
assertEquals(numMessages, log.logEndOffset)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd6ed9e/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 73a2637..07bc317 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -147,11 +147,11 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
// check uncompressed offsets
checkOffsets(messages, 0)
var offset = 1234567
- checkOffsets(messages.assignOffsets(new AtomicLong(offset), NoCompressionCodec, NoCompressionCodec), offset)
+ checkOffsets(messages.validateMessagesAndAssignOffsets(new AtomicLong(offset), NoCompressionCodec, NoCompressionCodec), offset)
// check compressed messages
checkOffsets(compressedMessages, 0)
- checkOffsets(compressedMessages.assignOffsets(new AtomicLong(offset), DefaultCompressionCodec, DefaultCompressionCodec), offset)
+ checkOffsets(compressedMessages.validateMessagesAndAssignOffsets(new AtomicLong(offset), DefaultCompressionCodec, DefaultCompressionCodec), offset)
}
/* check that offsets are assigned based on byte offset from the given base offset */