You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/24 16:32:11 UTC

svn commit: r1376943 - in /incubator/kafka/trunk/core/src: main/scala/kafka/log/ main/scala/kafka/server/ test/scala/other/kafka/ test/scala/unit/kafka/log/

Author: junrao
Date: Fri Aug 24 14:32:10 2012
New Revision: 1376943

URL: http://svn.apache.org/viewvc?rev=1376943&view=rev
Log:
message size not checked at the server; patched by Swapnil Ghike; reviewed by Jun Rao, Neha Narkhede; KAFKA-469

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1376943&r1=1376942&r2=1376943&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Fri Aug 24 14:32:10 2012
@@ -101,7 +101,8 @@ private[log] class LogSegment(val file: 
  * An append-only log for storing messages. 
  */
 @threadsafe
-private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends Logging {
+private[log] class Log(val dir: File, val maxSize: Long, val maxMessageSize: Int,
+                       val flushInterval: Int, val needRecovery: Boolean) extends Logging {
 
   /* A lock that guards all modifications to the log */
   private val lock = new Object
@@ -201,6 +202,7 @@ private[log] class Log(val dir: File, va
    */
   def append(messages: ByteBufferMessageSet): Unit = {
     // validate the messages
+    messages.verifyMessageSize(maxMessageSize)
     var numberOfMessages = 0
     for(messageAndOffset <- messages) {
       if(!messageAndOffset.message.isValid)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1376943&r1=1376942&r2=1376943&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Fri Aug 24 14:32:10 2012
@@ -67,7 +67,7 @@ private[kafka] class LogManager(val conf
         warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
       } else {
         info("Loading log '" + dir.getName() + "'")
-        val log = new Log(dir, maxSize, flushInterval, needRecovery)
+        val log = new Log(dir, maxSize, config.maxMessageSize, flushInterval, needRecovery)
         val topicPartion = Utils.getTopicPartition(dir.getName)
         logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
         val parts = logs.get(topicPartion._1)
@@ -146,7 +146,7 @@ private[kafka] class LogManager(val conf
     logCreationLock synchronized {
       val d = new File(logDir, topic + "-" + partition)
       d.mkdirs()
-      new Log(d, maxSize, flushInterval, false)
+      new Log(d, maxSize, config.maxMessageSize, flushInterval, false)
     }
   }
   

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1376943&r1=1376942&r2=1376943&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala Fri Aug 24 14:32:10 2012
@@ -42,7 +42,10 @@ class KafkaConfig(props: Properties) ext
   
   /* the maximum number of bytes in a socket request */
   val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
-  
+
+  /* the maximum size of a message that the server can receive */
+  val maxMessageSize = Utils.getIntInRange(props, "max.message.size", 1000000, (0, Int.MaxValue))
+
   /* the number of worker threads that the server uses for handling all client requests*/
   val numThreads = Utils.getIntInRange(props, "num.threads", Runtime.getRuntime().availableProcessors, (1, Int.MaxValue))
   

Modified: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1376943&r1=1376942&r2=1376943&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala Fri Aug 24 14:32:10 2012
@@ -19,10 +19,13 @@ package kafka.log
 
 import kafka.message._
 import kafka.utils.{TestUtils, Utils}
+import kafka.server.KafkaConfig
 
 object TestLogPerformance {
 
   def main(args: Array[String]): Unit = {
+    val props = TestUtils.createBrokerConfig(0, -1)
+    val config = new KafkaConfig(props)
     if(args.length < 4)
       Utils.croak("USAGE: java " + getClass().getName() + " num_messages message_size batch_size compression_codec")
     val numMessages = args(0).toInt
@@ -30,7 +33,7 @@ object TestLogPerformance {
     val batchSize = args(2).toInt
     val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
     val dir = TestUtils.tempDir()
-    val log = new Log(dir, 50*1024*1024, 5000000, false)
+    val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, false)
     val bytes = new Array[Byte](messageSize)
     new java.util.Random().nextBytes(bytes)
     val message = new Message(bytes)

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1376943&r1=1376942&r2=1376943&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala Fri Aug 24 14:32:10 2012
@@ -23,16 +23,19 @@ import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.utils.{Utils, TestUtils, Range}
-import kafka.common.OffsetOutOfRangeException
+import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.server.KafkaConfig
 
 class LogTest extends JUnitSuite {
   
   var logDir: File = null
-
+  var config:KafkaConfig = null
   @Before
   def setUp() {
     logDir = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, -1)
+    config = new KafkaConfig(props)
   }
 
   @After
@@ -48,14 +51,14 @@ class LogTest extends JUnitSuite {
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    new Log(logDir, 1024, 1000, false)
+    new Log(logDir, 1024, config.maxMessageSize, 1000, false)
   }
   
   @Test
   def testLoadInvalidLogsFails() {
     createEmptyLogs(logDir, 0, 15)
     try {
-      new Log(logDir, 1024, 1000, false)
+      new Log(logDir, 1024, config.maxMessageSize, 1000, false)
       fail("Allowed load of corrupt logs without complaint.")
     } catch {
       case e: IllegalStateException => "This is good"
@@ -64,7 +67,7 @@ class LogTest extends JUnitSuite {
   
   @Test
   def testAppendAndRead() {
-    val log = new Log(logDir, 1024, 1000, false)
+    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, false)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 10)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -81,7 +84,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, 1024, 1000, false)
+    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, false)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -101,7 +104,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, 100, 1000, false)
+    val log = new Log(logDir, 100, config.maxMessageSize, 1000, false)
     val numMessages = 100
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -156,7 +159,7 @@ class LogTest extends JUnitSuite {
   def testEdgeLogRolls() {
     {
       // first test a log segment starting at 0
-      val log = new Log(logDir, 100, 1000, false)
+      val log = new Log(logDir, 100, config.maxMessageSize, 1000, false)
       val curOffset = log.nextAppendOffset
       assertEquals(curOffset, 0)
 
@@ -169,7 +172,7 @@ class LogTest extends JUnitSuite {
 
     {
       // second test an empty log segment starting at none-zero
-      val log = new Log(logDir, 100, 1000, false)
+      val log = new Log(logDir, 100, config.maxMessageSize, 1000, false)
       val numMessages = 1
       for(i <- 0 until numMessages)
         log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -192,6 +195,35 @@ class LogTest extends JUnitSuite {
     }
   }
 
+  @Test
+  def testMessageSizeCheck() { // checks that Log.append accepts a message set within the size limit and rejects an oversized one
+    val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes()), new Message("bethe".getBytes())) // 3- and 5-character payloads
+    val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes())) // 6-character payload
+
+    // create a log whose maxMessageSize (third constructor argument) is 5; NOTE(review): presumably the limit is compared against payload size, since "bethe" is expected to pass -- confirm in verifyMessageSize
+    val log = new Log(logDir, 100, 5, 1000, false)
+
+    var ret =
+    try {
+      log.append(first)
+      true // no exception: every message in the first set was accepted
+    }
+    catch {
+      case e: MessageSizeTooLargeException => false
+    }
+    assert(ret, "First messageset should pass.")
+
+    ret =
+    try {
+      log.append(second)
+      false // reaching this line means the oversized message was accepted, which fails the test
+    }
+    catch {
+      case e:MessageSizeTooLargeException => true
+    }
+    assert(ret, "Second message set should throw MessageSizeTooLargeException.")
+  }
+
   def assertContains(ranges: Array[Range], offset: Long) = {
     Log.findRange(ranges, offset) match {
       case Some(range) =>