Posted to commits@kafka.apache.org by ne...@apache.org on 2012/01/13 01:03:03 UTC

svn commit: r1230845 - in /incubator/kafka/branches/0.8: bin/ core/src/main/scala/kafka/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/log/ core/src/main/scala/kafka/message/ core/src/main/scala/kafka/network/ core/src/main/scala/kafka/...

Author: nehanarkhede
Date: Fri Jan 13 00:03:02 2012
New Revision: 1230845

URL: http://svn.apache.org/viewvc?rev=1230845&view=rev
Log:
KAFKA-202 Make request processing in kafka asynchronous; patched by jaykreps; reviewed by junrao and nehanarkhede
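
In outline, the patch splits request handling into N network (processor) threads and M handler threads joined by a bounded request channel: processors enqueue complete requests, handlers dequeue, process, and enqueue responses on per-processor queues. A minimal sketch of that hand-off, with illustrative names (MiniRequestChannel is not the committed API):

    import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingQueue}

    class MiniRequestChannel(numProcessors: Int, maxQueued: Int) {
      // network (processor) threads block here once maxQueued requests are pending
      private val requests = new ArrayBlockingQueue[String](maxQueued)
      // one response queue per processor so a handler can route its answer back
      private val responses = Array.fill(numProcessors)(new LinkedBlockingQueue[String]())

      def sendRequest(r: String) { requests.put(r) }
      def receiveRequest(): String = requests.take()
      def sendResponse(processor: Int, r: String) { responses(processor).put(r) }
      def receiveResponse(processor: Int): String = responses(processor).poll()
    }

    object MiniRequestChannelDemo {
      def main(args: Array[String]) {
        val channel = new MiniRequestChannel(numProcessors = 1, maxQueued = 10)
        val handler = new Thread(new Runnable {
          def run() {
            val req = channel.receiveRequest()      // a handler thread does this
            channel.sendResponse(0, "echo:" + req)  // hand the result to processor 0
          }
        })
        handler.start()
        channel.sendRequest("hello")                // a processor thread does this
        handler.join()
        println(channel.receiveResponse(0))         // prints echo:hello
      }
    }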

Modified:
    incubator/kafka/branches/0.8/bin/kafka-consumer-perf-test.sh
    incubator/kafka/branches/0.8/bin/kafka-producer-perf-test.sh
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala

Modified: incubator/kafka/branches/0.8/bin/kafka-consumer-perf-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-consumer-perf-test.sh?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-consumer-perf-test.sh (original)
+++ incubator/kafka/branches/0.8/bin/kafka-consumer-perf-test.sh Fri Jan 13 00:03:02 2012
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-$(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerPerformance $@
+$(dirname $0)/kafka-run-class.sh kafka.perf.ConsumerPerformance $@

Modified: incubator/kafka/branches/0.8/bin/kafka-producer-perf-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-producer-perf-test.sh?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-producer-perf-test.sh (original)
+++ incubator/kafka/branches/0.8/bin/kafka-producer-perf-test.sh Fri Jan 13 00:03:02 2012
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-$(dirname $0)/kafka-run-class.sh kafka.tools.ProducerPerformance $@
+$(dirname $0)/kafka-run-class.sh kafka.perf.ProducerPerformance $@

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala Fri Jan 13 00:03:02 2012
@@ -52,7 +52,6 @@ object Kafka extends Logging {
       Runtime.getRuntime().addShutdownHook(new Thread() {
         override def run() = {
           kafkaServerStartble.shutdown
-          kafkaServerStartble.awaitShutdown
         }
       });
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Fri Jan 13 00:03:02 2012
@@ -54,8 +54,8 @@ class SimpleConsumer(val host: String,
 
   private def close(channel: SocketChannel) = {
     debug("Disconnecting from " + channel.socket.getRemoteSocketAddress())
-    Utils.swallow(logger.warn, channel.close())
-    Utils.swallow(logger.warn, channel.socket.close())
+    swallow(channel.close())
+    swallow(channel.socket.close())
   }
 
   def close() {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Fri Jan 13 00:03:02 2012
@@ -188,6 +188,7 @@ private[log] class Log(val dir: File, va
    * Close this log
    */
   def close() {
+    debug("Closing log " + name)
     lock synchronized {
       for(seg <- segments.view)
         seg.messageSet.close()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Fri Jan 13 00:03:02 2012
@@ -222,7 +222,7 @@ private[kafka] class LogManager(val conf
     var total = 0
     for(segment <- segments) {
       info("Deleting log segment " + segment.file.getName() + " from " + log.name)
-      Utils.swallow(logger.warn, segment.messageSet.close())
+      swallow(segment.messageSet.close())
       if(!segment.file.delete()) {
         warn("Delete failed.")
       } else {
@@ -283,6 +283,7 @@ private[kafka] class LogManager(val conf
    * Close all the logs
    */
   def close() {
+    info("Closing log manager")
     logFlusherScheduler.shutdown()
     val iter = getLogIterator
     while(iter.hasNext)
@@ -334,7 +335,7 @@ private[kafka] class LogManager(val conf
           e match {
             case _: IOException =>
               fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e)
-              Runtime.getRuntime.halt(1)
+              System.exit(1)
             case _ =>
           }
       }
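
Note the switch from Runtime.getRuntime.halt(1) to System.exit(1): exit runs registered JVM shutdown hooks (such as the broker hook in Kafka.scala above), while halt terminates the JVM immediately and skips them. A small self-contained illustration:

    object ExitVsHalt {
      def main(args: Array[String]) {
        Runtime.getRuntime.addShutdownHook(new Thread() {
          override def run() { println("shutdown hook ran") }
        })
        System.exit(1)  // prints "shutdown hook ran" on the way out;
                        // Runtime.getRuntime.halt(1) would skip the hook entirely
      }
    }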

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Fri Jan 13 00:03:02 2012
@@ -127,7 +127,6 @@ class ByteBufferMessageSet(private val b
 
       override def makeNext(): MessageAndOffset = {
         val isInnerDone = innerDone()
-        debug("makeNext() in deepIterator: innerDone = " + isInnerDone)
         isInnerDone match {
           case true => makeNextOuter
           case false => {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala Fri Jan 13 00:03:02 2012
@@ -26,37 +26,39 @@ import java.nio.channels._
 
 import kafka.utils._
 
-import org.apache.log4j.Logger
-import kafka.api.RequestKeys
-
 /**
- * An NIO socket server. The thread model is
+ * An NIO socket server. The threading model is
  *   1 Acceptor thread that handles new connections
- *   N Processor threads that each have their own selectors and handle all requests from their connections synchronously
+ *   N Processor threads that each have their own selector and read requests from sockets
+ *   M Handler threads that handle requests and produce responses back to the processor threads for writing.
  */
 class SocketServer(val port: Int,
-                   val numProcessorThreads: Int,
-                   monitoringPeriodSecs: Int,
-                   private val handlerFactory: Handler.HandlerMapping,
-                   val sendBufferSize: Int,
-                   val receiveBufferSize: Int,
-                   val maxRequestSize: Int = Int.MaxValue) {
+                   val numProcessorThreads: Int, 
+                   val monitoringPeriodSecs: Int,
+                   val maxQueuedRequests: Int,
+                   val maxRequestSize: Int = Int.MaxValue) extends Logging {
 
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
-  private var acceptor: Acceptor = new Acceptor(port, processors, sendBufferSize, receiveBufferSize)
+  private var acceptor: Acceptor = new Acceptor(port, processors)
   val stats: SocketServerStats = new SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs)
+  val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
 
   /**
    * Start the socket server
    */
   def startup() {
     for(i <- 0 until numProcessorThreads) {
-      processors(i) = new Processor(handlerFactory, time, stats, maxRequestSize)
+      processors(i) = new Processor(i, time, maxRequestSize, requestChannel, stats)
       Utils.newThread("kafka-processor-" + i, processors(i), false).start()
     }
+    // register the processor threads for notification of responses
+    requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
+   
+    // start accepting connections
     Utils.newThread("kafka-acceptor", acceptor, false).start()
     acceptor.awaitStartup
+    info("Kafka socket server started")
   }
 
   /**
@@ -66,6 +68,7 @@ class SocketServer(val port: Int,
     acceptor.shutdown
     for(processor <- processors)
       processor.shutdown
+    info("Shut down socket server.")
   }
 
 }
@@ -73,10 +76,9 @@ class SocketServer(val port: Int,
 /**
  * A base class with some helper variables and methods
  */
-private[kafka] abstract class AbstractServerThread extends Runnable {
+private[kafka] abstract class AbstractServerThread extends Runnable with Logging {
 
   protected val selector = Selector.open();
-  protected val logger = Logger.getLogger(getClass())
   private val startupLatch = new CountDownLatch(1)
   private val shutdownLatch = new CountDownLatch(1)
   private val alive = new AtomicBoolean(false)
@@ -86,7 +88,7 @@ private[kafka] abstract class AbstractSe
    */
   def shutdown(): Unit = {
     alive.set(false)
-    selector.wakeup
+    selector.wakeup()
     shutdownLatch.await
   }
 
@@ -112,13 +114,18 @@ private[kafka] abstract class AbstractSe
    * Is the server still running?
    */
   protected def isRunning = alive.get
-
+  
+  /**
+   * Wakeup the thread for selection.
+   */
+  def wakeup() = selector.wakeup()
+  
 }
 
 /**
 * Thread that accepts and configures new connections. Only one of these is needed
  */
-private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor], val sendBufferSize: Int, val receiveBufferSize: Int) extends AbstractServerThread {
+private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor]) extends AbstractServerThread {
 
   /**
    * Accept loop that checks for new connection attempts
@@ -128,9 +135,8 @@ private[kafka] class Acceptor(val port: 
     serverChannel.configureBlocking(false)
     serverChannel.socket.bind(new InetSocketAddress(port))
     serverChannel.register(selector, SelectionKey.OP_ACCEPT);
-    logger.info("Awaiting connections on port " + port)
+    info("Awaiting connections on port " + port)
     startupComplete()
-
     var currentProcessor = 0
     while(isRunning) {
       val ready = selector.select(500)
@@ -142,7 +148,6 @@ private[kafka] class Acceptor(val port: 
           try {
             key = iter.next
             iter.remove()
-
             if(key.isAcceptable)
                 accept(key, processors(currentProcessor))
               else
@@ -151,14 +156,14 @@ private[kafka] class Acceptor(val port: 
               // round robin to the next processor thread
               currentProcessor = (currentProcessor + 1) % processors.length
           } catch {
-            case e: Throwable => logger.error("Error in acceptor", e)
+            case e: Throwable => error("Error in acceptor", e)
           }
         }
       }
     }
-    logger.debug("Closing server socket and selector.")
-    Utils.swallow(logger.error, serverChannel.close())
-    Utils.swallow(logger.error, selector.close())
+    debug("Closing server socket and selector.")
+    swallowError(serverChannel.close())
+    swallowError(selector.close())
     shutdownComplete()
   }
 
@@ -166,42 +171,35 @@ private[kafka] class Acceptor(val port: 
    * Accept a new connection
    */
   def accept(key: SelectionKey, processor: Processor) {
-    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
-    serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize)
-    
-    val socketChannel = serverSocketChannel.accept()
+    val socketChannel = key.channel().asInstanceOf[ServerSocketChannel].accept()
+    debug("Accepted connection from " + socketChannel.socket.getInetAddress() + " on " + socketChannel.socket.getLocalSocketAddress)
     socketChannel.configureBlocking(false)
     socketChannel.socket().setTcpNoDelay(true)
-    socketChannel.socket().setSendBufferSize(sendBufferSize)
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("sendBufferSize: [" + socketChannel.socket().getSendBufferSize() 
-          + "] receiveBufferSize: [" + socketChannel.socket().getReceiveBufferSize() + "]")
-    }
-
     processor.accept(socketChannel)
   }
+
 }
 
 /**
 * Thread that processes all requests from a single connection. There are N of these running in parallel,
 * each of which has its own selector
  */
-private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
-                               val time: Time,
-                               val stats: SocketServerStats,
-                               val maxRequestSize: Int) extends AbstractServerThread {
-
+private[kafka] class Processor(val id: Int,
+                               val time: Time, 
+                               val maxRequestSize: Int,
+                               val requestChannel: RequestChannel,
+                               val stats: SocketServerStats) extends AbstractServerThread {
+  
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]();
-  private val requestLogger = Logger.getLogger("kafka.request.logger")
 
   override def run() {
     startupComplete()
     while(isRunning) {
       // setup any new connections that have been queued up
       configureNewConnections()
-
-      val ready = selector.select(500)
+      // register any new responses for writing
+      processNewResponses()
+      val ready = selector.select(300)
       if(ready > 0) {
         val keys = selector.selectedKeys()
         val iter = keys.iterator()
@@ -210,7 +208,6 @@ private[kafka] class Processor(val handl
           try {
             key = iter.next
             iter.remove()
-
             if(key.isReadable)
               read(key)
             else if(key.isWritable)
@@ -221,33 +218,49 @@ private[kafka] class Processor(val handl
               throw new IllegalStateException("Unrecognized key state for processor thread.")
           } catch {
             case e: EOFException => {
-              logger.info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
+              info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
+              close(key)
+            } case e: InvalidRequestException => {
+              info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
               close(key)
-        }
-        case e: InvalidRequestException => {
-          logger.info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
-          close(key)
             } case e: Throwable => {
-              logger.error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
+              error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
               close(key)
             }
           }
         }
       }
     }
-    logger.debug("Closing selector.")
-    Utils.swallow(logger.info, selector.close())
+    debug("Closing selector.")
+    swallowError(selector.close())
     shutdownComplete()
   }
 
+  private def processNewResponses() {
+    var curr = requestChannel.receiveResponse(id)
+    while(curr != null) {
+      trace("Socket server received response to send: " + curr)
+      val key = curr.requestKey.asInstanceOf[SelectionKey]
+      try {
+        key.interestOps(SelectionKey.OP_WRITE)
+        key.attach(curr.response)
+        curr = requestChannel.receiveResponse(id)
+      } catch {
+        case e: CancelledKeyException => {
+          debug("Ignoring response for closed socket.")
+          close(key)
+        }
+      }
+    }
+  }
+  
   private def close(key: SelectionKey) {
     val channel = key.channel.asInstanceOf[SocketChannel]
-    if(logger.isDebugEnabled)
-      logger.debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
-    Utils.swallow(logger.info, channel.socket().close())
-    Utils.swallow(logger.info, channel.close())
+    debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
+    swallowError(channel.socket().close())
+    swallowError(channel.close())
     key.attach(null)
-    Utils.swallow(logger.info, key.cancel())
+    swallowError(key.cancel())
   }
 
   /**
@@ -255,7 +268,7 @@ private[kafka] class Processor(val handl
    */
   def accept(socketChannel: SocketChannel) {
     newConnections.add(socketChannel)
-    selector.wakeup()
+    wakeup()
   }
 
   /**
@@ -264,41 +277,11 @@ private[kafka] class Processor(val handl
   private def configureNewConnections() {
     while(newConnections.size() > 0) {
       val channel = newConnections.poll()
-      if(logger.isDebugEnabled())
-        logger.debug("Listening to new connection from " + channel.socket.getRemoteSocketAddress)
+      debug("Listening to new connection from " + channel.socket.getRemoteSocketAddress)
       channel.register(selector, SelectionKey.OP_READ)
     }
   }
 
-  /**
-   * Handle a completed request producing an optional response
-   */
-  private def handle(key: SelectionKey, request: Receive): Option[Send] = {
-    val requestTypeId = request.buffer.getShort()
-    if(requestLogger.isTraceEnabled) {
-      requestTypeId match {
-        case RequestKeys.Produce =>
-          requestLogger.trace("Handling produce request from " + channelFor(key).socket.getRemoteSocketAddress())
-        case RequestKeys.Fetch =>
-          requestLogger.trace("Handling fetch request from " + channelFor(key).socket.getRemoteSocketAddress())
-        case RequestKeys.MultiFetch =>
-          requestLogger.trace("Handling multi-fetch request from " + channelFor(key).socket.getRemoteSocketAddress())
-        case RequestKeys.MultiProduce =>
-          requestLogger.trace("Handling multi-produce request from " + channelFor(key).socket.getRemoteSocketAddress())
-        case RequestKeys.Offsets =>
-          requestLogger.trace("Handling offset request from " + channelFor(key).socket.getRemoteSocketAddress())
-        case _ => throw new InvalidRequestException("No mapping found for handler id " + requestTypeId)
-      }
-    }
-    val handler = handlerMapping(requestTypeId, request)
-    if(handler == null)
-      throw new InvalidRequestException("No handler found for request")
-    val start = time.nanoseconds
-    val maybeSend = handler(request)
-    stats.recordRequest(requestTypeId, time.nanoseconds - start)
-    maybeSend
-  }
-
   /*
    * Process reads from ready sockets
    */
@@ -311,23 +294,18 @@ private[kafka] class Processor(val handl
     }
     val read = request.readFrom(socketChannel)
     stats.recordBytesRead(read)
-    if(logger.isTraceEnabled)
-      logger.trace(read + " bytes read from " + socketChannel.socket.getRemoteSocketAddress())
+    trace(read + " bytes read from " + socketChannel.socket.getRemoteSocketAddress())
     if(read < 0) {
       close(key)
-      return
     } else if(request.complete) {
-      val maybeResponse = handle(key, request)
+      val req = RequestChannel.Request(processor = id, requestKey = key, request = request, start = time.nanoseconds)
+      requestChannel.sendRequest(req)
+      trace("Received request, sending for processing by handler: " + req)
       key.attach(null)
-      // if there is a response, send it, otherwise do nothing
-      if(maybeResponse.isDefined) {
-        key.attach(maybeResponse.getOrElse(None))
-        key.interestOps(SelectionKey.OP_WRITE)
-      }
     } else {
       // more reading to be done
       key.interestOps(SelectionKey.OP_READ)
-      selector.wakeup()
+      wakeup()
     }
   }
 
@@ -335,18 +313,19 @@ private[kafka] class Processor(val handl
    * Process writes to ready sockets
    */
   def write(key: SelectionKey) {
-    val response = key.attachment().asInstanceOf[Send]
     val socketChannel = channelFor(key)
+    var response = key.attachment().asInstanceOf[Send]
+    if(response == null)
+      throw new IllegalStateException("Registered for write interest but no response attached to key.")
     val written = response.writeTo(socketChannel)
     stats.recordBytesWritten(written)
-    if(logger.isTraceEnabled)
-      logger.trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress())
+    trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress())
     if(response.complete) {
       key.attach(null)
       key.interestOps(SelectionKey.OP_READ)
     } else {
       key.interestOps(SelectionKey.OP_WRITE)
-      selector.wakeup()
+      wakeup()
     }
   }
 
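
Because handlers now complete requests on their own threads, a processor parked in select() has to be woken when a response is queued; that is the point of requestChannel.addResponseListener((id:Int) => processors(id).wakeup()) in startup(). A minimal sketch of the wakeup handshake:

    import java.nio.channels.Selector

    object WakeupSketch {
      def main(args: Array[String]) {
        val selector = Selector.open()
        val poker = new Thread(new Runnable {
          def run() {
            Thread.sleep(100)
            selector.wakeup()  // what the response listener does to a processor
          }
        })
        poker.start()
        // without the wakeup() this call would park for the full 10 seconds
        val ready = selector.select(10000)
        println("select returned " + ready + " ready keys")  // 0, woken early
        poker.join()
        selector.close()
      }
    }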

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala Fri Jan 13 00:03:02 2012
@@ -43,7 +43,7 @@ private[network] trait Transmission exte
 /**
  * A transmission that is being received from a channel
  */
-private[kafka] trait Receive extends Transmission {
+trait Receive extends Transmission {
   
   def buffer: ByteBuffer
   
@@ -63,7 +63,7 @@ private[kafka] trait Receive extends Tra
 /**
  * A transmission that is being sent out to the channel
  */
-private[kafka] trait Send extends Transmission {
+trait Send extends Transmission {
     
   def writeTo(channel: GatheringByteChannel): Int
   

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Fri Jan 13 00:03:02 2012
@@ -145,8 +145,8 @@ class SyncProducer(val config: SyncProdu
     try {
       if(channel != null) {
         info("Disconnecting from " + config.host + ":" + config.port)
-        Utils.swallow(logger.warn, channel.close())
-        Utils.swallow(logger.warn, channel.socket.close())
+        swallow(channel.close())
+        swallow(channel.socket.close())
         channel = null
       }
     } catch {
@@ -217,7 +217,7 @@ class SyncProducerStats extends SyncProd
 object SyncProducerStats extends Logging {
   private val kafkaProducerstatsMBeanName = "kafka:type=kafka.KafkaProducerStats"
   private val stats = new SyncProducerStats
-  Utils.swallow(logger.warn, Utils.registerMBean(stats, kafkaProducerstatsMBeanName))
+  swallow(Utils.registerMBean(stats, kafkaProducerstatsMBeanName))
 
   def recordProduceRequest(requestMs: Long) = stats.recordProduceRequest(requestMs)
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala Fri Jan 13 00:03:02 2012
@@ -52,7 +52,7 @@ private[kafka] class AsyncProducer[T](co
     if(eventHandler != null) eventHandler else new DefaultEventHandler[T](new ProducerConfig(config.props), cbkHandler),
     cbkHandler, config.queueTime, config.batchSize, AsyncProducer.Shutdown)
   sendThread.setDaemon(false)
-  Utils.swallow(logger.warn, Utils.registerMBean(
+  swallowWarn(Utils.registerMBean(
     new AsyncProducerQueueSizeStats[T](queue), AsyncProducer.ProducerQueueSizeMBeanName + "-" + asyncProducerID))
 
   def this(config: AsyncProducerConfig) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Fri Jan 13 00:03:02 2012
@@ -43,9 +43,15 @@ class KafkaConfig(props: Properties) ext
   /* the maximum number of bytes in a socket request */
   val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
   
-  /* the number of worker threads that the server uses for handling all client requests*/
-  val numThreads = Utils.getIntInRange(props, "num.threads", Runtime.getRuntime().availableProcessors, (1, Int.MaxValue))
+  /* the number of network threads that the server uses for handling network requests */
+  val numNetworkThreads = Utils.getIntInRange(props, "network.threads", 3, (1, Int.MaxValue))
+
+  /* the number of io threads that the server uses for carrying out network requests */
+  val numIoThreads = Utils.getIntInRange(props, "io.threads", 8, (1, Int.MaxValue))
   
+  /* the number of queued requests allowed before blocking the network threads */
+  val numQueuedRequests = Utils.getIntInRange(props, "max.queued.requests", 500, (1, Int.MaxValue))
+
   /* the interval in which to measure performance statistics */
   val monitoringPeriodSecs = Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue))
   

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Fri Jan 13 00:03:02 2012
@@ -18,28 +18,27 @@
 package kafka.server
 
 import scala.reflect.BeanProperty
-import kafka.log.LogManager
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.AtomicBoolean
-import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler, Logging}
-import kafka.network.{SocketServerStats, SocketServer}
+import java.util.concurrent._
+import java.util.concurrent.atomic._
 import java.io.File
+import org.apache.log4j.Logger
+import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler, Logging}
+import kafka.network.{SocketServerStats, SocketServer, RequestChannel}
+import kafka.log.LogManager
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.
  */
 class KafkaServer(val config: KafkaConfig) extends Logging {
-  val CLEAN_SHUTDOWN_FILE = ".kafka_cleanshutdown"
-  private val isShuttingDown = new AtomicBoolean(false)
-  
+
+  val CleanShutdownFile = ".kafka_cleanshutdown"
+  private val isShuttingDown = new AtomicBoolean(false)  
   private val shutdownLatch = new CountDownLatch(1)
   private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
-  
   var socketServer: SocketServer = null
-  
+  var requestHandlerPool: KafkaRequestHandlerPool = null
   val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
-  
   private var logManager: LogManager = null
 
   /**
@@ -49,7 +48,7 @@ class KafkaServer(val config: KafkaConfi
   def startup() {
     info("Starting Kafka server...")
     var needRecovery = true
-    val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
+    val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
     if (cleanShutDownFile.exists) {
       needRecovery = false
       cleanShutDownFile.delete
@@ -60,24 +59,24 @@ class KafkaServer(val config: KafkaConfi
                                 1000L * 60 * config.logCleanupIntervalMinutes,
                                 1000L * 60 * 60 * config.logRetentionHours,
                                 needRecovery)
-                                                    
-    val handlers = new KafkaRequestHandlers(logManager)
+                                                
     socketServer = new SocketServer(config.port,
-                                    config.numThreads,
+                                    config.numNetworkThreads,
                                     config.monitoringPeriodSecs,
-                                    handlers.handlerFor,
-                                    config.socketSendBuffer,
-                                    config.socketReceiveBuffer,                                    
+                                    config.numQueuedRequests,
                                     config.maxSocketRequestSize)
     Utils.registerMBean(socketServer.stats, statsMBeanName)
-    socketServer.startup()
+    requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, new KafkaApis(logManager).handle, config.numIoThreads)
+    socketServer.startup
+
     Mx4jLoader.maybeLoad
+
     /**
      *  Registers this broker in ZK. After this, consumers can connect to broker.
      *  So this should happen after socket server start.
      */
-    logManager.startup()
-    info("Kafka server started.")
+    logManager.startup
+    info("Server started.")
   }
   
   /**
@@ -91,13 +90,15 @@ class KafkaServer(val config: KafkaConfi
       scheduler.shutdown()
       if (socketServer != null)
         socketServer.shutdown()
+      if(requestHandlerPool != null)
+        requestHandlerPool.shutdown()
       Utils.unregisterMBean(statsMBeanName)
-      if (logManager != null)
+      if(logManager != null)
         logManager.close()
 
-      val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
+      val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
+      debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
       cleanShutDownFile.createNewFile
-
       shutdownLatch.countDown()
       info("Kafka server shut down completed")
     }
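
The new KafkaRequestHandlerPool is constructed from the request channel, a handle callback (KafkaApis.handle), and config.numIoThreads. A hedged sketch of that shape, with illustrative names rather than the committed class:

    import java.util.concurrent.LinkedBlockingQueue

    class MiniHandlerPool(queue: LinkedBlockingQueue[String],
                          handle: String => Unit,
                          numIoThreads: Int) {
      private val threads =
        for (i <- 0 until numIoThreads) yield {
          val t = new Thread(new Runnable {
            def run() {
              try {
                while (true) handle(queue.take())  // block until the next request
              } catch {
                case e: InterruptedException => () // interrupted: time to shut down
              }
            }
          }, "kafka-request-handler-" + i)
          t.start()
          t
        }

      def shutdown() {
        threads.foreach(_.interrupt())
        threads.foreach(_.join())
      }
    }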

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala Fri Jan 13 00:03:02 2012
@@ -64,13 +64,13 @@ class KafkaServerStartable(val serverCon
     catch {
       case e =>
        fatal("Fatal error during KafkaServerStartable shutdown. Prepare to halt", e)
-        Runtime.getRuntime.halt(1)
+        System.exit(1)
     }
   }
 
-  def awaitShutdown() {
+  def awaitShutdown() = 
     server.awaitShutdown
-  }
+
 }
 
 class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala Fri Jan 13 00:03:02 2012
@@ -35,6 +35,9 @@ trait Logging {
     if (logger.isTraceEnabled())
       logger.trace(msg,e)
   }
+  def swallowTrace(action: => Unit) {
+    Utils.swallow(logger.trace, action)
+  }
 
   def debug(msg: => String): Unit = {
     if (logger.isDebugEnabled())
@@ -48,6 +51,9 @@ trait Logging {
     if (logger.isDebugEnabled())
       logger.debug(msg,e)
   }
+  def swallowDebug(action: => Unit) {
+    Utils.swallow(logger.debug, action)
+  }
 
   def info(msg: => String): Unit = {
     if (logger.isInfoEnabled())
@@ -61,6 +67,9 @@ trait Logging {
     if (logger.isInfoEnabled())
       logger.info(msg,e)
   }
+  def swallowInfo(action: => Unit) {
+    Utils.swallow(logger.info, action)
+  }
 
   def warn(msg: => String): Unit = {
     logger.warn(msg)
@@ -70,7 +79,11 @@ trait Logging {
   }
   def warn(msg: => String, e: => Throwable) = {
     logger.warn(msg,e)
-  }	
+  }
+  def swallowWarn(action: => Unit) {
+    Utils.swallow(logger.warn, action)
+  }
+  def swallow(action: => Unit) = swallowWarn(action)
 
   def error(msg: => String):Unit = {
     logger.error(msg)
@@ -81,6 +94,9 @@ trait Logging {
   def error(msg: => String, e: => Throwable) = {
     logger.error(msg,e)
   }
+  def swallowError(action: => Unit) {
+    Utils.swallow(logger.error, action)
+  }
 
   def fatal(msg: => String): Unit = {
     logger.fatal(msg)
@@ -91,4 +107,5 @@ trait Logging {
   def fatal(msg: => String, e: => Throwable) = {
     logger.fatal(msg,e)
   }
+ 
 }
\ No newline at end of file
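
The swallowTrace/swallowDebug/swallowInfo/swallowWarn/swallowError helpers all delegate to Utils.swallow. Sketched here under the assumption that its signature matches these call sites, it runs the action and routes any exception to the supplied log function instead of rethrowing:

    object SwallowSketch {
      // assumed shape of Utils.swallow, inferred from the call sites above
      def swallow(log: (Object, Throwable) => Unit, action: => Unit) {
        try {
          action
        } catch {
          case e: Throwable => log(e.getMessage, e)
        }
      }

      def main(args: Array[String]) {
        // the exception is logged, not rethrown, so shutdown code keeps going
        swallow((msg, e) => println("WARN " + msg), throw new RuntimeException("boom"))
        println("still running")
      }
    }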

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Fri Jan 13 00:03:02 2012
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -20,7 +21,7 @@ package kafka.consumer
 import junit.framework.Assert._
 import kafka.zk.ZooKeeperTestHarness
 import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
+import kafka.server._
 import scala.collection._
 import kafka.utils.{Utils, Logging}
 import kafka.utils.{TestZKUtils, TestUtils}
@@ -51,7 +52,7 @@ class ZookeeperConsumerConnectorTest ext
   val nMessages = 2
 
   def testBasic() {
-    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    val requestHandlerLogger = Logger.getLogger(classOf[KafkaApis])
     requestHandlerLogger.setLevel(Level.FATAL)
 
     var actualMessages: List[Message] = Nil
@@ -121,7 +122,7 @@ class ZookeeperConsumerConnectorTest ext
   }
 
   def testCompression() {
-    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
 
     println("Sending messages for 1st consumer")
@@ -174,7 +175,7 @@ class ZookeeperConsumerConnectorTest ext
   }
 
   def testCompressionSetConsumption() {
-    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
 
     var actualMessages: List[Message] = Nil
@@ -208,7 +209,7 @@ class ZookeeperConsumerConnectorTest ext
   }
 
   def testConsumerDecoder() {
-    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
 
     val sentMessages = sendMessages(nMessages, "batch1", NoCompressionCodec).

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala Fri Jan 13 00:03:02 2012
@@ -23,7 +23,7 @@ import java.nio.channels.ClosedByInterru
 import java.util.concurrent.atomic.AtomicInteger
 import kafka.utils.{ZKGroupTopicDirs, Logging}
 import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
-import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import kafka.server._
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.{TestUtils, TestZKUtils}
@@ -41,7 +41,7 @@ class AutoOffsetResetTest extends JUnit3
   val largeOffset = 10000
   val smallOffset = -1
   
-  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+  val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
 
   override def setUp() {
     super.setUp()

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Fri Jan 13 00:03:02 2012
@@ -21,7 +21,7 @@ import scala.collection._
 import junit.framework.Assert._
 import kafka.common.OffsetOutOfRangeException
 import kafka.api.{ProducerRequest, FetchRequest}
-import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.{TestUtils, Utils}
@@ -39,7 +39,7 @@ class LazyInitProducerTest extends JUnit
                }
   val configs = List(config)
   var servers: List[KafkaServer] = null
-  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
   override def setUp() {
     super.setUp
@@ -73,16 +73,14 @@ class LazyInitProducerTest extends JUnit
     TestUtils.checkEquals(sent.iterator, fetched.iterator)
 
     // send an invalid offset
-    var exceptionThrown = false
     try {
       val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
       fetchedWithError.iterator
+      fail("Expected an OffsetOutOfRangeException to be thrown")
     }
     catch {
-      case e: OffsetOutOfRangeException => exceptionThrown = true
-      case e2 => throw e2
+      case e: OffsetOutOfRangeException => 
     }
-    assertTrue(exceptionThrown)
   }
 
   def testProduceAndMultiFetch() {
@@ -113,17 +111,15 @@ class LazyInitProducerTest extends JUnit
       for(topic <- topics)
         fetches += new FetchRequest(topic, 0, -1, 10000)
 
-      var exceptionThrown = false
-      try {
-        val responses = consumer.multifetch(fetches: _*)
-        for(resp <- responses)
+      val responses = consumer.multifetch(fetches: _*)
+      for(resp <- responses) {
+        try {
           resp.iterator
+          fail("Expected an OffsetOutOfRangeException to be thrown")
+        } catch {
+          case e: OffsetOutOfRangeException => 
+        }
       }
-      catch {
-        case e: OffsetOutOfRangeException => exceptionThrown = true
-        case e2 => throw e2
-      }
-      assertTrue(exceptionThrown)
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala Fri Jan 13 00:03:02 2012
@@ -45,7 +45,7 @@ class LogCorruptionTest extends JUnit3Su
   val partition = 0
 
   def testMessageSizeTooLarge() {
-    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
     val fetcherLogger = Logger.getLogger(classOf[kafka.consumer.FetcherRunnable])
 
     requestHandlerLogger.setLevel(Level.FATAL)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Fri Jan 13 00:03:02 2012
@@ -21,7 +21,7 @@ import scala.collection._
 import junit.framework.Assert._
 import kafka.api.{ProducerRequest, FetchRequest}
 import kafka.common.{OffsetOutOfRangeException, InvalidPartitionException}
-import kafka.server.{KafkaRequestHandlers, KafkaConfig}
+import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
 import java.util.Properties
@@ -40,9 +40,10 @@ class PrimitiveApiTest extends JUnit3Sui
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props) {
                  override val enableZookeeper = false
+                 override val flushInterval = 1
                }
   val configs = List(config)
-  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
   def testDefaultEncoderProducerAndFetch() {
     val topic = "test-topic"

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Fri Jan 13 00:03:02 2012
@@ -20,7 +20,7 @@ package kafka.javaapi.consumer
 import junit.framework.Assert._
 import kafka.zk.ZooKeeperTestHarness
 import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
+import kafka.server._
 import kafka.utils.{Utils, Logging}
 import kafka.utils.{TestZKUtils, TestUtils}
 import org.scalatest.junit.JUnit3Suite
@@ -28,7 +28,7 @@ import scala.collection.JavaConversions.
 import kafka.javaapi.message.ByteBufferMessageSet
 import kafka.consumer.{ConsumerConfig, KafkaMessageStream}
 import org.apache.log4j.{Level, Logger}
-import kafka.message.{NoCompressionCodec, CompressionCodec, Message}
+import kafka.message._
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
 
@@ -49,7 +49,7 @@ class ZookeeperConsumerConnectorTest ext
   val nMessages = 2
 
   def testBasic() {
-    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    val requestHandlerLogger = Logger.getLogger(classOf[KafkaApis])
     requestHandlerLogger.setLevel(Level.FATAL)
     var actualMessages: List[Message] = Nil
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala Fri Jan 13 00:03:02 2012
@@ -20,7 +20,7 @@ package kafka.javaapi.integration
 import scala.collection._
 import kafka.api.FetchRequest
 import kafka.common.{InvalidPartitionException, OffsetOutOfRangeException}
-import kafka.server.{KafkaRequestHandlers, KafkaConfig}
+import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
 import kafka.javaapi.message.ByteBufferMessageSet
@@ -39,7 +39,7 @@ class PrimitiveApiTest extends JUnit3Sui
                  override val enableZookeeper = false
                }
   val configs = List(config)
-  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
   def testProduceAndFetch() {
     // send some messages

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala Fri Jan 13 00:03:02 2012
@@ -19,14 +19,14 @@ package kafka.javaapi.producer
 
 import java.util.Properties
 import org.apache.log4j.{Logger, Level}
-import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
 import kafka.zk.EmbeddedZookeeper
 import kafka.utils.{TestZKUtils, TestUtils}
 import org.junit.{After, Before, Test}
 import junit.framework.Assert
 import collection.mutable.HashMap
 import org.easymock.EasyMock
-import kafka.utils.Utils
+import kafka.utils._
 import java.util.concurrent.ConcurrentHashMap
 import kafka.cluster.Partition
 import kafka.common.{UnavailableProducerException, InvalidPartitionException, InvalidConfigException}
@@ -42,6 +42,7 @@ import kafka.api.FetchRequest
 import kafka.message.{NoCompressionCodec, Message}
 
 class ProducerTest extends JUnitSuite {
+  
   private val topic = "test-topic"
   private val brokerId1 = 0
   private val brokerId2 = 1  
@@ -54,7 +55,7 @@ class ProducerTest extends JUnitSuite {
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
   private var zkServer:EmbeddedZookeeper = null
-  private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+  private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
   @Before
   def setUp() {
@@ -96,7 +97,7 @@ class ProducerTest extends JUnitSuite {
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)
 
-    Thread.sleep(500)
+    Thread.sleep(300)
   }
 
   @After
@@ -107,7 +108,7 @@ class ProducerTest extends JUnitSuite {
     server2.shutdown
     Utils.rm(server1.config.logDir)
     Utils.rm(server2.config.logDir)    
-    Thread.sleep(500)
+    Thread.sleep(300)
     zkServer.shutdown
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Fri Jan 13 00:03:02 2012
@@ -47,7 +47,8 @@ class LogManagerTest extends JUnitSuite 
 
   @After
   def tearDown() {
-    logManager.close()
+    if(logManager != null)
+      logManager.close()
     Utils.rm(logDir)
   }
   

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Fri Jan 13 00:03:02 2012
@@ -31,37 +31,39 @@ import org.apache.log4j._
 
 class SocketServerTest extends JUnitSuite {
 
-  Logger.getLogger("kafka").setLevel(Level.INFO)
-
-  def echo(receive: Receive): Option[Send] = {
-    val id = receive.buffer.getShort
-    Some(new BoundedByteBufferSend(receive.buffer.slice))
-  }
-  
-  val server = new SocketServer(port = TestUtils.choosePort, 
-                                numProcessorThreads = 1, 
-                                monitoringPeriodSecs = 30, 
-                                handlerFactory = (requestId: Short, receive: Receive) => echo, 
-                                sendBufferSize = 300000,
-                                receiveBufferSize = 300000,
-                                maxRequestSize = 50)
+  val server: SocketServer = new SocketServer(port = TestUtils.choosePort, 
+                                              numProcessorThreads = 1, 
+                                              monitoringPeriodSecs = 30, 
+                                              maxQueuedRequests = 50,
+                                              maxRequestSize = 50)
   server.startup()
 
-  def sendRequest(id: Short, request: Array[Byte]): Array[Byte] = {
-    val socket = new Socket("localhost", server.port)
+  def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
     val outgoing = new DataOutputStream(socket.getOutputStream)
     outgoing.writeInt(request.length + 2)
     outgoing.writeShort(id)
     outgoing.write(request)
     outgoing.flush()
+  }
+
+  def receiveResponse(socket: Socket): Array[Byte] = { 
     val incoming = new DataInputStream(socket.getInputStream)
     val len = incoming.readInt()
     val response = new Array[Byte](len)
     incoming.readFully(response)
-    socket.close()
     response
   }
 
+  /* A simple request handler that just echoes the request back as the response */
+  def processRequest(channel: RequestChannel) {
+    val request = channel.receiveRequest
+    val id = request.request.buffer.getShort
+    val send = new BoundedByteBufferSend(request.request.buffer.slice)
+    channel.sendResponse(new RequestChannel.Response(request.processor, request.requestKey, send, request.start, 15))
+  }
+
+  def connect() = new Socket("localhost", server.port)
+
   @After
   def cleanup() {
     server.shutdown()
@@ -69,15 +71,20 @@ class SocketServerTest extends JUnitSuit
 
   @Test
   def simpleRequest() {
-    val response = new String(sendRequest(0, "hello".getBytes))
-    
+    val socket = connect()
+    sendRequest(socket, 0, "hello".getBytes)
+    processRequest(server.requestChannel)
+    val response = new String(receiveResponse(socket))
+    assertEquals("hello", response)
   }
 
   @Test(expected=classOf[IOException])
   def tooBigRequestIsRejected() {
     val tooManyBytes = new Array[Byte](server.maxRequestSize + 1)
     new Random().nextBytes(tooManyBytes)
-    sendRequest(0, tooManyBytes)
+    val socket = connect()
+    sendRequest(socket, 0, tooManyBytes)
+    receiveResponse(socket)
   }
 
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Fri Jan 13 00:03:02 2012
@@ -20,7 +20,7 @@ package kafka.producer
 import async.{AsyncProducerConfig, AsyncProducer}
 import java.util.Properties
 import org.apache.log4j.{Logger, Level}
-import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
 import kafka.zk.EmbeddedZookeeper
 import org.junit.{After, Before, Test}
 import junit.framework.Assert
@@ -49,7 +49,7 @@ class ProducerTest extends JUnitSuite {
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
   private var zkServer:EmbeddedZookeeper = null
-  private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+  private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
   @Before
   def setUp() {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Fri Jan 13 00:03:02 2012
@@ -32,7 +32,6 @@ import kafka.message.{NoCompressionCodec
 class SyncProducerTest extends JUnitSuite {
   private var messageBytes =  new Array[Byte](2);
   private var server: KafkaServer = null
-  val simpleProducerLogger = Logger.getLogger(classOf[SyncProducer])
 
   @Before
   def setUp() {
@@ -44,7 +43,8 @@ class SyncProducerTest extends JUnitSuit
 
   @After
   def tearDown() {
-    server.shutdown
+    if(server != null)
+      server.shutdown
   }
 
   @Test

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1230845&r1=1230844&r2=1230845&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Fri Jan 13 00:03:02 2012
@@ -66,7 +66,7 @@ class ServerShutdownTest extends JUnitSu
       Thread.sleep(200)
       // do a clean shutdown
       server.shutdown()
-      val cleanShutDownFile = new File(new File(config.logDir), server.CLEAN_SHUTDOWN_FILE)
+      val cleanShutDownFile = new File(new File(config.logDir), server.CleanShutdownFile)
       assertTrue(cleanShutDownFile.exists)
     }