You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/02/17 03:33:07 UTC
svn commit: r1245299 - in /incubator/kafka/trunk/core/src/main/scala/kafka:
message/ByteBufferMessageSet.scala message/InvalidMessageException.scala
producer/SyncProducer.scala
Author: nehanarkhede
Date: Fri Feb 17 02:33:07 2012
New Revision: 1245299
URL: http://svn.apache.org/viewvc?rev=1245299&view=rev
Log:
KAFKA-274 Handle corrupted messages cleanly; patched by nehanarkhede; reviewed by junrao
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1245299&r1=1245298&r2=1245299&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Fri Feb 17 02:33:07 2012
@@ -38,6 +38,8 @@ class ByteBufferMessageSet(private val b
private val errorCode: Int = ErrorMapping.NoError) extends MessageSet with Logging {
private var validByteCount = -1L
private var shallowValidByteCount = -1L
+ if(sizeInBytes > Int.MaxValue)
+ throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)
def this(compressionCodec: CompressionCodec, messages: Message*) {
this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
@@ -108,12 +110,16 @@ class ByteBufferMessageSet(private val b
val newMessage = new Message(message)
newMessage.compressionCodec match {
case NoCompressionCodec =>
+ if(!newMessage.isValid)
+ throw new InvalidMessageException("Uncompressed essage is invalid")
debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
innerIter = null
currValidBytes += 4 + size
trace("currValidBytes = " + currValidBytes)
new MessageAndOffset(newMessage, currValidBytes)
case _ =>
+ if(!newMessage.isValid)
+ throw new InvalidMessageException("Compressed message is invalid")
debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
innerIter = CompressionUtils.decompress(newMessage).deepIterator
if (!innerIter.hasNext) {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala?rev=1245299&r1=1245298&r2=1245299&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala Fri Feb 17 02:33:07 2012
@@ -20,4 +20,6 @@ package kafka.message
/**
* Indicates that a message failed its checksum and is corrupt
*/
-class InvalidMessageException extends RuntimeException
+class InvalidMessageException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1245299&r1=1245298&r2=1245299&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala Fri Feb 17 02:33:07 2012
@@ -46,10 +46,15 @@ class SyncProducer(val config: SyncProdu
@volatile
private var shutdown: Boolean = false
- debug("Instantiating Scala Sync Producer")
+ trace("Instantiating Scala Sync Producer")
private def verifySendBuffer(buffer : ByteBuffer) = {
- if (logger.isTraceEnabled) {
+ /**
+ * This seems a little convoluted, but the idea is to turn on verification simply by changing log4j settings.
+ * Also, when verification is turned on, care should be taken that the logs don't fill up with unnecessary
+ * data. So the rest of the logging is left at TRACE, while errors are logged at ERROR level.
+ */
+ if (logger.isDebugEnabled) {
trace("verifying sendbuffer of size " + buffer.limit)
val requestTypeId = buffer.getShort()
if (requestTypeId == RequestKeys.MultiProduce) {
@@ -59,17 +64,17 @@ class SyncProducer(val config: SyncProdu
try {
for (messageAndOffset <- produce.messages)
if (!messageAndOffset.message.isValid)
- trace("topic " + produce.topic + " is invalid")
+ throw new InvalidMessageException("Message for topic " + produce.topic + " is invalid")
}
catch {
case e: Throwable =>
- trace("error iterating messages ", e)
+ error("error iterating messages ", e)
}
}
}
catch {
case e: Throwable =>
- trace("error verifying sendbuffer ", e)
+ error("error verifying sendbuffer ", e)
}
}
}