You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/10/20 01:52:58 UTC

svn commit: r1186570 - in /incubator/kafka/trunk/core/src: main/scala/kafka/consumer/ main/scala/kafka/message/ main/scala/kafka/tools/ test/scala/unit/kafka/message/

Author: junrao
Date: Wed Oct 19 23:52:58 2011
New Revision: 1186570

URL: http://svn.apache.org/viewvc?rev=1186570&view=rev
Log:
ZK consumer gets into infinite loop if a message is larger than fetch size; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-160

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1186570&r1=1186569&r2=1186570&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Wed Oct 19 23:52:58 2011
@@ -76,7 +76,8 @@ private[consumer] class PartitionTopicIn
    *  add an empty message with the exception to the queue so that client can see the error
    */
   def enqueueError(e: Throwable, fetchOffset: Long) = {
-    val messages = new ByteBufferMessageSet(ErrorMapping.EmptyByteBuffer, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    val messages = new ByteBufferMessageSet(buffer = ErrorMapping.EmptyByteBuffer, initialOffset = 0,
+      errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
     chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
   }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1186570&r1=1186569&r2=1186570&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Wed Oct 19 23:52:58 2011
@@ -99,7 +99,7 @@ class ByteBufferMessageSet(private val b
           logger.trace("size of data = " + size)
         }
         if(size < 0 || topIter.remaining < size) {
-          if (currValidBytes == 0 || size < 0)
+          if (currValidBytes == initialOffset || size < 0)
             throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " +
               topIter.remaining + " at " + currValidBytes + "( possible causes (1) a single message larger than " +
               "the fetch size; (2) log corruption )")

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala?rev=1186570&r1=1186569&r2=1186570&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala Wed Oct 19 23:52:58 2011
@@ -98,7 +98,7 @@ class ZKConsumerThread(stream: KafkaMess
       }
     }catch {
       case e:ConsumerTimeoutException => // this is ok
-      case oe: Exception => logger.error(oe)
+      case oe: Exception => logger.error("error in ZKConsumerThread", oe)
     }
     shutdownLatch.countDown
     println("Received " + count + " messages")

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1186570&r1=1186569&r2=1186570&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Wed Oct 19 23:52:58 2011
@@ -21,6 +21,7 @@ import java.nio._
 import junit.framework.Assert._
 import org.junit.Test
 import kafka.utils.TestUtils
+import kafka.common.InvalidMessageSizeException
 
 class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
 
@@ -28,6 +29,24 @@ class ByteBufferMessageSetTest extends B
     new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
   
   @Test
+  def testSmallFetchSize() {
+    // create a ByteBufferMessageSet that doesn't contain a full message
+    // iterating it should get an InvalidMessageSizeException
+    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("01234567890123456789".getBytes()))
+    val buffer = messages.serialized.slice
+    buffer.limit(10)
+    val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = buffer, initialOffset = 1000)
+    try {
+      for (message <- messageSetWithNoFullMessage)
+        fail("shouldn't see any message")
+    }
+    catch {
+      case e: InvalidMessageSizeException => //this is expected
+      case e2 => fail("shouldn't see any other exceptions")
+    }
+  }
+
+  @Test
   def testValidBytes() {
     {
       val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))