You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/02/23 00:03:20 UTC
git commit: KAFKA-767 Message Size check should be done after assigning the offsets; reviewed by Neha Narkhede and Jun Rao
Updated Branches:
refs/heads/0.8 b056a9a1e -> 0be45b334
KAFKA-767 Message Size check should be done after assigning the offsets; reviewed by Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0be45b33
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0be45b33
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0be45b33
Branch: refs/heads/0.8
Commit: 0be45b334b821b079d6d0336515248bf11b4cf8d
Parents: b056a9a
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Fri Feb 22 15:03:02 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Feb 22 15:03:13 2013 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/log/Log.scala | 19 ++++++++++++-------
1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0be45b33/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d0b26ab..8d9a883 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -292,7 +292,15 @@ private[kafka] class Log(val dir: File,
.format(messageSetInfo.firstOffset, nextOffset.get))
(messageSetInfo.firstOffset, messageSetInfo.lastOffset)
}
-
+
+ // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
+ // happens with the new message size (after re-compression, if any)
+ for(messageAndOffset <- validMessages.shallowIterator) {
+ if(MessageSet.entrySize(messageAndOffset.message) > maxMessageSize)
+ throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
+ .format(MessageSet.entrySize(messageAndOffset.message), maxMessageSize))
+ }
+
// now append to the log
trace("Appending message set to %s offset: %d nextOffset: %d messageSet: %s"
.format(this.name, offsets._1, nextOffset.get(), validMessages))
@@ -321,8 +329,7 @@ private[kafka] class Log(val dir: File,
/**
* Validate the following:
- * 1. each message is not too large
- * 2. each message matches its CRC
+ * 1. each message matches its CRC
*
* Also compute the following quantities:
* 1. First offset in the message set
@@ -346,12 +353,10 @@ private[kafka] class Log(val dir: File,
// update the last offset seen
lastOffset = messageAndOffset.offset
- // check the validity of the message by checking CRC and message size
+ // check the validity of the message by checking CRC
val m = messageAndOffset.message
m.ensureValid()
- if(MessageSet.entrySize(m) > maxMessageSize)
- throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d.".format(MessageSet.entrySize(m), maxMessageSize))
-
+
messageCount += 1;
val messageCodec = m.compressionCodec