Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/23 23:57:40 UTC
svn commit: r1293010 - in /incubator/kafka/trunk/core/src:
main/scala/kafka/message/ByteBufferMessageSet.scala
main/scala/kafka/producer/SyncProducer.scala
test/scala/unit/kafka/producer/SyncProducerTest.scala
Author: junrao
Date: Thu Feb 23 22:57:40 2012
New Revision: 1293010
URL: http://svn.apache.org/viewvc?rev=1293010&view=rev
Log:
Add a shallow iterator to the ByteBufferMessageSet; patched by Yang Ye; reviewed by Jun Rao; KAFKA-277
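In effect, the patch moves the per-message size check down into the message set and makes it shallow, so a compressed wrapper message is checked as a whole rather than per inner message. A minimal sketch of the new behavior (assuming the 0.7-era APIs shown in the diffs below; the payloads are made up):

    import kafka.message.{ByteBufferMessageSet, DefaultCompressionCodec, Message}
    import kafka.common.MessageSizeTooLargeException

    // Ten 20-byte payloads, batched and gzip-compressed into one wrapper message.
    val msgs = (0 until 10).map(i => new Message(Array.fill(20)(i.toByte)))
    val set = new ByteBufferMessageSet(
      compressionCodec = DefaultCompressionCodec, messages = msgs: _*)

    // The default (deep) iterator decompresses and yields the ten inner messages;
    // the new shallow check instead inspects the single wrapper message, whose
    // compressed payload can exceed the cap even when every inner payload is small.
    try {
      set.verifyMessageSize(100)
    } catch {
      case e: MessageSizeTooLargeException => println("wrapper message too large")
    }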
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1293010&r1=1293009&r2=1293010&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Thu Feb 23 22:57:40 2012
@@ -18,10 +18,10 @@
package kafka.message
import kafka.utils.Logging
-import kafka.common.{InvalidMessageSizeException, ErrorMapping}
import java.nio.ByteBuffer
import java.nio.channels._
import kafka.utils.IteratorTemplate
+import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException, ErrorMapping}
/**
* A sequence of messages stored in a byte buffer
@@ -61,7 +61,7 @@ class ByteBufferMessageSet(private val b
private def shallowValidBytes: Long = {
if(shallowValidByteCount < 0) {
- val iter = deepIterator
+ val iter = this.internalIterator()
while(iter.hasNext) {
val messageAndOffset = iter.next
shallowValidByteCount = messageAndOffset.offset
@@ -75,9 +75,21 @@ class ByteBufferMessageSet(private val b
def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long =
channel.write(buffer.duplicate)
- override def iterator: Iterator[MessageAndOffset] = deepIterator
+ override def iterator: Iterator[MessageAndOffset] = internalIterator()
- private def deepIterator(): Iterator[MessageAndOffset] = {
+
+ def verifyMessageSize(maxMessageSize: Int) {
+ val shallowIter = internalIterator(true)
+ while (shallowIter.hasNext) {
+ val messageAndOffset = shallowIter.next
+ if (messageAndOffset.message.payloadSize > maxMessageSize)
+ throw new MessageSizeTooLargeException
+ }
+ }
+
+
+ /** When the isShallow flag is true, we do a shallow iteration: only the first level of messages is traversed. This is used by the verifyMessageSize() function. */
+ private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
ErrorMapping.maybeThrowException(errorCode)
new IteratorTemplate[MessageAndOffset] {
var topIter = buffer.slice()
@@ -108,38 +120,51 @@ class ByteBufferMessageSet(private val b
message.limit(size)
topIter.position(topIter.position + size)
val newMessage = new Message(message)
- newMessage.compressionCodec match {
- case NoCompressionCodec =>
- if(!newMessage.isValid)
- throw new InvalidMessageException("Uncompressed message is invalid")
- debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
- innerIter = null
- currValidBytes += 4 + size
- trace("currValidBytes = " + currValidBytes)
- new MessageAndOffset(newMessage, currValidBytes)
- case _ =>
- if(!newMessage.isValid)
- throw new InvalidMessageException("Compressed message is invalid")
- debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
- innerIter = CompressionUtils.decompress(newMessage).deepIterator
- if (!innerIter.hasNext) {
- currValidBytes += 4 + lastMessageSize
+
+ if(isShallow){
+ currValidBytes += 4 + size
+ trace("shallow iterator currValidBytes = " + currValidBytes)
+ new MessageAndOffset(newMessage, currValidBytes)
+ }
+ else{
+ newMessage.compressionCodec match {
+ case NoCompressionCodec =>
+ if(!newMessage.isValid)
+ throw new InvalidMessageException("Uncompressed message is invalid")
+ debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
innerIter = null
- }
- makeNext()
+ currValidBytes += 4 + size
+ trace("currValidBytes = " + currValidBytes)
+ new MessageAndOffset(newMessage, currValidBytes)
+ case _ =>
+ if(!newMessage.isValid)
+ throw new InvalidMessageException("Compressed message is invalid")
+ debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
+ innerIter = CompressionUtils.decompress(newMessage).internalIterator()
+ if (!innerIter.hasNext) {
+ currValidBytes += 4 + lastMessageSize
+ innerIter = null
+ }
+ makeNext()
+ }
}
}
override def makeNext(): MessageAndOffset = {
- val isInnerDone = innerDone()
- debug("makeNext() in deepIterator: innerDone = " + isInnerDone)
- isInnerDone match {
- case true => makeNextOuter
- case false => {
- val messageAndOffset = innerIter.next
- if (!innerIter.hasNext)
- currValidBytes += 4 + lastMessageSize
- new MessageAndOffset(messageAndOffset.message, currValidBytes)
+ if(isShallow){
+ makeNextOuter
+ }
+ else{
+ val isInnerDone = innerDone()
+ debug("makeNext() in internalIterator: innerDone = " + isInnerDone)
+ isInnerDone match {
+ case true => makeNextOuter
+ case false => {
+ val messageAndOffset = innerIter.next
+ if (!innerIter.hasNext)
+ currValidBytes += 4 + lastMessageSize
+ new MessageAndOffset(messageAndOffset.message, currValidBytes)
+ }
}
}
}
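A note on the offset arithmetic preserved above: each shallow entry in the buffer is a 4-byte size field followed by the message bytes, so both iteration modes advance currValidBytes by 4 + size per shallow message. A hypothetical helper (not part of the patch) that mirrors this accounting:

    // Total bytes occupied by a sequence of shallow messages, given each
    // message's size in bytes; every entry carries a 4-byte length prefix,
    // matching the currValidBytes += 4 + size updates above.
    def shallowSizeInBytes(messageSizes: Seq[Int]): Long =
      messageSizes.foldLeft(0L)((acc, size) => acc + 4 + size)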
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1293010&r1=1293009&r2=1293010&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala Thu Feb 23 22:57:40 2012
@@ -24,7 +24,6 @@ import kafka.network._
import kafka.utils._
import kafka.api._
import scala.math._
-import kafka.common.MessageSizeTooLargeException
import java.nio.ByteBuffer
import java.util.Random
@@ -120,7 +119,7 @@ class SyncProducer(val config: SyncProdu
* Send a message
*/
def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
- verifyMessageSize(messages)
+ messages.verifyMessageSize(config.maxMessageSize)
val setSize = messages.sizeInBytes.asInstanceOf[Int]
trace("Got message set with " + setSize + " bytes to send")
send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages)))
@@ -130,7 +129,7 @@ class SyncProducer(val config: SyncProdu
def multiSend(produces: Array[ProducerRequest]) {
for (request <- produces)
- verifyMessageSize(request.messages)
+ request.messages.verifyMessageSize(config.maxMessageSize)
val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes)
trace("Got multi message sets with " + setSize + " bytes to send")
send(new BoundedByteBufferSend(new MultiProducerRequest(produces)))
@@ -143,11 +142,6 @@ class SyncProducer(val config: SyncProdu
}
}
- private def verifyMessageSize(messages: ByteBufferMessageSet) {
- for (messageAndOffset <- messages)
- if (messageAndOffset.message.payloadSize > config.maxMessageSize)
- throw new MessageSizeTooLargeException
- }
/**
* Disconnect from current channel, closing connection.
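With the private verifyMessageSize removed here, callers are unchanged, but the semantics tighten: the old check iterated deeply (over the inner messages of a compressed set), while the new check in ByteBufferMessageSet is shallow, so an oversized compressed wrapper is now rejected too. A sketch of the caller's view (property values are assumed placeholders; set is any ByteBufferMessageSet, e.g. built as in the tests below):

    import java.util.Properties
    import kafka.producer.{SyncProducer, SyncProducerConfig}

    val props = new Properties()
    props.put("host", "localhost")
    props.put("port", "9092")
    props.put("max.message.size", "100")
    val producer = new SyncProducer(new SyncProducerConfig(props))

    // Throws MessageSizeTooLargeException before anything is sent if any shallow
    // message payload, including a compressed wrapper's, exceeds max.message.size.
    producer.send("test", 0, set)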
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1293010&r1=1293009&r2=1293010&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Thu Feb 23 22:57:40 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -27,7 +27,7 @@ import org.junit.{After, Before, Test}
import kafka.common.MessageSizeTooLargeException
import java.util.Properties
import kafka.api.ProducerRequest
-import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, Message, ByteBufferMessageSet}
class SyncProducerTest extends JUnitSuite {
private var messageBytes = new Array[Byte](2);
@@ -86,7 +86,7 @@ class SyncProducerTest extends JUnitSuit
}
@Test
- def testMessageSizeTooLarge() {
+ def testSingleMessageSizeTooLarge() {
val props = new Properties()
props.put("host", "localhost")
props.put("port", server.socketServer.port.toString)
@@ -104,4 +104,31 @@ class SyncProducerTest extends JUnitSuit
}
Assert.assertTrue(failed)
}
-}
+
+ @Test
+ def testCompressedMessageSizeTooLarge() {
+ val props = new Properties()
+ props.put("host", "localhost")
+ props.put("port", server.socketServer.port.toString)
+ props.put("buffer.size", "102400")
+ props.put("connect.timeout.ms", "300")
+ props.put("reconnect.interval", "500")
+ props.put("max.message.size", "100")
+ val producer = new SyncProducer(new SyncProducerConfig(props))
+ val messages = new Array[Message](10)
+ import Array.fill
+ var a = 0
+ for( a <- 0 to 9){
+ val bytes = fill(20){a.asInstanceOf[Byte]}
+ messages(a) = new Message(bytes)
+ }
+ var failed = false
+ /** After compression, the single wrapper message has a 118-byte payload, which exceeds max.message.size = 100. */
+ try {
+ producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = messages: _*))
+ } catch {
+ case e: MessageSizeTooLargeException => failed = true
+ }
+ Assert.assertTrue(failed)
+ }
+}
\ No newline at end of file
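For the arithmetic behind the new test: the ten 20-byte payloads gzip into a single wrapper message whose payload is 118 bytes, so the shallow check fails against max.message.size = 100 even though every inner payload is well under the limit. One way to observe the batched size (a sketch; sizeInBytes also counts the 4-byte length prefix and the message header, so it reports a little more than the 118-byte payload):

    val set = new ByteBufferMessageSet(
      compressionCodec = DefaultCompressionCodec, messages = messages: _*)
    println(set.sizeInBytes) // wrapper payload plus framing overhead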