You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2011/08/24 03:27:43 UTC

svn commit: r1160952 - in /incubator/kafka/trunk/core/src: main/scala/kafka/consumer/PartitionTopicInfo.scala main/scala/kafka/message/ByteBufferMessageSet.scala test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala

Author: nehanarkhede
Date: Wed Aug 24 01:27:42 2011
New Revision: 1160952

URL: http://svn.apache.org/viewvc?rev=1160952&view=rev
Log:
The FetcherRunnable busy waits on empty fetch requests; KAFKA-117; patched by nehanarkhede; reviewed by junrao

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1160952&r1=1160951&r2=1160952&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Wed Aug 24 01:27:42 2011
@@ -59,7 +59,7 @@ private[consumer] class PartitionTopicIn
    * @return the number of valid bytes
    */
   def enqueue(messages: ByteBufferMessageSet, fetchOffset: Long): Long = {
-    val size = messages.shallowValidBytes
+    val size = messages.validBytes
     if(size > 0) {
       // update fetched offset to the compressed data chunk size, not the decompressed message set size
       if(logger.isTraceEnabled)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1160952&r1=1160951&r2=1160952&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Wed Aug 24 01:27:42 2011
@@ -40,7 +40,6 @@ class ByteBufferMessageSet(private val b
   private val logger = Logger.getLogger(getClass())  
   private var validByteCount = -1L
   private var shallowValidByteCount = -1L
-  private var deepValidByteCount = -1L
 
   def this(compressionCodec: CompressionCodec, messages: Message*) {
     this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
@@ -58,9 +57,9 @@ class ByteBufferMessageSet(private val b
 
   def serialized(): ByteBuffer = buffer
 
-  def validBytes: Long = deepValidBytes
-  
-  def shallowValidBytes: Long = {
+  def validBytes: Long = shallowValidBytes
+
+  private def shallowValidBytes: Long = {
     if(shallowValidByteCount < 0) {
       val iter = deepIterator
       while(iter.hasNext) {
@@ -68,18 +67,10 @@ class ByteBufferMessageSet(private val b
         shallowValidByteCount = messageAndOffset.offset
       }
     }
-    shallowValidByteCount - initialOffset
+    if(shallowValidByteCount < initialOffset) 0
+    else (shallowValidByteCount - initialOffset)
   }
   
-  def deepValidBytes: Long = {
-    if (deepValidByteCount < 0) {
-      val iter = deepIterator
-      while (iter.hasNext)
-        iter.next
-    }
-    deepValidByteCount
-  }
-
   /** Write the messages in this set to the given channel */
   def writeTo(channel: WritableByteChannel, offset: Long, size: Long): Long =
     channel.write(buffer.duplicate)
@@ -98,7 +89,6 @@ class ByteBufferMessageSet(private val b
 
       def makeNextOuter: MessageAndOffset = {
         if (topIter.remaining < 4) {
-          deepValidByteCount = currValidBytes
           return allDone()
         }
         val size = topIter.getInt()
@@ -109,7 +99,6 @@ class ByteBufferMessageSet(private val b
           logger.trace("size of data = " + size)
         }
         if(size < 0 || topIter.remaining < size) {
-          deepValidByteCount = currValidBytes
           if (currValidBytes == 0 || size < 0)
             throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " +
               topIter.remaining + " at " + currValidBytes + "( possible causes (1) a single message larger than " +

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1160952&r1=1160951&r2=1160952&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Wed Aug 24 01:27:42 2011
@@ -29,12 +29,20 @@ class ByteBufferMessageSetTest extends B
   
   @Test
   def testValidBytes() {
-    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
-    val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
-    buffer.put(messages.serialized)
-    buffer.putShort(4)
-    val messagesPlus = new ByteBufferMessageSet(buffer)
-    assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
+    {
+      val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
+      val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
+      buffer.put(messages.serialized)
+      buffer.putShort(4)
+      val messagesPlus = new ByteBufferMessageSet(buffer)
+      assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
+    }
+
+    // test valid bytes on empty ByteBufferMessageSet
+    {
+      assertEquals("Valid bytes on an empty ByteBufferMessageSet should return 0", 0,
+        MessageSet.Empty.asInstanceOf[ByteBufferMessageSet].validBytes)
+    }
   }
 
   @Test