You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/02/23 00:03:20 UTC

git commit: KAFKA-767 Message Size check should be done after assigning the offsets; reviewed by Neha Narkhede and Jun Rao

Updated Branches:
  refs/heads/0.8 b056a9a1e -> 0be45b334


KAFKA-767 Message Size check should be done after assigning the offsets; reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0be45b33
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0be45b33
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0be45b33

Branch: refs/heads/0.8
Commit: 0be45b334b821b079d6d0336515248bf11b4cf8d
Parents: b056a9a
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Fri Feb 22 15:03:02 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Feb 22 15:03:13 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala |   19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0be45b33/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d0b26ab..8d9a883 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -292,7 +292,15 @@ private[kafka] class Log(val dir: File,
                       .format(messageSetInfo.firstOffset, nextOffset.get))
               (messageSetInfo.firstOffset, messageSetInfo.lastOffset)
             }
-          
+
+          // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
+          // happens with the new message size (after re-compression, if any)
+          for(messageAndOffset <- validMessages.shallowIterator) {
+            if(MessageSet.entrySize(messageAndOffset.message) > maxMessageSize)
+              throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
+                .format(MessageSet.entrySize(messageAndOffset.message), maxMessageSize))
+          }
+
           // now append to the log
           trace("Appending message set to %s offset: %d nextOffset: %d messageSet: %s"
                 .format(this.name, offsets._1, nextOffset.get(), validMessages))
@@ -321,8 +329,7 @@ private[kafka] class Log(val dir: File,
   
   /**
    * Validate the following:
-   * 1. each message is not too large
-   * 2. each message matches its CRC
+   * 1. each message matches its CRC
    * 
    * Also compute the following quantities:
    * 1. First offset in the message set
@@ -346,12 +353,10 @@ private[kafka] class Log(val dir: File,
       // update the last offset seen
       lastOffset = messageAndOffset.offset
 
-      // check the validity of the message by checking CRC and message size
+      // check the validity of the message by checking CRC
       val m = messageAndOffset.message
       m.ensureValid()
-      if(MessageSet.entrySize(m) > maxMessageSize)
-        throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d.".format(MessageSet.entrySize(m), maxMessageSize))
-      
+
       messageCount += 1;
       
       val messageCodec = m.compressionCodec