You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/01/13 01:03:03 UTC
svn commit: r1230845 - in /incubator/kafka/branches/0.8: bin/
core/src/main/scala/kafka/ core/src/main/scala/kafka/consumer/
core/src/main/scala/kafka/log/ core/src/main/scala/kafka/message/
core/src/main/scala/kafka/network/ core/src/main/scala/kafka/...
Author: nehanarkhede
Date: Fri Jan 13 00:03:02 2012
New Revision: 1230845
URL: http://svn.apache.org/viewvc?rev=1230845&view=rev
Log:
KAFKA-202 Make request processing in kafka asynchronous; patched by jaykreps; reviewed by junrao and nehanarkhede
Modified:
incubator/kafka/branches/0.8/bin/kafka-consumer-perf-test.sh
incubator/kafka/branches/0.8/bin/kafka-producer-perf-test.sh
incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
Modified: incubator/kafka/branches/0.8/bin/kafka-consumer-perf-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-consumer-perf-test.sh?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-consumer-perf-test.sh (original)
+++ incubator/kafka/branches/0.8/bin/kafka-consumer-perf-test.sh Fri Jan 13 00:03:02 2012
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-$(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerPerformance $@
+$(dirname $0)/kafka-run-class.sh kafka.perf.ConsumerPerformance $@
Modified: incubator/kafka/branches/0.8/bin/kafka-producer-perf-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-producer-perf-test.sh?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-producer-perf-test.sh (original)
+++ incubator/kafka/branches/0.8/bin/kafka-producer-perf-test.sh Fri Jan 13 00:03:02 2012
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-$(dirname $0)/kafka-run-class.sh kafka.tools.ProducerPerformance $@
+$(dirname $0)/kafka-run-class.sh kafka.perf.ProducerPerformance $@
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala Fri Jan 13 00:03:02 2012
@@ -52,7 +52,6 @@ object Kafka extends Logging {
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() = {
kafkaServerStartble.shutdown
- kafkaServerStartble.awaitShutdown
}
});
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Fri Jan 13 00:03:02 2012
@@ -54,8 +54,8 @@ class SimpleConsumer(val host: String,
private def close(channel: SocketChannel) = {
debug("Disconnecting from " + channel.socket.getRemoteSocketAddress())
- Utils.swallow(logger.warn, channel.close())
- Utils.swallow(logger.warn, channel.socket.close())
+ swallow(channel.close())
+ swallow(channel.socket.close())
}
def close() {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Fri Jan 13 00:03:02 2012
@@ -188,6 +188,7 @@ private[log] class Log(val dir: File, va
* Close this log
*/
def close() {
+ debug("Closing log " + name)
lock synchronized {
for(seg <- segments.view)
seg.messageSet.close()
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Fri Jan 13 00:03:02 2012
@@ -222,7 +222,7 @@ private[kafka] class LogManager(val conf
var total = 0
for(segment <- segments) {
info("Deleting log segment " + segment.file.getName() + " from " + log.name)
- Utils.swallow(logger.warn, segment.messageSet.close())
+ swallow(segment.messageSet.close())
if(!segment.file.delete()) {
warn("Delete failed.")
} else {
@@ -283,6 +283,7 @@ private[kafka] class LogManager(val conf
* Close all the logs
*/
def close() {
+ info("Closing log manager")
logFlusherScheduler.shutdown()
val iter = getLogIterator
while(iter.hasNext)
@@ -334,7 +335,7 @@ private[kafka] class LogManager(val conf
e match {
case _: IOException =>
fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e)
- Runtime.getRuntime.halt(1)
+ System.exit(1)
case _ =>
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Fri Jan 13 00:03:02 2012
@@ -127,7 +127,6 @@ class ByteBufferMessageSet(private val b
override def makeNext(): MessageAndOffset = {
val isInnerDone = innerDone()
- debug("makeNext() in deepIterator: innerDone = " + isInnerDone)
isInnerDone match {
case true => makeNextOuter
case false => {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala Fri Jan 13 00:03:02 2012
@@ -26,37 +26,39 @@ import java.nio.channels._
import kafka.utils._
-import org.apache.log4j.Logger
-import kafka.api.RequestKeys
-
/**
- * An NIO socket server. The thread model is
+ * An NIO socket server. The threading model is
* 1 Acceptor thread that handles new connections
- * N Processor threads that each have their own selectors and handle all requests from their connections synchronously
+ * N Processor threads that each have their own selector and read requests from sockets
+ * M Handler threads that handle requests and produce responses back to the processor threads for writing.
*/
class SocketServer(val port: Int,
- val numProcessorThreads: Int,
- monitoringPeriodSecs: Int,
- private val handlerFactory: Handler.HandlerMapping,
- val sendBufferSize: Int,
- val receiveBufferSize: Int,
- val maxRequestSize: Int = Int.MaxValue) {
+ val numProcessorThreads: Int,
+ val monitoringPeriodSecs: Int,
+ val maxQueuedRequests: Int,
+ val maxRequestSize: Int = Int.MaxValue) extends Logging {
private val time = SystemTime
private val processors = new Array[Processor](numProcessorThreads)
- private var acceptor: Acceptor = new Acceptor(port, processors, sendBufferSize, receiveBufferSize)
+ private var acceptor: Acceptor = new Acceptor(port, processors)
val stats: SocketServerStats = new SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs)
+ val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
/**
* Start the socket server
*/
def startup() {
for(i <- 0 until numProcessorThreads) {
- processors(i) = new Processor(handlerFactory, time, stats, maxRequestSize)
+ processors(i) = new Processor(i, time, maxRequestSize, requestChannel, stats)
Utils.newThread("kafka-processor-" + i, processors(i), false).start()
}
+ // register the processor threads for notification of responses
+ requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
+
+ // start accepting connections
Utils.newThread("kafka-acceptor", acceptor, false).start()
acceptor.awaitStartup
+ info("Kafka socket server started")
}
/**
@@ -66,6 +68,7 @@ class SocketServer(val port: Int,
acceptor.shutdown
for(processor <- processors)
processor.shutdown
+ info("Shut down socket server.")
}
}
@@ -73,10 +76,9 @@ class SocketServer(val port: Int,
/**
* A base class with some helper variables and methods
*/
-private[kafka] abstract class AbstractServerThread extends Runnable {
+private[kafka] abstract class AbstractServerThread extends Runnable with Logging {
protected val selector = Selector.open();
- protected val logger = Logger.getLogger(getClass())
private val startupLatch = new CountDownLatch(1)
private val shutdownLatch = new CountDownLatch(1)
private val alive = new AtomicBoolean(false)
@@ -86,7 +88,7 @@ private[kafka] abstract class AbstractSe
*/
def shutdown(): Unit = {
alive.set(false)
- selector.wakeup
+ selector.wakeup()
shutdownLatch.await
}
@@ -112,13 +114,18 @@ private[kafka] abstract class AbstractSe
* Is the server still running?
*/
protected def isRunning = alive.get
-
+
+ /**
+ * Wakeup the thread for selection.
+ */
+ def wakeup() = selector.wakeup()
+
}
/**
* Thread that accepts and configures new connections. There is only need for one of these
*/
-private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor], val sendBufferSize: Int, val receiveBufferSize: Int) extends AbstractServerThread {
+private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor]) extends AbstractServerThread {
/**
* Accept loop that checks for new connection attempts
@@ -128,9 +135,8 @@ private[kafka] class Acceptor(val port:
serverChannel.configureBlocking(false)
serverChannel.socket.bind(new InetSocketAddress(port))
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
- logger.info("Awaiting connections on port " + port)
+ info("Awaiting connections on port " + port)
startupComplete()
-
var currentProcessor = 0
while(isRunning) {
val ready = selector.select(500)
@@ -142,7 +148,6 @@ private[kafka] class Acceptor(val port:
try {
key = iter.next
iter.remove()
-
if(key.isAcceptable)
accept(key, processors(currentProcessor))
else
@@ -151,14 +156,14 @@ private[kafka] class Acceptor(val port:
// round robin to the next processor thread
currentProcessor = (currentProcessor + 1) % processors.length
} catch {
- case e: Throwable => logger.error("Error in acceptor", e)
+ case e: Throwable => error("Error in acceptor", e)
}
}
}
}
- logger.debug("Closing server socket and selector.")
- Utils.swallow(logger.error, serverChannel.close())
- Utils.swallow(logger.error, selector.close())
+ debug("Closing server socket and selector.")
+ swallowError(serverChannel.close())
+ swallowError(selector.close())
shutdownComplete()
}
@@ -166,42 +171,35 @@ private[kafka] class Acceptor(val port:
* Accept a new connection
*/
def accept(key: SelectionKey, processor: Processor) {
- val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
- serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize)
-
- val socketChannel = serverSocketChannel.accept()
+ val socketChannel = key.channel().asInstanceOf[ServerSocketChannel].accept()
+ debug("Accepted connection from " + socketChannel.socket.getInetAddress() + " on " + socketChannel.socket.getLocalSocketAddress)
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
- socketChannel.socket().setSendBufferSize(sendBufferSize)
-
- if (logger.isDebugEnabled()) {
- logger.debug("sendBufferSize: [" + socketChannel.socket().getSendBufferSize()
- + "] receiveBufferSize: [" + socketChannel.socket().getReceiveBufferSize() + "]")
- }
-
processor.accept(socketChannel)
}
+
}
/**
* Thread that processes all requests from a single connection. There are N of these running in parallel
* each of which has its own selectors
*/
-private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
- val time: Time,
- val stats: SocketServerStats,
- val maxRequestSize: Int) extends AbstractServerThread {
-
+private[kafka] class Processor(val id: Int,
+ val time: Time,
+ val maxRequestSize: Int,
+ val requestChannel: RequestChannel,
+ val stats: SocketServerStats) extends AbstractServerThread {
+
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]();
- private val requestLogger = Logger.getLogger("kafka.request.logger")
override def run() {
startupComplete()
while(isRunning) {
// setup any new connections that have been queued up
configureNewConnections()
-
- val ready = selector.select(500)
+ // register any new responses for writing
+ processNewResponses()
+ val ready = selector.select(300)
if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
@@ -210,7 +208,6 @@ private[kafka] class Processor(val handl
try {
key = iter.next
iter.remove()
-
if(key.isReadable)
read(key)
else if(key.isWritable)
@@ -221,33 +218,49 @@ private[kafka] class Processor(val handl
throw new IllegalStateException("Unrecognized key state for processor thread.")
} catch {
case e: EOFException => {
- logger.info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
+ info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
+ close(key)
+ } case e: InvalidRequestException => {
+ info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
close(key)
- }
- case e: InvalidRequestException => {
- logger.info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
- close(key)
} case e: Throwable => {
- logger.error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
+ error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
close(key)
}
}
}
}
}
- logger.debug("Closing selector.")
- Utils.swallow(logger.info, selector.close())
+ debug("Closing selector.")
+ swallowError(selector.close())
shutdownComplete()
}
+ private def processNewResponses() {
+ var curr = requestChannel.receiveResponse(id)
+ while(curr != null) {
+ trace("Socket server received response to send: " + curr)
+ val key = curr.requestKey.asInstanceOf[SelectionKey]
+ try {
+ key.interestOps(SelectionKey.OP_WRITE)
+ key.attach(curr.response)
+ curr = requestChannel.receiveResponse(id)
+ } catch {
+ case e: CancelledKeyException => {
+ debug("Ignoring response for closed socket.")
+ close(key)
+ }
+ }
+ }
+ }
+
private def close(key: SelectionKey) {
val channel = key.channel.asInstanceOf[SocketChannel]
- if(logger.isDebugEnabled)
- logger.debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
- Utils.swallow(logger.info, channel.socket().close())
- Utils.swallow(logger.info, channel.close())
+ debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
+ swallowError(channel.socket().close())
+ swallowError(channel.close())
key.attach(null)
- Utils.swallow(logger.info, key.cancel())
+ swallowError(key.cancel())
}
/**
@@ -255,7 +268,7 @@ private[kafka] class Processor(val handl
*/
def accept(socketChannel: SocketChannel) {
newConnections.add(socketChannel)
- selector.wakeup()
+ wakeup()
}
/**
@@ -264,41 +277,11 @@ private[kafka] class Processor(val handl
private def configureNewConnections() {
while(newConnections.size() > 0) {
val channel = newConnections.poll()
- if(logger.isDebugEnabled())
- logger.debug("Listening to new connection from " + channel.socket.getRemoteSocketAddress)
+ debug("Listening to new connection from " + channel.socket.getRemoteSocketAddress)
channel.register(selector, SelectionKey.OP_READ)
}
}
- /**
- * Handle a completed request producing an optional response
- */
- private def handle(key: SelectionKey, request: Receive): Option[Send] = {
- val requestTypeId = request.buffer.getShort()
- if(requestLogger.isTraceEnabled) {
- requestTypeId match {
- case RequestKeys.Produce =>
- requestLogger.trace("Handling produce request from " + channelFor(key).socket.getRemoteSocketAddress())
- case RequestKeys.Fetch =>
- requestLogger.trace("Handling fetch request from " + channelFor(key).socket.getRemoteSocketAddress())
- case RequestKeys.MultiFetch =>
- requestLogger.trace("Handling multi-fetch request from " + channelFor(key).socket.getRemoteSocketAddress())
- case RequestKeys.MultiProduce =>
- requestLogger.trace("Handling multi-produce request from " + channelFor(key).socket.getRemoteSocketAddress())
- case RequestKeys.Offsets =>
- requestLogger.trace("Handling offset request from " + channelFor(key).socket.getRemoteSocketAddress())
- case _ => throw new InvalidRequestException("No mapping found for handler id " + requestTypeId)
- }
- }
- val handler = handlerMapping(requestTypeId, request)
- if(handler == null)
- throw new InvalidRequestException("No handler found for request")
- val start = time.nanoseconds
- val maybeSend = handler(request)
- stats.recordRequest(requestTypeId, time.nanoseconds - start)
- maybeSend
- }
-
/*
* Process reads from ready sockets
*/
@@ -311,23 +294,18 @@ private[kafka] class Processor(val handl
}
val read = request.readFrom(socketChannel)
stats.recordBytesRead(read)
- if(logger.isTraceEnabled)
- logger.trace(read + " bytes read from " + socketChannel.socket.getRemoteSocketAddress())
+ trace(read + " bytes read from " + socketChannel.socket.getRemoteSocketAddress())
if(read < 0) {
close(key)
- return
} else if(request.complete) {
- val maybeResponse = handle(key, request)
+ val req = RequestChannel.Request(processor = id, requestKey = key, request = request, start = time.nanoseconds)
+ requestChannel.sendRequest(req)
+ trace("Recieved request, sending for processing by handler: " + req)
key.attach(null)
- // if there is a response, send it, otherwise do nothing
- if(maybeResponse.isDefined) {
- key.attach(maybeResponse.getOrElse(None))
- key.interestOps(SelectionKey.OP_WRITE)
- }
} else {
// more reading to be done
key.interestOps(SelectionKey.OP_READ)
- selector.wakeup()
+ wakeup()
}
}
@@ -335,18 +313,19 @@ private[kafka] class Processor(val handl
* Process writes to ready sockets
*/
def write(key: SelectionKey) {
- val response = key.attachment().asInstanceOf[Send]
val socketChannel = channelFor(key)
+ var response = key.attachment().asInstanceOf[Send]
+ if(response == null)
+ throw new IllegalStateException("Registered for write interest but no response attached to key.")
val written = response.writeTo(socketChannel)
stats.recordBytesWritten(written)
- if(logger.isTraceEnabled)
- logger.trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress())
+ trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress())
if(response.complete) {
key.attach(null)
key.interestOps(SelectionKey.OP_READ)
} else {
key.interestOps(SelectionKey.OP_WRITE)
- selector.wakeup()
+ wakeup()
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala Fri Jan 13 00:03:02 2012
@@ -43,7 +43,7 @@ private[network] trait Transmission exte
/**
* A transmission that is being received from a channel
*/
-private[kafka] trait Receive extends Transmission {
+trait Receive extends Transmission {
def buffer: ByteBuffer
@@ -63,7 +63,7 @@ private[kafka] trait Receive extends Tra
/**
* A transmission that is being sent out to the channel
*/
-private[kafka] trait Send extends Transmission {
+trait Send extends Transmission {
def writeTo(channel: GatheringByteChannel): Int
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Fri Jan 13 00:03:02 2012
@@ -145,8 +145,8 @@ class SyncProducer(val config: SyncProdu
try {
if(channel != null) {
info("Disconnecting from " + config.host + ":" + config.port)
- Utils.swallow(logger.warn, channel.close())
- Utils.swallow(logger.warn, channel.socket.close())
+ swallow(channel.close())
+ swallow(channel.socket.close())
channel = null
}
} catch {
@@ -217,7 +217,7 @@ class SyncProducerStats extends SyncProd
object SyncProducerStats extends Logging {
private val kafkaProducerstatsMBeanName = "kafka:type=kafka.KafkaProducerStats"
private val stats = new SyncProducerStats
- Utils.swallow(logger.warn, Utils.registerMBean(stats, kafkaProducerstatsMBeanName))
+ swallow(Utils.registerMBean(stats, kafkaProducerstatsMBeanName))
def recordProduceRequest(requestMs: Long) = stats.recordProduceRequest(requestMs)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala Fri Jan 13 00:03:02 2012
@@ -52,7 +52,7 @@ private[kafka] class AsyncProducer[T](co
if(eventHandler != null) eventHandler else new DefaultEventHandler[T](new ProducerConfig(config.props), cbkHandler),
cbkHandler, config.queueTime, config.batchSize, AsyncProducer.Shutdown)
sendThread.setDaemon(false)
- Utils.swallow(logger.warn, Utils.registerMBean(
+ swallowWarn(Utils.registerMBean(
new AsyncProducerQueueSizeStats[T](queue), AsyncProducer.ProducerQueueSizeMBeanName + "-" + asyncProducerID))
def this(config: AsyncProducerConfig) {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Fri Jan 13 00:03:02 2012
@@ -43,9 +43,15 @@ class KafkaConfig(props: Properties) ext
/* the maximum number of bytes in a socket request */
val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
- /* the number of worker threads that the server uses for handling all client requests*/
- val numThreads = Utils.getIntInRange(props, "num.threads", Runtime.getRuntime().availableProcessors, (1, Int.MaxValue))
+ /* the number of network threads that the server uses for handling network requests */
+ val numNetworkThreads = Utils.getIntInRange(props, "network.threads", 3, (1, Int.MaxValue))
+
+ /* the number of io threads that the server uses for carrying out network requests */
+ val numIoThreads = Utils.getIntInRange(props, "io.threads", 8, (1, Int.MaxValue))
+ /* the number of queued requests allowed before blocking the network threads */
+ val numQueuedRequests = Utils.getIntInRange(props, "max.queued.requests", 500, (1, Int.MaxValue))
+
/* the interval in which to measure performance statistics */
val monitoringPeriodSecs = Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue))
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Fri Jan 13 00:03:02 2012
@@ -18,28 +18,27 @@
package kafka.server
import scala.reflect.BeanProperty
-import kafka.log.LogManager
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.AtomicBoolean
-import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler, Logging}
-import kafka.network.{SocketServerStats, SocketServer}
+import java.util.concurrent._
+import java.util.concurrent.atomic._
import java.io.File
+import org.apache.log4j.Logger
+import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler, Logging}
+import kafka.network.{SocketServerStats, SocketServer, RequestChannel}
+import kafka.log.LogManager
/**
* Represents the lifecycle of a single Kafka broker. Handles all functionality required
* to start up and shutdown a single Kafka node.
*/
class KafkaServer(val config: KafkaConfig) extends Logging {
- val CLEAN_SHUTDOWN_FILE = ".kafka_cleanshutdown"
- private val isShuttingDown = new AtomicBoolean(false)
-
+
+ val CleanShutdownFile = ".kafka_cleanshutdown"
+ private val isShuttingDown = new AtomicBoolean(false)
private val shutdownLatch = new CountDownLatch(1)
private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
-
var socketServer: SocketServer = null
-
+ var requestHandlerPool: KafkaRequestHandlerPool = null
val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
-
private var logManager: LogManager = null
/**
@@ -49,7 +48,7 @@ class KafkaServer(val config: KafkaConfi
def startup() {
info("Starting Kafka server...")
var needRecovery = true
- val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
+ val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
if (cleanShutDownFile.exists) {
needRecovery = false
cleanShutDownFile.delete
@@ -60,24 +59,24 @@ class KafkaServer(val config: KafkaConfi
1000L * 60 * config.logCleanupIntervalMinutes,
1000L * 60 * 60 * config.logRetentionHours,
needRecovery)
-
- val handlers = new KafkaRequestHandlers(logManager)
+
socketServer = new SocketServer(config.port,
- config.numThreads,
+ config.numNetworkThreads,
config.monitoringPeriodSecs,
- handlers.handlerFor,
- config.socketSendBuffer,
- config.socketReceiveBuffer,
+ config.numQueuedRequests,
config.maxSocketRequestSize)
Utils.registerMBean(socketServer.stats, statsMBeanName)
- socketServer.startup()
+ requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, new KafkaApis(logManager).handle, config.numIoThreads)
+ socketServer.startup
+
Mx4jLoader.maybeLoad
+
/**
* Registers this broker in ZK. After this, consumers can connect to broker.
* So this should happen after socket server start.
*/
- logManager.startup()
- info("Kafka server started.")
+ logManager.startup
+ info("Server started.")
}
/**
@@ -91,13 +90,15 @@ class KafkaServer(val config: KafkaConfi
scheduler.shutdown()
if (socketServer != null)
socketServer.shutdown()
+ if(requestHandlerPool != null)
+ requestHandlerPool.shutdown()
Utils.unregisterMBean(statsMBeanName)
- if (logManager != null)
+ if(logManager != null)
logManager.close()
- val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
+ val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
+ debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
cleanShutDownFile.createNewFile
-
shutdownLatch.countDown()
info("Kafka server shut down completed")
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala Fri Jan 13 00:03:02 2012
@@ -64,13 +64,13 @@ class KafkaServerStartable(val serverCon
catch {
case e =>
fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
- Runtime.getRuntime.halt(1)
+ System.exit(1)
}
}
- def awaitShutdown() {
+ def awaitShutdown() =
server.awaitShutdown
- }
+
}
class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala Fri Jan 13 00:03:02 2012
@@ -35,6 +35,9 @@ trait Logging {
if (logger.isTraceEnabled())
logger.trace(msg,e)
}
+ def swallowTrace(action: => Unit) {
+ Utils.swallow(logger.trace, action)
+ }
def debug(msg: => String): Unit = {
if (logger.isDebugEnabled())
@@ -48,6 +51,9 @@ trait Logging {
if (logger.isDebugEnabled())
logger.debug(msg,e)
}
+ def swallowDebug(action: => Unit) {
+ Utils.swallow(logger.debug, action)
+ }
def info(msg: => String): Unit = {
if (logger.isInfoEnabled())
@@ -61,6 +67,9 @@ trait Logging {
if (logger.isInfoEnabled())
logger.info(msg,e)
}
+ def swallowInfo(action: => Unit) {
+ Utils.swallow(logger.info, action)
+ }
def warn(msg: => String): Unit = {
logger.warn(msg)
@@ -70,7 +79,11 @@ trait Logging {
}
def warn(msg: => String, e: => Throwable) = {
logger.warn(msg,e)
- }
+ }
+ def swallowWarn(action: => Unit) {
+ Utils.swallow(logger.warn, action)
+ }
+ def swallow(action: => Unit) = swallowWarn(action)
def error(msg: => String):Unit = {
logger.error(msg)
@@ -81,6 +94,9 @@ trait Logging {
def error(msg: => String, e: => Throwable) = {
logger.error(msg,e)
}
+ def swallowError(action: => Unit) {
+ Utils.swallow(logger.error, action)
+ }
def fatal(msg: => String): Unit = {
logger.fatal(msg)
@@ -91,4 +107,5 @@ trait Logging {
def fatal(msg: => String, e: => Throwable) = {
logger.fatal(msg,e)
}
+
}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Fri Jan 13 00:03:02 2012
@@ -1,3 +1,4 @@
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -20,7 +21,7 @@ package kafka.consumer
import junit.framework.Assert._
import kafka.zk.ZooKeeperTestHarness
import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
+import kafka.server._
import scala.collection._
import kafka.utils.{Utils, Logging}
import kafka.utils.{TestZKUtils, TestUtils}
@@ -51,7 +52,7 @@ class ZookeeperConsumerConnectorTest ext
val nMessages = 2
def testBasic() {
- val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+ val requestHandlerLogger = Logger.getLogger(classOf[KafkaApis])
requestHandlerLogger.setLevel(Level.FATAL)
var actualMessages: List[Message] = Nil
@@ -121,7 +122,7 @@ class ZookeeperConsumerConnectorTest ext
}
def testCompression() {
- val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+ val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
requestHandlerLogger.setLevel(Level.FATAL)
println("Sending messages for 1st consumer")
@@ -174,7 +175,7 @@ class ZookeeperConsumerConnectorTest ext
}
def testCompressionSetConsumption() {
- val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+ val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
requestHandlerLogger.setLevel(Level.FATAL)
var actualMessages: List[Message] = Nil
@@ -208,7 +209,7 @@ class ZookeeperConsumerConnectorTest ext
}
def testConsumerDecoder() {
- val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+ val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
requestHandlerLogger.setLevel(Level.FATAL)
val sentMessages = sendMessages(nMessages, "batch1", NoCompressionCodec).
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala Fri Jan 13 00:03:02 2012
@@ -23,7 +23,7 @@ import java.nio.channels.ClosedByInterru
import java.util.concurrent.atomic.AtomicInteger
import kafka.utils.{ZKGroupTopicDirs, Logging}
import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
-import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import kafka.server._
import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
import kafka.utils.{TestUtils, TestZKUtils}
@@ -41,7 +41,7 @@ class AutoOffsetResetTest extends JUnit3
val largeOffset = 10000
val smallOffset = -1
- val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+ val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
override def setUp() {
super.setUp()
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Fri Jan 13 00:03:02 2012
@@ -21,7 +21,7 @@ import scala.collection._
import junit.framework.Assert._
import kafka.common.OffsetOutOfRangeException
import kafka.api.{ProducerRequest, FetchRequest}
-import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
import kafka.utils.{TestUtils, Utils}
@@ -39,7 +39,7 @@ class LazyInitProducerTest extends JUnit
}
val configs = List(config)
var servers: List[KafkaServer] = null
- val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+ val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
override def setUp() {
super.setUp
@@ -73,16 +73,14 @@ class LazyInitProducerTest extends JUnit
TestUtils.checkEquals(sent.iterator, fetched.iterator)
// send an invalid offset
- var exceptionThrown = false
try {
val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
fetchedWithError.iterator
+ fail("Expected an OffsetOutOfRangeException exception to be thrown")
}
catch {
- case e: OffsetOutOfRangeException => exceptionThrown = true
- case e2 => throw e2
+ case e: OffsetOutOfRangeException =>
}
- assertTrue(exceptionThrown)
}
def testProduceAndMultiFetch() {
@@ -113,17 +111,15 @@ class LazyInitProducerTest extends JUnit
for(topic <- topics)
fetches += new FetchRequest(topic, 0, -1, 10000)
- var exceptionThrown = false
- try {
- val responses = consumer.multifetch(fetches: _*)
- for(resp <- responses)
+ val responses = consumer.multifetch(fetches: _*)
+ for(resp <- responses) {
+ try {
resp.iterator
+ fail("Expected an OffsetOutOfRangeException exception to be thrown")
+ } catch {
+ case e: OffsetOutOfRangeException =>
+ }
}
- catch {
- case e: OffsetOutOfRangeException => exceptionThrown = true
- case e2 => throw e2
- }
- assertTrue(exceptionThrown)
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala Fri Jan 13 00:03:02 2012
@@ -45,7 +45,7 @@ class LogCorruptionTest extends JUnit3Su
val partition = 0
def testMessageSizeTooLarge() {
- val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+ val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
val fetcherLogger = Logger.getLogger(classOf[kafka.consumer.FetcherRunnable])
requestHandlerLogger.setLevel(Level.FATAL)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Fri Jan 13 00:03:02 2012
@@ -21,7 +21,7 @@ import scala.collection._
import junit.framework.Assert._
import kafka.api.{ProducerRequest, FetchRequest}
import kafka.common.{OffsetOutOfRangeException, InvalidPartitionException}
-import kafka.server.{KafkaRequestHandlers, KafkaConfig}
+import kafka.server.{KafkaRequestHandler, KafkaConfig}
import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
import java.util.Properties
@@ -40,9 +40,10 @@ class PrimitiveApiTest extends JUnit3Sui
val props = TestUtils.createBrokerConfig(0, port)
val config = new KafkaConfig(props) {
override val enableZookeeper = false
+ override val flushInterval = 1
}
val configs = List(config)
- val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+ val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
def testDefaultEncoderProducerAndFetch() {
val topic = "test-topic"
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Fri Jan 13 00:03:02 2012
@@ -20,7 +20,7 @@ package kafka.javaapi.consumer
import junit.framework.Assert._
import kafka.zk.ZooKeeperTestHarness
import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
+import kafka.server._
import kafka.utils.{Utils, Logging}
import kafka.utils.{TestZKUtils, TestUtils}
import org.scalatest.junit.JUnit3Suite
@@ -28,7 +28,7 @@ import scala.collection.JavaConversions.
import kafka.javaapi.message.ByteBufferMessageSet
import kafka.consumer.{ConsumerConfig, KafkaMessageStream}
import org.apache.log4j.{Level, Logger}
-import kafka.message.{NoCompressionCodec, CompressionCodec, Message}
+import kafka.message._
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
@@ -49,7 +49,7 @@ class ZookeeperConsumerConnectorTest ext
val nMessages = 2
def testBasic() {
- val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+ val requestHandlerLogger = Logger.getLogger(classOf[KafkaApis])
requestHandlerLogger.setLevel(Level.FATAL)
var actualMessages: List[Message] = Nil
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala Fri Jan 13 00:03:02 2012
@@ -20,7 +20,7 @@ package kafka.javaapi.integration
import scala.collection._
import kafka.api.FetchRequest
import kafka.common.{InvalidPartitionException, OffsetOutOfRangeException}
-import kafka.server.{KafkaRequestHandlers, KafkaConfig}
+import kafka.server.{KafkaRequestHandler, KafkaConfig}
import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
import kafka.javaapi.message.ByteBufferMessageSet
@@ -39,7 +39,7 @@ class PrimitiveApiTest extends JUnit3Sui
override val enableZookeeper = false
}
val configs = List(config)
- val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+ val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
def testProduceAndFetch() {
// send some messages
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala Fri Jan 13 00:03:02 2012
@@ -19,14 +19,14 @@ package kafka.javaapi.producer
import java.util.Properties
import org.apache.log4j.{Logger, Level}
-import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
import kafka.zk.EmbeddedZookeeper
import kafka.utils.{TestZKUtils, TestUtils}
import org.junit.{After, Before, Test}
import junit.framework.Assert
import collection.mutable.HashMap
import org.easymock.EasyMock
-import kafka.utils.Utils
+import kafka.utils._
import java.util.concurrent.ConcurrentHashMap
import kafka.cluster.Partition
import kafka.common.{UnavailableProducerException, InvalidPartitionException, InvalidConfigException}
@@ -42,6 +42,7 @@ import kafka.api.FetchRequest
import kafka.message.{NoCompressionCodec, Message}
class ProducerTest extends JUnitSuite {
+
private val topic = "test-topic"
private val brokerId1 = 0
private val brokerId2 = 1
@@ -54,7 +55,7 @@ class ProducerTest extends JUnitSuite {
private var consumer1: SimpleConsumer = null
private var consumer2: SimpleConsumer = null
private var zkServer:EmbeddedZookeeper = null
- private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+ private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
@Before
def setUp() {
@@ -96,7 +97,7 @@ class ProducerTest extends JUnitSuite {
// temporarily set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.FATAL)
- Thread.sleep(500)
+ Thread.sleep(300)
}
@After
@@ -107,7 +108,7 @@ class ProducerTest extends JUnitSuite {
server2.shutdown
Utils.rm(server1.config.logDir)
Utils.rm(server2.config.logDir)
- Thread.sleep(500)
+ Thread.sleep(300)
zkServer.shutdown
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Fri Jan 13 00:03:02 2012
@@ -47,7 +47,8 @@ class LogManagerTest extends JUnitSuite
@After
def tearDown() {
- logManager.close()
+ if(logManager != null)
+ logManager.close()
Utils.rm(logDir)
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Fri Jan 13 00:03:02 2012
@@ -31,37 +31,39 @@ import org.apache.log4j._
class SocketServerTest extends JUnitSuite {
- Logger.getLogger("kafka").setLevel(Level.INFO)
-
- def echo(receive: Receive): Option[Send] = {
- val id = receive.buffer.getShort
- Some(new BoundedByteBufferSend(receive.buffer.slice))
- }
-
- val server = new SocketServer(port = TestUtils.choosePort,
- numProcessorThreads = 1,
- monitoringPeriodSecs = 30,
- handlerFactory = (requestId: Short, receive: Receive) => echo,
- sendBufferSize = 300000,
- receiveBufferSize = 300000,
- maxRequestSize = 50)
+ val server: SocketServer = new SocketServer(port = TestUtils.choosePort,
+ numProcessorThreads = 1,
+ monitoringPeriodSecs = 30,
+ maxQueuedRequests = 50,
+ maxRequestSize = 50)
server.startup()
- def sendRequest(id: Short, request: Array[Byte]): Array[Byte] = {
- val socket = new Socket("localhost", server.port)
+ def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
val outgoing = new DataOutputStream(socket.getOutputStream)
outgoing.writeInt(request.length + 2)
outgoing.writeShort(id)
outgoing.write(request)
outgoing.flush()
+ }
+
+ def receiveResponse(socket: Socket): Array[Byte] = {
val incoming = new DataInputStream(socket.getInputStream)
val len = incoming.readInt()
val response = new Array[Byte](len)
incoming.readFully(response)
- socket.close()
response
}
+ /* A simple request handler that just echoes the request back as the response */
+ def processRequest(channel: RequestChannel) {
+ val request = channel.receiveRequest
+ val id = request.request.buffer.getShort
+ val send = new BoundedByteBufferSend(request.request.buffer.slice)
+ channel.sendResponse(new RequestChannel.Response(request.processor, request.requestKey, send, request.start, 15))
+ }
+
+ def connect() = new Socket("localhost", server.port)
+
@After
def cleanup() {
server.shutdown()
@@ -69,15 +71,20 @@ class SocketServerTest extends JUnitSuit
@Test
def simpleRequest() {
- val response = new String(sendRequest(0, "hello".getBytes))
-
+ val socket = connect()
+ sendRequest(socket, 0, "hello".getBytes)
+ processRequest(server.requestChannel)
+ val response = new String(receiveResponse(socket))
+ assertEquals("hello", response)
}
@Test(expected=classOf[IOException])
def tooBigRequestIsRejected() {
val tooManyBytes = new Array[Byte](server.maxRequestSize + 1)
new Random().nextBytes(tooManyBytes)
- sendRequest(0, tooManyBytes)
+ val socket = connect()
+ sendRequest(socket, 0, tooManyBytes)
+ receiveResponse(socket)
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Fri Jan 13 00:03:02 2012
@@ -20,7 +20,7 @@ package kafka.producer
import async.{AsyncProducerConfig, AsyncProducer}
import java.util.Properties
import org.apache.log4j.{Logger, Level}
-import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
import kafka.zk.EmbeddedZookeeper
import org.junit.{After, Before, Test}
import junit.framework.Assert
@@ -49,7 +49,7 @@ class ProducerTest extends JUnitSuite {
private var consumer1: SimpleConsumer = null
private var consumer2: SimpleConsumer = null
private var zkServer:EmbeddedZookeeper = null
- private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+ private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
@Before
def setUp() {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Fri Jan 13 00:03:02 2012
@@ -32,7 +32,6 @@ import kafka.message.{NoCompressionCodec
class SyncProducerTest extends JUnitSuite {
private var messageBytes = new Array[Byte](2);
private var server: KafkaServer = null
- val simpleProducerLogger = Logger.getLogger(classOf[SyncProducer])
@Before
def setUp() {
@@ -44,7 +43,8 @@ class SyncProducerTest extends JUnitSuit
@After
def tearDown() {
- server.shutdown
+ if(server != null)
+ server.shutdown
}
@Test
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Fri Jan 13 00:03:02 2012
@@ -66,7 +66,7 @@ class ServerShutdownTest extends JUnitSu
Thread.sleep(200)
// do a clean shutdown
server.shutdown()
- val cleanShutDownFile = new File(new File(config.logDir), server.CLEAN_SHUTDOWN_FILE)
+ val cleanShutDownFile = new File(new File(config.logDir), server.CleanShutdownFile)
assertTrue(cleanShutDownFile.exists)
}