You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/03/22 17:03:49 UTC
svn commit: r1303861 - in /incubator/kafka/trunk/core/src/main/scala/kafka:
log/Log.scala message/ByteBufferMessageSet.scala
Author: nehanarkhede
Date: Thu Mar 22 16:03:49 2012
New Revision: 1303861
URL: http://svn.apache.org/viewvc?rev=1303861&view=rev
Log:
KAFKA-310 Incomplete message set validation checks in Log's append API can corrupt on disk log segment; patched by nehanarkhede; reviewed by junrao
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1303861&r1=1303860&r2=1303861&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Thu Mar 22 16:03:49 2012
@@ -199,7 +199,7 @@ private[log] class Log(val dir: File, va
* Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
* Returns the offset at which the messages are written.
*/
- def append(messages: MessageSet): Unit = {
+ def append(messages: ByteBufferMessageSet): Unit = {
// validate the messages
var numberOfMessages = 0
for(messageAndOffset <- messages) {
@@ -211,12 +211,22 @@ private[log] class Log(val dir: File, va
BrokerTopicStat.getBrokerTopicStat(getTopicName).recordMessagesIn(numberOfMessages)
BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
logStats.recordAppendedMessages(numberOfMessages)
-
+
+ // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
+ val validByteBuffer = messages.getBuffer.duplicate()
+ val messageSetValidBytes = messages.validBytes
+ if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
+ throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes +
+ " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+
+ validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
+ val validMessages = new ByteBufferMessageSet(validByteBuffer)
+
// they are valid, insert them in the log
lock synchronized {
try {
val segment = segments.view.last
- segment.messageSet.append(messages)
+ segment.messageSet.append(validMessages)
maybeFlush(numberOfMessages)
maybeRoll(segment)
}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1303861&r1=1303860&r2=1303861&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Thu Mar 22 16:03:49 2012
@@ -61,7 +61,7 @@ class ByteBufferMessageSet(private val b
private def shallowValidBytes: Long = {
if(shallowValidByteCount < 0) {
- val iter = this.internalIterator()
+ val iter = this.internalIterator(true)
while(iter.hasNext) {
val messageAndOffset = iter.next
shallowValidByteCount = messageAndOffset.offset
@@ -88,7 +88,6 @@ class ByteBufferMessageSet(private val b
}
}
-
/** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. This is used in verifyMessageSize() function **/
private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
ErrorMapping.maybeThrowException(errorCode)