You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/09/12 03:30:10 UTC

svn commit: r1383730 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/common/ main/scala/kafka/log/ main/scala/kafka/producer/ main/scala/kafka/producer/async/ main/scala/kafka/server/ test/scala/other/kafka/ test/scala/unit/kafka/log/ test/scala/unit/kafka/producer/

Author: junrao
Date: Wed Sep 12 01:30:09 2012
New Revision: 1383730

URL: http://svn.apache.org/viewvc?rev=1383730&view=rev
Log:
Check max message size on server; patched by Swapnil Ghike; reviewed by Joel Koshy and Jun Rao; KAFKA-490

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Wed Sep 12 01:30:09 2012
@@ -40,6 +40,7 @@ object ErrorMapping {
   val RequestTimedOutCode: Short = 8
   val BrokerNotAvailableCode: Short = 9
   val ReplicaNotAvailableCode: Short = 10
+  val MessageSizeTooLargeCode: Short = 11
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -52,7 +53,8 @@ object ErrorMapping {
       classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
       classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
-      classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode
+      classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
+      classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Wed Sep 12 01:30:09 2012
@@ -128,9 +128,9 @@ class LogSegment(val file: File, val mes
  * An append-only log for storing messages. 
  */
 @threadsafe
-private[kafka] class Log( val dir: File, val maxSize: Long,
-                          val flushInterval: Int, val rollIntervalMs: Long, val needRecovery: Boolean,
-                          time: Time, brokerId: Int = 0) extends Logging {
+private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessageSize: Int, val flushInterval: Int,
+                          val rollIntervalMs: Long, val needRecovery: Boolean, time: Time,
+                          brokerId: Int = 0) extends Logging {
   this.logIdent = "[Kafka Log on Broker " + brokerId + "], "
 
   import kafka.log.Log._
@@ -235,6 +235,7 @@ private[kafka] class Log( val dir: File,
    */
   def append(messages: ByteBufferMessageSet): Unit = {
     // validate the messages
+    messages.verifyMessageSize(maxMessageSize)
     var numberOfMessages = 0
     for(messageAndOffset <- messages) {
       if(!messageAndOffset.message.isValid)
@@ -328,7 +329,7 @@ private[kafka] class Log( val dir: File,
    * Roll the log over if necessary
    */
   private def maybeRoll(segment: LogSegment) {
-    if ((segment.messageSet.sizeInBytes > maxSize) ||
+    if ((segment.messageSet.sizeInBytes > maxLogFileSize) ||
        ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)))
       roll()
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Wed Sep 12 01:30:09 2012
@@ -66,7 +66,7 @@ private[kafka] class LogManager(val conf
         val topic = Utils.getTopicPartition(dir.getName)._1
         val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
         val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
-        val log = new Log(dir, maxLogFileSize, flushInterval, rollIntervalMs, needRecovery, time, config.brokerId)
+        val log = new Log(dir, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needRecovery, time, config.brokerId)
         val topicPartition = Utils.getTopicPartition(dir.getName)
         logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]())
         val parts = logs.get(topicPartition._1)
@@ -108,7 +108,7 @@ private[kafka] class LogManager(val conf
       d.mkdirs()
       val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
       val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
-      new Log(d, maxLogFileSize, flushInterval, rollIntervalMs, false, time, config.brokerId)
+      new Log(d, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, false, time, config.brokerId)
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Wed Sep 12 01:30:09 2012
@@ -101,13 +101,6 @@ class SyncProducer(val config: SyncProdu
    * Send a message
    */
   def send(producerRequest: ProducerRequest): ProducerResponse = {
-    for( topicData <- producerRequest.data ) {
-      for( partitionData <- topicData.partitionDataArray ) {
-	      verifyMessageSize(partitionData.messages)
-        val setSize = partitionData.messages.sizeInBytes.asInstanceOf[Int]
-        trace("Got message set with " + setSize + " bytes to send")
-      }
-    }
     val response = doSend(producerRequest)
     ProducerResponse.readFrom(response.buffer)
   }
@@ -127,12 +120,6 @@ class SyncProducer(val config: SyncProdu
     }
   }
 
-  private def verifyMessageSize(messages: MessageSet) {
-    for (messageAndOffset <- messages)
-      if (messageAndOffset.message.payloadSize > config.maxMessageSize)
-        throw new MessageSizeTooLargeException
-  }
-
   private def reconnect() {
     disconnect()
     connect()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Wed Sep 12 01:30:09 2012
@@ -192,8 +192,12 @@ class DefaultEventHandler[K,V](config: P
         val errors = new ListBuffer[(String, Int)]
         for( topic <- topicData; partition <- topic.partitionDataArray ) {
           msgIdx += 1
-          if(msgIdx > response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError)
+          if (msgIdx >= response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError) {
             errors.append((topic.topic, partition.partition))
+            if (msgIdx < response.errors.size)
+              warn("Received error " + ErrorMapping.exceptionFor(response.errors(msgIdx)) +
+                   "from broker %d on %s:%d".format(brokerId, topic.topic, partition.partition))
+          }
         }
         errors
       } catch {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Wed Sep 12 01:30:09 2012
@@ -127,13 +127,13 @@ class KafkaApis(val requestChannel: Requ
 
     val response = produceToLocalLog(produceRequest)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+    val partitionsInError = response.errors.count(_ != ErrorMapping.NoError)
     
     for (topicData <- produceRequest.data)
       maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray)
     
-    if (produceRequest.requiredAcks == 0 ||
-        produceRequest.requiredAcks == 1 ||
-        produceRequest.data.size <= 0)
+    if (produceRequest.requiredAcks == 0 || produceRequest.requiredAcks == 1 ||
+        produceRequest.data.size <= 0 || partitionsInError == response.errors.size)
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
     else {
       // create a list of (topic, partition) pairs to use as keys for this delayed request

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Wed Sep 12 01:30:09 2012
@@ -51,6 +51,9 @@ class KafkaConfig private (val props: Ve
   
   /* the maximum number of bytes in a socket request */
   val maxSocketRequestSize: Int = props.getIntInRange("max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
+
+  /* the maximum size of message that the server can receive */
+  val maxMessageSize = props.getIntInRange("max.message.size", 1000000, (0, Int.MaxValue))
   
   /* the number of network threads that the server uses for handling network requests */
   val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala Wed Sep 12 01:30:09 2012
@@ -19,6 +19,7 @@ package kafka.log
 
 import kafka.message._
 import kafka.utils.{SystemTime, TestUtils, Utils}
+import kafka.server.KafkaConfig
 
 object TestLogPerformance {
 
@@ -29,8 +30,10 @@ object TestLogPerformance {
     val messageSize = args(1).toInt
     val batchSize = args(2).toInt
     val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
+    val props = TestUtils.createBrokerConfig(0, -1)
+    val config = new KafkaConfig(props)
     val dir = TestUtils.tempDir()
-    val log = new Log(dir, 50*1024*1024, 5000000, 24*7*60*60*1000L, false, SystemTime)
+    val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, false, SystemTime)
     val bytes = new Array[Byte](messageSize)
     new java.util.Random().nextBytes(bytes)
     val message = new Message(bytes)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Wed Sep 12 01:30:09 2012
@@ -23,18 +23,22 @@ import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
-import kafka.common.{KafkaException, OffsetOutOfRangeException}
+import kafka.common.{MessageSizeTooLargeException, KafkaException, OffsetOutOfRangeException}
 import kafka.utils._
 import scala.Some
+import kafka.server.KafkaConfig
 
 class LogTest extends JUnitSuite {
   
   var logDir: File = null
   val time = new MockTime
+  var config: KafkaConfig = null
 
   @Before
   def setUp() {
     logDir = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, -1)
+    config = new KafkaConfig(props)
   }
 
   @After
@@ -55,7 +59,7 @@ class LogTest extends JUnitSuite {
     val time: MockTime = new MockTime()
 
     // create a log
-    val log = new Log(logDir, 1000, 1000, rollMs, false, time)
+    val log = new Log(logDir, 1000, config.maxMessageSize, 1000, rollMs, false, time)
     time.currentMs += rollMs + 1
 
     // segment age is less than its limit
@@ -89,7 +93,7 @@ class LogTest extends JUnitSuite {
     val logFileSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be 10 messages
 
     // create a log
-    val log = new Log(logDir, logFileSize, 1000, 10000, false, time)
+    val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, false, time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -102,14 +106,14 @@ class LogTest extends JUnitSuite {
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
+    new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
   }
 
   @Test
   def testLoadInvalidLogsFails() {
     createEmptyLogs(logDir, 0, 15)
     try {
-      new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
+      new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
       fail("Allowed load of corrupt logs without complaint.")
     } catch {
       case e: KafkaException => "This is good"
@@ -118,7 +122,7 @@ class LogTest extends JUnitSuite {
 
   @Test
   def testAppendAndRead() {
-    val log = new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
+    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 10)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -135,7 +139,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
+    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -155,7 +159,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, 100, 1000, 24*7*60*60*1000L, false, time)
+    val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
     val numMessages = 100
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -210,7 +214,7 @@ class LogTest extends JUnitSuite {
   def testEdgeLogRolls() {
     {
       // first test a log segment starting at 0
-      val log = new Log(logDir, 100, 1000, 24*7*60*60*1000L, false, time)
+      val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
       val curOffset = log.logEndOffset
       assertEquals(curOffset, 0)
 
@@ -223,7 +227,7 @@ class LogTest extends JUnitSuite {
 
     {
       // second test an empty log segment starting at none-zero
-      val log = new Log(logDir, 100, 1000, 24*7*60*60*1000L, false, time)
+      val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
       val numMessages = 1
       for(i <- 0 until numMessages)
         log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -246,6 +250,35 @@ class LogTest extends JUnitSuite {
     }
   }
 
+  @Test
+  def testMessageSizeCheck() {
+    val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes()), new Message("bethe".getBytes()))
+    val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes()))
+
+    // append messages to log
+    val log = new Log(logDir, 100, 5, 1000, config.logRollHours*60*60*1000L, false, time)
+
+    var ret =
+      try {
+        log.append(first)
+        true
+      }
+      catch {
+        case e: MessageSizeTooLargeException => false
+      }
+    assert(ret, "First messageset should pass.")
+
+    ret =
+      try {
+        log.append(second)
+        false
+      }
+      catch {
+        case e:MessageSizeTooLargeException => true
+      }
+    assert(ret, "Second message set should throw MessageSizeTooLargeException.")
+  }
+
   def assertContains(ranges: Array[Range], offset: Long) = {
     Log.findRange(ranges, offset) match {
       case Some(range) => 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Wed Sep 12 01:30:09 2012
@@ -93,7 +93,7 @@ class SyncProducerTest extends JUnit3Sui
   }
 
   @Test
-  def testSingleMessageSizeTooLarge() {
+  def testMessageSizeTooLarge() {
     val server = servers.head
     val props = new Properties()
     props.put("host", "localhost")
@@ -101,35 +101,25 @@ class SyncProducerTest extends JUnit3Sui
     props.put("buffer.size", "102400")
     props.put("connect.timeout.ms", "300")
     props.put("reconnect.interval", "500")
-    props.put("max.message.size", "100")
     val producer = new SyncProducer(new SyncProducerConfig(props))
-    val bytes = new Array[Byte](101)
-    try {
-      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(bytes))))
-      Assert.fail("Message was too large to send, SyncProducer should have thrown exception.")
-    } catch {
-      case e: MessageSizeTooLargeException => /* success */
-    }
-  }
+    CreateTopicCommand.createTopic(zkClient, "test", 1, 1)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500)
 
-  @Test
-  def testCompressedMessageSizeTooLarge() {
-    val server = servers.head
-    val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", server.socketServer.port.toString)
-    props.put("buffer.size", "102400")
-    props.put("connect.timeout.ms", "300")
-    props.put("reconnect.interval", "500")
-    props.put("max.message.size", "100")
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-    val bytes = new Array[Byte](101)
-    try {
-      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = new Message(bytes))))
-      Assert.fail("Message was too large to send, SyncProducer should have thrown exception for DefaultCompressionCodec.")
-    } catch {
-      case e: MessageSizeTooLargeException => /* success */
-    }
+    val message1 = new Message(new Array[Byte](1000001))
+    val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
+    val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1))
+
+    Assert.assertEquals(1, response1.errors.length)
+    Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.errors(0))
+    Assert.assertEquals(-1L, response1.offsets(0))
+
+    val message2 = new Message(new Array[Byte](1000000))
+    val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
+    val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2))
+
+    Assert.assertEquals(1, response2.errors.length)
+    Assert.assertEquals(ErrorMapping.NoError, response2.errors(0))
+    Assert.assertEquals(messageSet2.sizeInBytes, response2.offsets(0))
   }
 
   @Test