You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/03/22 17:03:49 UTC

svn commit: r1303861 - in /incubator/kafka/trunk/core/src/main/scala/kafka: log/Log.scala message/ByteBufferMessageSet.scala

Author: nehanarkhede
Date: Thu Mar 22 16:03:49 2012
New Revision: 1303861

URL: http://svn.apache.org/viewvc?rev=1303861&view=rev
Log:
KAFKA-310 Incomplete message set validation checks in Log's append API can corrupt on disk log segment; patched by nehanarkhede; reviewed by junrao

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1303861&r1=1303860&r2=1303861&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Thu Mar 22 16:03:49 2012
@@ -199,7 +199,7 @@ private[log] class Log(val dir: File, va
    * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
    * Returns the offset at which the messages are written.
    */
-  def append(messages: MessageSet): Unit = {
+  def append(messages: ByteBufferMessageSet): Unit = {
     // validate the messages
     var numberOfMessages = 0
     for(messageAndOffset <- messages) {
@@ -211,12 +211,22 @@ private[log] class Log(val dir: File, va
     BrokerTopicStat.getBrokerTopicStat(getTopicName).recordMessagesIn(numberOfMessages)
     BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
     logStats.recordAppendedMessages(numberOfMessages)
-    
+
+    // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
+    val validByteBuffer = messages.getBuffer.duplicate()
+    val messageSetValidBytes = messages.validBytes
+    if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
+      throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes +
+        " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+
+    validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
+    val validMessages = new ByteBufferMessageSet(validByteBuffer)
+
     // they are valid, insert them in the log
     lock synchronized {
       try {
         val segment = segments.view.last
-        segment.messageSet.append(messages)
+        segment.messageSet.append(validMessages)
         maybeFlush(numberOfMessages)
         maybeRoll(segment)
       }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1303861&r1=1303860&r2=1303861&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Thu Mar 22 16:03:49 2012
@@ -61,7 +61,7 @@ class ByteBufferMessageSet(private val b
 
   private def shallowValidBytes: Long = {
     if(shallowValidByteCount < 0) {
-      val iter = this.internalIterator()
+      val iter = this.internalIterator(true)
       while(iter.hasNext) {
         val messageAndOffset = iter.next
         shallowValidByteCount = messageAndOffset.offset
@@ -88,7 +88,6 @@ class ByteBufferMessageSet(private val b
     }
   }
 
-
   /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. This is used in verifyMessageSize() function **/
   private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
     ErrorMapping.maybeThrowException(errorCode)