You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2011/08/20 06:08:06 UTC
svn commit: r1159837 - in /incubator/kafka/trunk/core/src:
main/scala/kafka/network/ main/scala/kafka/server/
test/scala/unit/kafka/network/ test/scala/unit/kafka/utils/
Author: jkreps
Date: Sat Aug 20 04:08:06 2011
New Revision: 1159837
URL: http://svn.apache.org/viewvc?rev=1159837&view=rev
Log:
KAFKA-99 Enforce a max request size in socket server to avoid running out of memory with very large requests.
Added:
incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/
incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala?rev=1159837&r1=1159836&r2=1159837&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala Sat Aug 20 04:08:06 2011
@@ -56,8 +56,10 @@ private[kafka] class BoundedByteBufferRe
if(contentBuffer == null && !sizeBuffer.hasRemaining) {
sizeBuffer.rewind()
val size = sizeBuffer.getInt()
- if(size <= 0 || size > maxSize)
- throw new InvalidRequestException(size + " is not a valid message size")
+ if(size <= 0)
+ throw new InvalidRequestException("%d is not a valid request size.".format(size))
+ if(size > maxSize)
+ throw new InvalidRequestException("Request of length %d is not valid, it is larger than the maximum size of %d bytes.".format(size, maxSize))
contentBuffer = byteBufferAllocate(size)
}
// if we have a buffer read some stuff into it
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala?rev=1159837&r1=1159836&r2=1159837&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala Sat Aug 20 04:08:06 2011
@@ -34,10 +34,11 @@ import kafka.api.RequestKeys
* 1 Acceptor thread that handles new connections
* N Processor threads that each have their own selectors and handle all requests from their connections synchronously
*/
-private[kafka] class SocketServer(val port: Int,
+class SocketServer(val port: Int,
val numProcessorThreads: Int,
monitoringPeriodSecs: Int,
- private val handlerFactory: Handler.HandlerMapping) {
+ private val handlerFactory: Handler.HandlerMapping,
+ val maxRequestSize: Int = Int.MaxValue) {
private val logger = Logger.getLogger(classOf[SocketServer])
private val time = SystemTime
@@ -50,7 +51,7 @@ private[kafka] class SocketServer(val po
*/
def startup() {
for(i <- 0 until numProcessorThreads) {
- processors(i) = new Processor(handlerFactory, time, stats)
+ processors(i) = new Processor(handlerFactory, time, stats, maxRequestSize)
Utils.newThread("kafka-processor-" + i, processors(i), false).start()
}
Utils.newThread("kafka-acceptor", acceptor, false).start()
@@ -179,8 +180,9 @@ private[kafka] class Acceptor(val port:
* each of which has its own selectors
*/
private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
- val time: Time,
- val stats: SocketServerStats) extends AbstractServerThread {
+ val time: Time,
+ val stats: SocketServerStats,
+ val maxRequestSize: Int) extends AbstractServerThread {
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]();
private val requestLogger = Logger.getLogger("kafka.request.logger")
@@ -211,11 +213,14 @@ private[kafka] class Processor(val handl
throw new IllegalStateException("Unrecognized key state for processor thread.")
} catch {
case e: EOFException => {
- logger.info("Closing socket for " + channelFor(key).socket.getInetAddress + ".")
+ logger.info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
close(key)
+ }
+ case e: InvalidRequestException => {
+ logger.info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
+ close(key)
} case e: Throwable => {
- logger.info("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error")
- logger.error(e, e)
+ logger.error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
close(key)
}
}
@@ -293,7 +298,7 @@ private[kafka] class Processor(val handl
val socketChannel = channelFor(key)
var request = key.attachment.asInstanceOf[Receive]
if(key.attachment == null) {
- request = new BoundedByteBufferReceive()
+ request = new BoundedByteBufferReceive(maxRequestSize)
key.attach(request)
}
val read = request.readFrom(socketChannel)
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1159837&r1=1159836&r2=1159837&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Sat Aug 20 04:08:06 2011
@@ -68,7 +68,8 @@ class KafkaServer(val config: KafkaConfi
socketServer = new SocketServer(config.port,
config.numThreads,
config.monitoringPeriodSecs,
- handlers.handlerFor)
+ handlers.handlerFor,
+ config.maxSocketRequestSize)
Utils.swallow(logger.warn, Utils.registerMBean(socketServer.stats, statsMBeanName))
socketServer.startup
/**
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1159837&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Sat Aug 20 04:08:06 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network;
+
+import java.net._
+import java.io._
+import java.nio._
+import java.nio.channels._
+import org.junit._
+import junit.framework.Assert._
+import org.scalatest.junit.JUnitSuite
+import kafka.utils.TestUtils
+import kafka.network._
+import java.util.Random
+import org.apache.log4j._
+
+class SocketServerTest extends JUnitSuite {
+
+ Logger.getLogger("kafka").setLevel(Level.INFO)
+
+ def echo(receive: Receive): Option[Send] = {
+ val id = receive.buffer.getShort
+ Some(new BoundedByteBufferSend(receive.buffer.slice))
+ }
+
+ val server = new SocketServer(port = TestUtils.choosePort,
+ numProcessorThreads = 1,
+ monitoringPeriodSecs = 30,
+ handlerFactory = (requestId: Short, receive: Receive) => echo,
+ maxRequestSize = 50)
+ server.startup()
+
+ def sendRequest(id: Short, request: Array[Byte]): Array[Byte] = {
+ val socket = new Socket("localhost", server.port)
+ val outgoing = new DataOutputStream(socket.getOutputStream)
+ outgoing.writeInt(request.length + 2)
+ outgoing.writeShort(id)
+ outgoing.write(request)
+ outgoing.flush()
+ val incoming = new DataInputStream(socket.getInputStream)
+ val len = incoming.readInt()
+ val response = new Array[Byte](len)
+ incoming.readFully(response)
+ socket.close()
+ response
+ }
+
+ @After
+ def cleanup() {
+ server.shutdown()
+ }
+
+ @Test
+ def simpleRequest() {
+ val response = new String(sendRequest(0, "hello".getBytes))
+
+ }
+
+ @Test(expected=classOf[EOFException])
+ def tooBigRequestIsRejected() {
+ val tooManyBytes = new Array[Byte](server.maxRequestSize + 1)
+ new Random().nextBytes(tooManyBytes)
+ sendRequest(0, tooManyBytes)
+ }
+
+}
\ No newline at end of file
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1159837&r1=1159836&r2=1159837&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala Sat Aug 20 04:08:06 2011
@@ -88,6 +88,7 @@ object TestUtils {
/**
* Create a kafka server instance with appropriate test settings
+ * USING THIS IS A SIGN YOU ARE NOT WRITING A REAL UNIT TEST
* @param config The configuration of the server
*/
def createServer(config: KafkaConfig): KafkaServer = {