You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2011/08/24 03:27:43 UTC
svn commit: r1160952 - in /incubator/kafka/trunk/core/src:
main/scala/kafka/consumer/PartitionTopicInfo.scala
main/scala/kafka/message/ByteBufferMessageSet.scala
test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
Author: nehanarkhede
Date: Wed Aug 24 01:27:42 2011
New Revision: 1160952
URL: http://svn.apache.org/viewvc?rev=1160952&view=rev
Log:
The FetcherRunnable busy waits on empty fetch requests; KAFKA-117; patched by nehanarkhede; reviewed by junrao
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1160952&r1=1160951&r2=1160952&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Wed Aug 24 01:27:42 2011
@@ -59,7 +59,7 @@ private[consumer] class PartitionTopicIn
* @return the number of valid bytes
*/
def enqueue(messages: ByteBufferMessageSet, fetchOffset: Long): Long = {
- val size = messages.shallowValidBytes
+ val size = messages.validBytes
if(size > 0) {
// update fetched offset to the compressed data chunk size, not the decompressed message set size
if(logger.isTraceEnabled)
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1160952&r1=1160951&r2=1160952&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Wed Aug 24 01:27:42 2011
@@ -40,7 +40,6 @@ class ByteBufferMessageSet(private val b
private val logger = Logger.getLogger(getClass())
private var validByteCount = -1L
private var shallowValidByteCount = -1L
- private var deepValidByteCount = -1L
def this(compressionCodec: CompressionCodec, messages: Message*) {
this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
@@ -58,9 +57,9 @@ class ByteBufferMessageSet(private val b
def serialized(): ByteBuffer = buffer
- def validBytes: Long = deepValidBytes
-
- def shallowValidBytes: Long = {
+ def validBytes: Long = shallowValidBytes
+
+ private def shallowValidBytes: Long = {
if(shallowValidByteCount < 0) {
val iter = deepIterator
while(iter.hasNext) {
@@ -68,18 +67,10 @@ class ByteBufferMessageSet(private val b
shallowValidByteCount = messageAndOffset.offset
}
}
- shallowValidByteCount - initialOffset
+ if(shallowValidByteCount < initialOffset) 0
+ else (shallowValidByteCount - initialOffset)
}
- def deepValidBytes: Long = {
- if (deepValidByteCount < 0) {
- val iter = deepIterator
- while (iter.hasNext)
- iter.next
- }
- deepValidByteCount
- }
-
/** Write the messages in this set to the given channel */
def writeTo(channel: WritableByteChannel, offset: Long, size: Long): Long =
channel.write(buffer.duplicate)
@@ -98,7 +89,6 @@ class ByteBufferMessageSet(private val b
def makeNextOuter: MessageAndOffset = {
if (topIter.remaining < 4) {
- deepValidByteCount = currValidBytes
return allDone()
}
val size = topIter.getInt()
@@ -109,7 +99,6 @@ class ByteBufferMessageSet(private val b
logger.trace("size of data = " + size)
}
if(size < 0 || topIter.remaining < size) {
- deepValidByteCount = currValidBytes
if (currValidBytes == 0 || size < 0)
throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " +
topIter.remaining + " at " + currValidBytes + "( possible causes (1) a single message larger than " +
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1160952&r1=1160951&r2=1160952&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Wed Aug 24 01:27:42 2011
@@ -29,12 +29,20 @@ class ByteBufferMessageSetTest extends B
@Test
def testValidBytes() {
- val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
- val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
- buffer.put(messages.serialized)
- buffer.putShort(4)
- val messagesPlus = new ByteBufferMessageSet(buffer)
- assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
+ {
+ val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
+ val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
+ buffer.put(messages.serialized)
+ buffer.putShort(4)
+ val messagesPlus = new ByteBufferMessageSet(buffer)
+ assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
+ }
+
+ // test valid bytes on empty ByteBufferMessageSet
+ {
+ assertEquals("Valid bytes on an empty ByteBufferMessageSet should return 0", 0,
+ MessageSet.Empty.asInstanceOf[ByteBufferMessageSet].validBytes)
+ }
}
@Test