You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/02/17 03:33:07 UTC

svn commit: r1245299 - in /incubator/kafka/trunk/core/src/main/scala/kafka: message/ByteBufferMessageSet.scala message/InvalidMessageException.scala producer/SyncProducer.scala

Author: nehanarkhede
Date: Fri Feb 17 02:33:07 2012
New Revision: 1245299

URL: http://svn.apache.org/viewvc?rev=1245299&view=rev
Log:
KAFKA-274 Handle corrupted messages cleanly; patched by nehanarkhede; reviewed by junrao

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1245299&r1=1245298&r2=1245299&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Fri Feb 17 02:33:07 2012
@@ -38,6 +38,8 @@ class ByteBufferMessageSet(private val b
                            private val errorCode: Int = ErrorMapping.NoError) extends MessageSet with Logging {
   private var validByteCount = -1L
   private var shallowValidByteCount = -1L
+  if(sizeInBytes > Int.MaxValue)
+    throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)
 
   def this(compressionCodec: CompressionCodec, messages: Message*) {
     this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
@@ -108,12 +110,16 @@ class ByteBufferMessageSet(private val b
         val newMessage = new Message(message)
         newMessage.compressionCodec match {
           case NoCompressionCodec =>
+            if(!newMessage.isValid)
+              throw new InvalidMessageException("Uncompressed message is invalid")
             debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
             innerIter = null
             currValidBytes += 4 + size
             trace("currValidBytes = " + currValidBytes)
             new MessageAndOffset(newMessage, currValidBytes)
           case _ =>
+            if(!newMessage.isValid)
+              throw new InvalidMessageException("Compressed message is invalid")
             debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
             innerIter = CompressionUtils.decompress(newMessage).deepIterator
             if (!innerIter.hasNext) {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala?rev=1245299&r1=1245298&r2=1245299&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala Fri Feb 17 02:33:07 2012
@@ -20,4 +20,6 @@ package kafka.message
 /**
  * Indicates that a message failed its checksum and is corrupt
  */
-class InvalidMessageException extends RuntimeException
+class InvalidMessageException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1245299&r1=1245298&r2=1245299&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala Fri Feb 17 02:33:07 2012
@@ -46,10 +46,15 @@ class SyncProducer(val config: SyncProdu
   @volatile
   private var shutdown: Boolean = false
 
-  debug("Instantiating Scala Sync Producer")
+  trace("Instantiating Scala Sync Producer")
 
   private def verifySendBuffer(buffer : ByteBuffer) = {
-    if (logger.isTraceEnabled) {
+    /**
+     * This seems a little convoluted, but the idea is to turn on verification simply by changing log4j settings.
+     * Also, when verification is turned on, care should be taken that the logs don't fill up with unnecessary
+     * data. So the rest of the logging is left at TRACE, while errors are logged at ERROR level.
+     */
+    if (logger.isDebugEnabled) {
       trace("verifying sendbuffer of size " + buffer.limit)
       val requestTypeId = buffer.getShort()
       if (requestTypeId == RequestKeys.MultiProduce) {
@@ -59,17 +64,17 @@ class SyncProducer(val config: SyncProdu
             try {
               for (messageAndOffset <- produce.messages)
                 if (!messageAndOffset.message.isValid)
-                  trace("topic " + produce.topic + " is invalid")
+                  throw new InvalidMessageException("Message for topic " + produce.topic + " is invalid")
             }
             catch {
               case e: Throwable =>
-                trace("error iterating messages ", e)
+                error("error iterating messages ", e)
             }
           }
         }
         catch {
           case e: Throwable =>
-            trace("error verifying sendbuffer ", e)
+            error("error verifying sendbuffer ", e)
         }
       }
     }