Posted to commits@kafka.apache.org by ju...@apache.org on 2012/09/12 03:30:10 UTC
svn commit: r1383730 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/common/ main/scala/kafka/log/ main/scala/kafka/producer/
main/scala/kafka/producer/async/ main/scala/kafka/server/
test/scala/other/kafka/ test/scala/unit/kafka/log/ tes...
Author: junrao
Date: Wed Sep 12 01:30:09 2012
New Revision: 1383730
URL: http://svn.apache.org/viewvc?rev=1383730&view=rev
Log:
Check max message size on server; patched by Swapnil Ghike; reviewed by Joel Koshy and Jun Rao; KAFKA-490
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Wed Sep 12 01:30:09 2012
@@ -40,6 +40,7 @@ object ErrorMapping {
val RequestTimedOutCode: Short = 8
val BrokerNotAvailableCode: Short = 9
val ReplicaNotAvailableCode: Short = 10
+ val MessageSizeTooLargeCode: Short = 11
private val exceptionToCode =
Map[Class[Throwable], Short](
@@ -52,7 +53,8 @@ object ErrorMapping {
classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
- classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode
+ classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
+ classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode
).withDefaultValue(UnknownCode)
/* invert the mapping */
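
The new code 11 joins the exception-to-code map, so a broker can report an
oversized message back to the client over the wire. A minimal sketch of the
usual lookup path, assuming the existing ErrorMapping.codeFor helper that
inverts this map, with a log and a messages value in scope (none of which is
part of this patch):

    // Translate a caught exception into its wire error code; classes not in
    // the map fall back to UnknownCode via withDefaultValue.
    val code: Short =
      try {
        log.append(messages)
        ErrorMapping.NoError
      } catch {
        case e: MessageSizeTooLargeException =>
          ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
      }
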
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Wed Sep 12 01:30:09 2012
@@ -128,9 +128,9 @@ class LogSegment(val file: File, val mes
* An append-only log for storing messages.
*/
@threadsafe
-private[kafka] class Log( val dir: File, val maxSize: Long,
- val flushInterval: Int, val rollIntervalMs: Long, val needRecovery: Boolean,
- time: Time, brokerId: Int = 0) extends Logging {
+private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessageSize: Int, val flushInterval: Int,
+ val rollIntervalMs: Long, val needRecovery: Boolean, time: Time,
+ brokerId: Int = 0) extends Logging {
this.logIdent = "[Kafka Log on Broker " + brokerId + "], "
import kafka.log.Log._
@@ -235,6 +235,7 @@ private[kafka] class Log( val dir: File,
*/
def append(messages: ByteBufferMessageSet): Unit = {
// validate the messages
+ messages.verifyMessageSize(maxMessageSize)
var numberOfMessages = 0
for(messageAndOffset <- messages) {
if(!messageAndOffset.message.isValid)
@@ -328,7 +329,7 @@ private[kafka] class Log( val dir: File,
* Roll the log over if necessary
*/
private def maybeRoll(segment: LogSegment) {
- if ((segment.messageSet.sizeInBytes > maxSize) ||
+ if ((segment.messageSet.sizeInBytes > maxLogFileSize) ||
((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)))
roll()
}
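
Log.append now rejects oversized messages up front, before validity checks or
any write, so the limit applies to every produce path that reaches the broker.
The body of verifyMessageSize is not shown in this diff; a check of roughly
this shape, mirroring the client-side version removed from SyncProducer below
and presumably living on ByteBufferMessageSet, would suffice:

    // Sketch only: walk the message set and reject any payload larger than
    // the configured limit, before anything is written to the log.
    def verifyMessageSize(maxMessageSize: Int) {
      for (messageAndOffset <- this)
        if (messageAndOffset.message.payloadSize > maxMessageSize)
          throw new MessageSizeTooLargeException
    }
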
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Wed Sep 12 01:30:09 2012
@@ -66,7 +66,7 @@ private[kafka] class LogManager(val conf
val topic = Utils.getTopicPartition(dir.getName)._1
val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
- val log = new Log(dir, maxLogFileSize, flushInterval, rollIntervalMs, needRecovery, time, config.brokerId)
+ val log = new Log(dir, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needRecovery, time, config.brokerId)
val topicPartition = Utils.getTopicPartition(dir.getName)
logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]())
val parts = logs.get(topicPartition._1)
@@ -108,7 +108,7 @@ private[kafka] class LogManager(val conf
d.mkdirs()
val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
- new Log(d, maxLogFileSize, flushInterval, rollIntervalMs, false, time, config.brokerId)
+ new Log(d, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, false, time, config.brokerId)
}
}
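
Both places where LogManager constructs a Log now thread the broker-wide
config.maxMessageSize through, so every partition's log enforces the same
limit; only the log file size and roll interval remain per-topic overrides.
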
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Wed Sep 12 01:30:09 2012
@@ -101,13 +101,6 @@ class SyncProducer(val config: SyncProdu
* Send a message
*/
def send(producerRequest: ProducerRequest): ProducerResponse = {
- for( topicData <- producerRequest.data ) {
- for( partitionData <- topicData.partitionDataArray ) {
- verifyMessageSize(partitionData.messages)
- val setSize = partitionData.messages.sizeInBytes.asInstanceOf[Int]
- trace("Got message set with " + setSize + " bytes to send")
- }
- }
val response = doSend(producerRequest)
ProducerResponse.readFrom(response.buffer)
}
@@ -127,12 +120,6 @@ class SyncProducer(val config: SyncProdu
}
}
- private def verifyMessageSize(messages: MessageSet) {
- for (messageAndOffset <- messages)
- if (messageAndOffset.message.payloadSize > config.maxMessageSize)
- throw new MessageSizeTooLargeException
- }
-
private def reconnect() {
disconnect()
connect()
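
With the per-message loop gone, the producer no longer enforces a size limit
of its own; the broker performs the check in Log.append and reports failures
through the produce response. A caller can detect this roughly as follows
(a sketch: request stands for any ProducerRequest, and warn assumes the
Logging trait is mixed in):

    // Oversized messages now surface as a per-partition error code in the
    // response instead of a client-side MessageSizeTooLargeException.
    val response = producer.send(request)
    if (response.errors.contains(ErrorMapping.MessageSizeTooLargeCode))
      warn("broker rejected a message set larger than max.message.size")
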
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Wed Sep 12 01:30:09 2012
@@ -192,8 +192,12 @@ class DefaultEventHandler[K,V](config: P
val errors = new ListBuffer[(String, Int)]
for( topic <- topicData; partition <- topic.partitionDataArray ) {
msgIdx += 1
- if(msgIdx > response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError)
+ if (msgIdx >= response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError) {
errors.append((topic.topic, partition.partition))
+ if (msgIdx < response.errors.size)
+ warn("Received error " + ErrorMapping.exceptionFor(response.errors(msgIdx)) +
+ "from broker %d on %s:%d".format(brokerId, topic.topic, partition.partition))
+ }
}
errors
} catch {
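
Two fixes here. The bounds check tightens from > to >=: the error array is
zero-indexed, so when msgIdx equaled response.errors.size the old condition
fell through to response.errors(msgIdx) and raised an out-of-range error
instead of recording the failure. The new branch also logs which broker,
topic, and partition returned a genuine (in-range) error code.
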
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Wed Sep 12 01:30:09 2012
@@ -127,13 +127,13 @@ class KafkaApis(val requestChannel: Requ
val response = produceToLocalLog(produceRequest)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+ val partitionsInError = response.errors.count(_ != ErrorMapping.NoError)
for (topicData <- produceRequest.data)
maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray)
- if (produceRequest.requiredAcks == 0 ||
- produceRequest.requiredAcks == 1 ||
- produceRequest.data.size <= 0)
+ if (produceRequest.requiredAcks == 0 || produceRequest.requiredAcks == 1 ||
+ produceRequest.data.size <= 0 || partitionsInError == response.errors.size)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
else {
// create a list of (topic, partition) pairs to use as keys for this delayed request
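
The new partitionsInError count lets the broker short-circuit requests in
which every partition has already failed: no acknowledgement will ever arrive
for a failed partition, so parking such a request as a delayed produce would
only let it sit until the timeout. When partitionsInError equals
response.errors.size, the error-laden response is sent immediately.
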
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Wed Sep 12 01:30:09 2012
@@ -51,6 +51,9 @@ class KafkaConfig private (val props: Ve
/* the maximum number of bytes in a socket request */
val maxSocketRequestSize: Int = props.getIntInRange("max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
+
+ /* the maximum size of a message that the server can receive */
+ val maxMessageSize = props.getIntInRange("max.message.size", 1000000, (0, Int.MaxValue))
/* the number of network threads that the server uses for handling network requests */
val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue))
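
The limit defaults to 1000000 bytes and is checked against each message's
payload on append. A hedged example of overriding it programmatically; a
broker deployment would normally set max.message.size in its properties file
instead, and the other required broker properties are omitted here:

    // Raise the broker's per-message limit to roughly 2 MB.
    val props = new java.util.Properties()
    props.put("max.message.size", "2000000")
    val config = new KafkaConfig(props) // assumes remaining required props are set
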
Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala Wed Sep 12 01:30:09 2012
@@ -19,6 +19,7 @@ package kafka.log
import kafka.message._
import kafka.utils.{SystemTime, TestUtils, Utils}
+import kafka.server.KafkaConfig
object TestLogPerformance {
@@ -29,8 +30,10 @@ object TestLogPerformance {
val messageSize = args(1).toInt
val batchSize = args(2).toInt
val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
+ val props = TestUtils.createBrokerConfig(0, -1)
+ val config = new KafkaConfig(props)
val dir = TestUtils.tempDir()
- val log = new Log(dir, 50*1024*1024, 5000000, 24*7*60*60*1000L, false, SystemTime)
+ val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, false, SystemTime)
val bytes = new Array[Byte](messageSize)
new java.util.Random().nextBytes(bytes)
val message = new Message(bytes)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Wed Sep 12 01:30:09 2012
@@ -23,18 +23,22 @@ import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
-import kafka.common.{KafkaException, OffsetOutOfRangeException}
+import kafka.common.{MessageSizeTooLargeException, KafkaException, OffsetOutOfRangeException}
import kafka.utils._
import scala.Some
+import kafka.server.KafkaConfig
class LogTest extends JUnitSuite {
var logDir: File = null
val time = new MockTime
+ var config: KafkaConfig = null
@Before
def setUp() {
logDir = TestUtils.tempDir()
+ val props = TestUtils.createBrokerConfig(0, -1)
+ config = new KafkaConfig(props)
}
@After
@@ -55,7 +59,7 @@ class LogTest extends JUnitSuite {
val time: MockTime = new MockTime()
// create a log
- val log = new Log(logDir, 1000, 1000, rollMs, false, time)
+ val log = new Log(logDir, 1000, config.maxMessageSize, 1000, rollMs, false, time)
time.currentMs += rollMs + 1
// segment age is less than its limit
@@ -89,7 +93,7 @@ class LogTest extends JUnitSuite {
val logFileSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be 10 messages
// create a log
- val log = new Log(logDir, logFileSize, 1000, 10000, false, time)
+ val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, false, time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
// segments expire in size
@@ -102,14 +106,14 @@ class LogTest extends JUnitSuite {
@Test
def testLoadEmptyLog() {
createEmptyLogs(logDir, 0)
- new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
+ new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
}
@Test
def testLoadInvalidLogsFails() {
createEmptyLogs(logDir, 0, 15)
try {
- new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
+ new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
fail("Allowed load of corrupt logs without complaint.")
} catch {
case e: KafkaException => "This is good"
@@ -118,7 +122,7 @@ class LogTest extends JUnitSuite {
@Test
def testAppendAndRead() {
- val log = new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
+ val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
val message = new Message(Integer.toString(42).getBytes())
for(i <- 0 until 10)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -135,7 +139,7 @@ class LogTest extends JUnitSuite {
@Test
def testReadOutOfRange() {
createEmptyLogs(logDir, 1024)
- val log = new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
+ val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
try {
log.read(0, 1024)
@@ -155,7 +159,7 @@ class LogTest extends JUnitSuite {
@Test
def testLogRolls() {
/* create a multipart log with 100 messages */
- val log = new Log(logDir, 100, 1000, 24*7*60*60*1000L, false, time)
+ val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
val numMessages = 100
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -210,7 +214,7 @@ class LogTest extends JUnitSuite {
def testEdgeLogRolls() {
{
// first test a log segment starting at 0
- val log = new Log(logDir, 100, 1000, 24*7*60*60*1000L, false, time)
+ val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
val curOffset = log.logEndOffset
assertEquals(curOffset, 0)
@@ -223,7 +227,7 @@ class LogTest extends JUnitSuite {
{
// second test an empty log segment starting at none-zero
- val log = new Log(logDir, 100, 1000, 24*7*60*60*1000L, false, time)
+ val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
val numMessages = 1
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -246,6 +250,35 @@ class LogTest extends JUnitSuite {
}
}
+ @Test
+ def testMessageSizeCheck() {
+ val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes()), new Message("bethe".getBytes()))
+ val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes()))
+
+ // append messages to log
+ val log = new Log(logDir, 100, 5, 1000, config.logRollHours*60*60*1000L, false, time)
+
+ var ret =
+ try {
+ log.append(first)
+ true
+ }
+ catch {
+ case e: MessageSizeTooLargeException => false
+ }
+ assert(ret, "First messageset should pass.")
+
+ ret =
+ try {
+ log.append(second)
+ false
+ }
+ catch {
+ case e:MessageSizeTooLargeException => true
+ }
+ assert(ret, "Second message set should throw MessageSizeTooLargeException.")
+ }
+
def assertContains(ranges: Array[Range], offset: Long) = {
Log.findRange(ranges, offset) match {
case Some(range) =>
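
The new test exercises the limit directly: the Log is created with
maxMessageSize = 5, so the 3-byte "You" and 5-byte "bethe" payloads in the
first set fit, while the 6-byte "change" in the second set must raise
MessageSizeTooLargeException.
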
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1383730&r1=1383729&r2=1383730&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Wed Sep 12 01:30:09 2012
@@ -93,7 +93,7 @@ class SyncProducerTest extends JUnit3Sui
}
@Test
- def testSingleMessageSizeTooLarge() {
+ def testMessageSizeTooLarge() {
val server = servers.head
val props = new Properties()
props.put("host", "localhost")
@@ -101,35 +101,25 @@ class SyncProducerTest extends JUnit3Sui
props.put("buffer.size", "102400")
props.put("connect.timeout.ms", "300")
props.put("reconnect.interval", "500")
- props.put("max.message.size", "100")
val producer = new SyncProducer(new SyncProducerConfig(props))
- val bytes = new Array[Byte](101)
- try {
- producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(bytes))))
- Assert.fail("Message was too large to send, SyncProducer should have thrown exception.")
- } catch {
- case e: MessageSizeTooLargeException => /* success */
- }
- }
+ CreateTopicCommand.createTopic(zkClient, "test", 1, 1)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500)
- @Test
- def testCompressedMessageSizeTooLarge() {
- val server = servers.head
- val props = new Properties()
- props.put("host", "localhost")
- props.put("port", server.socketServer.port.toString)
- props.put("buffer.size", "102400")
- props.put("connect.timeout.ms", "300")
- props.put("reconnect.interval", "500")
- props.put("max.message.size", "100")
- val producer = new SyncProducer(new SyncProducerConfig(props))
- val bytes = new Array[Byte](101)
- try {
- producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = new Message(bytes))))
- Assert.fail("Message was too large to send, SyncProducer should have thrown exception for DefaultCompressionCodec.")
- } catch {
- case e: MessageSizeTooLargeException => /* success */
- }
+ val message1 = new Message(new Array[Byte](1000001))
+ val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
+ val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1))
+
+ Assert.assertEquals(1, response1.errors.length)
+ Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.errors(0))
+ Assert.assertEquals(-1L, response1.offsets(0))
+
+ val message2 = new Message(new Array[Byte](1000000))
+ val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
+ val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2))
+
+ Assert.assertEquals(1, response2.errors.length)
+ Assert.assertEquals(ErrorMapping.NoError, response2.errors(0))
+ Assert.assertEquals(messageSet2.sizeInBytes, response2.offsets(0))
}
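
The two former client-side tests (uncompressed and compressed) collapse into
a single broker-side test. With the client check removed, a 1000001-byte
payload is accepted by the producer but rejected by the broker with
MessageSizeTooLargeCode and an offset of -1, while a payload at exactly the
default limit of 1000000 bytes succeeds with NoError.
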
@Test