You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/24 16:32:11 UTC
svn commit: r1376943 - in /incubator/kafka/trunk/core/src:
main/scala/kafka/log/ main/scala/kafka/server/ test/scala/other/kafka/
test/scala/unit/kafka/log/
Author: junrao
Date: Fri Aug 24 14:32:10 2012
New Revision: 1376943
URL: http://svn.apache.org/viewvc?rev=1376943&view=rev
Log:
message size not checked at the server; patched by Swapnil Ghike; reviewed by Jun Rao, Neha Narkhede; KAFKA-469
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1376943&r1=1376942&r2=1376943&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Fri Aug 24 14:32:10 2012
@@ -101,7 +101,8 @@ private[log] class LogSegment(val file:
* An append-only log for storing messages.
*/
@threadsafe
-private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends Logging {
+private[log] class Log(val dir: File, val maxSize: Long, val maxMessageSize: Int,
+ val flushInterval: Int, val needRecovery: Boolean) extends Logging {
/* A lock that guards all modifications to the log */
private val lock = new Object
@@ -201,6 +202,7 @@ private[log] class Log(val dir: File, va
*/
def append(messages: ByteBufferMessageSet): Unit = {
// validate the messages
+ messages.verifyMessageSize(maxMessageSize)
var numberOfMessages = 0
for(messageAndOffset <- messages) {
if(!messageAndOffset.message.isValid)
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1376943&r1=1376942&r2=1376943&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Fri Aug 24 14:32:10 2012
@@ -67,7 +67,7 @@ private[kafka] class LogManager(val conf
warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
} else {
info("Loading log '" + dir.getName() + "'")
- val log = new Log(dir, maxSize, flushInterval, needRecovery)
+ val log = new Log(dir, maxSize, config.maxMessageSize, flushInterval, needRecovery)
val topicPartion = Utils.getTopicPartition(dir.getName)
logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
val parts = logs.get(topicPartion._1)
@@ -146,7 +146,7 @@ private[kafka] class LogManager(val conf
logCreationLock synchronized {
val d = new File(logDir, topic + "-" + partition)
d.mkdirs()
- new Log(d, maxSize, flushInterval, false)
+ new Log(d, maxSize, config.maxMessageSize, flushInterval, false)
}
}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1376943&r1=1376942&r2=1376943&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala Fri Aug 24 14:32:10 2012
@@ -42,7 +42,10 @@ class KafkaConfig(props: Properties) ext
/* the maximum number of bytes in a socket request */
val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
-
+
+ /* the maximum size of a message that the server can receive */
+ val maxMessageSize = Utils.getIntInRange(props, "max.message.size", 1000000, (0, Int.MaxValue))
+
/* the number of worker threads that the server uses for handling all client requests*/
val numThreads = Utils.getIntInRange(props, "num.threads", Runtime.getRuntime().availableProcessors, (1, Int.MaxValue))
Modified: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1376943&r1=1376942&r2=1376943&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala Fri Aug 24 14:32:10 2012
@@ -19,10 +19,13 @@ package kafka.log
import kafka.message._
import kafka.utils.{TestUtils, Utils}
+import kafka.server.KafkaConfig
object TestLogPerformance {
def main(args: Array[String]): Unit = {
+ val props = TestUtils.createBrokerConfig(0, -1)
+ val config = new KafkaConfig(props)
if(args.length < 4)
Utils.croak("USAGE: java " + getClass().getName() + " num_messages message_size batch_size compression_codec")
val numMessages = args(0).toInt
@@ -30,7 +33,7 @@ object TestLogPerformance {
val batchSize = args(2).toInt
val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
val dir = TestUtils.tempDir()
- val log = new Log(dir, 50*1024*1024, 5000000, false)
+ val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, false)
val bytes = new Array[Byte](messageSize)
new java.util.Random().nextBytes(bytes)
val message = new Message(bytes)
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1376943&r1=1376942&r2=1376943&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala Fri Aug 24 14:32:10 2012
@@ -23,16 +23,19 @@ import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.utils.{Utils, TestUtils, Range}
-import kafka.common.OffsetOutOfRangeException
+import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.server.KafkaConfig
class LogTest extends JUnitSuite {
var logDir: File = null
-
+ var config:KafkaConfig = null
@Before
def setUp() {
logDir = TestUtils.tempDir()
+ val props = TestUtils.createBrokerConfig(0, -1)
+ config = new KafkaConfig(props)
}
@After
@@ -48,14 +51,14 @@ class LogTest extends JUnitSuite {
@Test
def testLoadEmptyLog() {
createEmptyLogs(logDir, 0)
- new Log(logDir, 1024, 1000, false)
+ new Log(logDir, 1024, config.maxMessageSize, 1000, false)
}
@Test
def testLoadInvalidLogsFails() {
createEmptyLogs(logDir, 0, 15)
try {
- new Log(logDir, 1024, 1000, false)
+ new Log(logDir, 1024, config.maxMessageSize, 1000, false)
fail("Allowed load of corrupt logs without complaint.")
} catch {
case e: IllegalStateException => "This is good"
@@ -64,7 +67,7 @@ class LogTest extends JUnitSuite {
@Test
def testAppendAndRead() {
- val log = new Log(logDir, 1024, 1000, false)
+ val log = new Log(logDir, 1024, config.maxMessageSize, 1000, false)
val message = new Message(Integer.toString(42).getBytes())
for(i <- 0 until 10)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -81,7 +84,7 @@ class LogTest extends JUnitSuite {
@Test
def testReadOutOfRange() {
createEmptyLogs(logDir, 1024)
- val log = new Log(logDir, 1024, 1000, false)
+ val log = new Log(logDir, 1024, config.maxMessageSize, 1000, false)
assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
try {
log.read(0, 1024)
@@ -101,7 +104,7 @@ class LogTest extends JUnitSuite {
@Test
def testLogRolls() {
/* create a multipart log with 100 messages */
- val log = new Log(logDir, 100, 1000, false)
+ val log = new Log(logDir, 100, config.maxMessageSize, 1000, false)
val numMessages = 100
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -156,7 +159,7 @@ class LogTest extends JUnitSuite {
def testEdgeLogRolls() {
{
// first test a log segment starting at 0
- val log = new Log(logDir, 100, 1000, false)
+ val log = new Log(logDir, 100, config.maxMessageSize, 1000, false)
val curOffset = log.nextAppendOffset
assertEquals(curOffset, 0)
@@ -169,7 +172,7 @@ class LogTest extends JUnitSuite {
{
// second test an empty log segment starting at none-zero
- val log = new Log(logDir, 100, 1000, false)
+ val log = new Log(logDir, 100, config.maxMessageSize, 1000, false)
val numMessages = 1
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -192,6 +195,35 @@ class LogTest extends JUnitSuite {
}
}
+ @Test
+ def testMessageSizeCheck() { // verifies that Log.append applies the maxMessageSize limit given to the Log constructor
+ val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes()), new Message("bethe".getBytes())) // two messages with 3- and 5-character payloads
+ val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes())) // one message with a 6-character payload
+
+ // create a log whose maxMessageSize (third constructor argument) is 5, then append both message sets to it
+ val log = new Log(logDir, 100, 5, 1000, false)
+
+ var ret =
+ try {
+ log.append(first)
+ true
+ }
+ catch {
+ case e: MessageSizeTooLargeException => false // the size check wrongly rejected a set the test expects to be accepted
+ }
+ assert(ret, "First messageset should pass.")
+
+ ret =
+ try {
+ log.append(second)
+ false // reaching here means the append was accepted, which the test treats as a failure
+ }
+ catch {
+ case e:MessageSizeTooLargeException => true // expected; NOTE(review): presumably the limit is compared against payload size - verifyMessageSize is not shown here, confirm
+ }
+ assert(ret, "Second message set should throw MessageSizeTooLargeException.")
+ }
+
def assertContains(ranges: Array[Range], offset: Long) = {
Log.findRange(ranges, offset) match {
case Some(range) =>