You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/23 23:57:40 UTC

svn commit: r1293010 - in /incubator/kafka/trunk/core/src: main/scala/kafka/message/ByteBufferMessageSet.scala main/scala/kafka/producer/SyncProducer.scala test/scala/unit/kafka/producer/SyncProducerTest.scala

Author: junrao
Date: Thu Feb 23 22:57:40 2012
New Revision: 1293010

URL: http://svn.apache.org/viewvc?rev=1293010&view=rev
Log:
Add a shallow iterator to the ByteBufferMessageSet; patched by Yang Ye; reviewed by Jun Rao; KAFKA-277

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1293010&r1=1293009&r2=1293010&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Thu Feb 23 22:57:40 2012
@@ -18,10 +18,10 @@
 package kafka.message
 
 import kafka.utils.Logging
-import kafka.common.{InvalidMessageSizeException, ErrorMapping}
 import java.nio.ByteBuffer
 import java.nio.channels._
 import kafka.utils.IteratorTemplate
+import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException, ErrorMapping}
 
 /**
  * A sequence of messages stored in a byte buffer
@@ -61,7 +61,7 @@ class ByteBufferMessageSet(private val b
 
   private def shallowValidBytes: Long = {
     if(shallowValidByteCount < 0) {
-      val iter = deepIterator
+      val iter = this.internalIterator()
       while(iter.hasNext) {
         val messageAndOffset = iter.next
         shallowValidByteCount = messageAndOffset.offset
@@ -75,9 +75,21 @@ class ByteBufferMessageSet(private val b
   def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long =
     channel.write(buffer.duplicate)
   
-  override def iterator: Iterator[MessageAndOffset] = deepIterator
+  override def iterator: Iterator[MessageAndOffset] = internalIterator()
 
-  private def deepIterator(): Iterator[MessageAndOffset] = {
+
+  def verifyMessageSize(maxMessageSize: Int){
+    var shallowIter = internalIterator(true)
+    while(shallowIter.hasNext){
+      var messageAndOffset = shallowIter.next
+      if (messageAndOffset.message.payloadSize > maxMessageSize)
+        throw new MessageSizeTooLargeException
+    }
+  }
+
+
+  /** When isShallow is true, do a shallow iteration: traverse only the top-level messages, without decompressing compressed ones. Used by verifyMessageSize(). **/
+  private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
     ErrorMapping.maybeThrowException(errorCode)
     new IteratorTemplate[MessageAndOffset] {
       var topIter = buffer.slice()
@@ -108,38 +120,51 @@ class ByteBufferMessageSet(private val b
         message.limit(size)
         topIter.position(topIter.position + size)
         val newMessage = new Message(message)
-        newMessage.compressionCodec match {
-          case NoCompressionCodec =>
-            if(!newMessage.isValid)
-              throw new InvalidMessageException("Uncompressed essage is invalid")
-            debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
-            innerIter = null
-            currValidBytes += 4 + size
-            trace("currValidBytes = " + currValidBytes)
-            new MessageAndOffset(newMessage, currValidBytes)
-          case _ =>
-            if(!newMessage.isValid)
-              throw new InvalidMessageException("Compressed message is invalid")
-            debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
-            innerIter = CompressionUtils.decompress(newMessage).deepIterator
-            if (!innerIter.hasNext) {
-              currValidBytes += 4 + lastMessageSize
+
+        if(isShallow){
+          currValidBytes += 4 + size
+          trace("shallow iterator currValidBytes = " + currValidBytes)
+          new MessageAndOffset(newMessage, currValidBytes)
+        }
+        else{
+          newMessage.compressionCodec match {
+            case NoCompressionCodec =>
+              if(!newMessage.isValid)
+                throw new InvalidMessageException("Uncompressed essage is invalid")
+              debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
               innerIter = null
-            }
-            makeNext()
+              currValidBytes += 4 + size
+              trace("currValidBytes = " + currValidBytes)
+              new MessageAndOffset(newMessage, currValidBytes)
+            case _ =>
+              if(!newMessage.isValid)
+                throw new InvalidMessageException("Compressed message is invalid")
+              debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
+              innerIter = CompressionUtils.decompress(newMessage).internalIterator()
+              if (!innerIter.hasNext) {
+                currValidBytes += 4 + lastMessageSize
+                innerIter = null
+              }
+              makeNext()
+          }
         }
       }
 
       override def makeNext(): MessageAndOffset = {
-        val isInnerDone = innerDone()
-        debug("makeNext() in deepIterator: innerDone = " + isInnerDone)
-        isInnerDone match {
-          case true => makeNextOuter
-          case false => {
-            val messageAndOffset = innerIter.next
-            if (!innerIter.hasNext)
-              currValidBytes += 4 + lastMessageSize
-            new MessageAndOffset(messageAndOffset.message, currValidBytes)
+        if(isShallow){
+          makeNextOuter
+        }
+        else{
+          val isInnerDone = innerDone()
+          debug("makeNext() in internalIterator: innerDone = " + isInnerDone)
+          isInnerDone match {
+            case true => makeNextOuter
+            case false => {
+              val messageAndOffset = innerIter.next
+              if (!innerIter.hasNext)
+                currValidBytes += 4 + lastMessageSize
+              new MessageAndOffset(messageAndOffset.message, currValidBytes)
+            }
           }
         }
       }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1293010&r1=1293009&r2=1293010&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala Thu Feb 23 22:57:40 2012
@@ -24,7 +24,6 @@ import kafka.network._
 import kafka.utils._
 import kafka.api._
 import scala.math._
-import kafka.common.MessageSizeTooLargeException
 import java.nio.ByteBuffer
 import java.util.Random
 
@@ -120,7 +119,7 @@ class SyncProducer(val config: SyncProdu
    * Send a message
    */
   def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
-    verifyMessageSize(messages)
+    messages.verifyMessageSize(config.maxMessageSize)
     val setSize = messages.sizeInBytes.asInstanceOf[Int]
     trace("Got message set with " + setSize + " bytes to send")
     send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages)))
@@ -130,7 +129,7 @@ class SyncProducer(val config: SyncProdu
 
   def multiSend(produces: Array[ProducerRequest]) {
     for (request <- produces)
-      verifyMessageSize(request.messages)
+      request.messages.verifyMessageSize(config.maxMessageSize)
     val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes)
     trace("Got multi message sets with " + setSize + " bytes to send")
     send(new BoundedByteBufferSend(new MultiProducerRequest(produces)))
@@ -143,11 +142,6 @@ class SyncProducer(val config: SyncProdu
     }
   }
 
-  private def verifyMessageSize(messages: ByteBufferMessageSet) {
-    for (messageAndOffset <- messages)
-      if (messageAndOffset.message.payloadSize > config.maxMessageSize)
-        throw new MessageSizeTooLargeException
-  }
 
   /**
    * Disconnect from current channel, closing connection.

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1293010&r1=1293009&r2=1293010&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Thu Feb 23 22:57:40 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -27,7 +27,7 @@ import org.junit.{After, Before, Test}
 import kafka.common.MessageSizeTooLargeException
 import java.util.Properties
 import kafka.api.ProducerRequest
-import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, Message, ByteBufferMessageSet}
 
 class SyncProducerTest extends JUnitSuite {
   private var messageBytes =  new Array[Byte](2);
@@ -86,7 +86,7 @@ class SyncProducerTest extends JUnitSuit
   }
 
   @Test
-  def testMessageSizeTooLarge() {
+  def testSingleMessageSizeTooLarge() {
     val props = new Properties()
     props.put("host", "localhost")
     props.put("port", server.socketServer.port.toString)
@@ -104,4 +104,31 @@ class SyncProducerTest extends JUnitSuit
     }
     Assert.assertTrue(failed)
   }
-}
+
+  @Test
+  def testCompressedMessageSizeTooLarge() {
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", server.socketServer.port.toString)
+    props.put("buffer.size", "102400")
+    props.put("connect.timeout.ms", "300")
+    props.put("reconnect.interval", "500")
+    props.put("max.message.size", "100")
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+    val messages = new Array[Message](10)
+    import Array.fill
+    var a = 0
+    for( a <- 0 to  9){
+      val bytes = fill(20){a.asInstanceOf[Byte]}
+      messages(a) = new Message(bytes)
+    }
+    var failed = false
+    /** Each message is only 20 bytes, but the single compressed message wrapping all ten has size 118, which exceeds max.message.size (100) **/
+    try {
+      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = messages: _*))
+    }catch {
+      case e: MessageSizeTooLargeException => failed = true
+    }
+    Assert.assertTrue(failed)
+  }
+}
\ No newline at end of file