You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/10 03:52:29 UTC

git commit: kafka-1353; report capacity used by request thread pool and network thread pool; patched by Guozhang Wang; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 47019a849 -> 44ee6b7c9


kafka-1353; report capacity used by request thread pool and network thread pool; patched by Guozhang Wang; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/44ee6b7c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/44ee6b7c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/44ee6b7c

Branch: refs/heads/trunk
Commit: 44ee6b7c9d9da207bebe6d927b38ed7df1388df3
Parents: 47019a8
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Wed Apr 9 18:52:23 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Apr 9 18:52:23 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/network/RequestChannel.scala    |  4 +++
 .../main/scala/kafka/network/SocketServer.scala | 28 ++++++++++++++----
 .../kafka/server/KafkaRequestHandler.scala      | 31 ++++++++++++++++----
 3 files changed, 53 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/44ee6b7c/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index a6ec970..60b0400 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -156,6 +156,10 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
       onResponse(processor)
   }
 
+  /** Get the next request, or block until the specified timeout (in milliseconds) has elapsed; returns null on timeout */
+  def receiveRequest(timeout: Long): RequestChannel.Request =
+    requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
+
   /** Get the next request or block until there is one */
   def receiveRequest(): RequestChannel.Request =
     requestQueue.take()

http://git-wip-us.apache.org/repos/asf/kafka/blob/44ee6b7c/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index dcfca3f..4976d9c 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -24,7 +24,9 @@ import java.io._
 import java.nio.channels._
 
 import kafka.common.KafkaException
+import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
+import com.yammer.metrics.core.Meter
 
 /**
  * An NIO socket server. The threading model is
@@ -39,19 +41,24 @@ class SocketServer(val brokerId: Int,
                    val maxQueuedRequests: Int,
                    val sendBufferSize: Int,
                    val recvBufferSize: Int,
-                   val maxRequestSize: Int = Int.MaxValue) extends Logging {
+                   val maxRequestSize: Int = Int.MaxValue) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Socket Server on Broker " + brokerId + "], "
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
   @volatile private var acceptor: Acceptor = null
   val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
 
+  /* a meter to track the average free capacity of the network processors */
+  private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
+
   /**
    * Start the socket server
    */
   def startup() {
     for(i <- 0 until numProcessorThreads) {
-      processors(i) = new Processor(i, time, maxRequestSize, requestChannel)
+      processors(i) = new Processor(i, time, maxRequestSize, aggregateIdleMeter,
+        newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS),
+        numProcessorThreads, requestChannel)
       Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
     }
     // register the processor threads for notification of responses
@@ -219,9 +226,12 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce
 private[kafka] class Processor(val id: Int,
                                val time: Time,
                                val maxRequestSize: Int,
+                               val aggregateIdleMeter: Meter,
+                               val idleMeter: Meter,
+                               val totalProcessorThreads: Int,
                                val requestChannel: RequestChannel) extends AbstractServerThread {
   
-  private val newConnections = new ConcurrentLinkedQueue[SocketChannel]();
+  private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
 
   override def run() {
     startupComplete()
@@ -230,9 +240,17 @@ private[kafka] class Processor(val id: Int,
       configureNewConnections()
       // register any new responses for writing
       processNewResponses()
-      val startSelectTime = SystemTime.milliseconds
+      val startSelectTime = SystemTime.nanoseconds
       val ready = selector.select(300)
-      trace("Processor id " + id + " selection time = " + (SystemTime.milliseconds - startSelectTime) + " ms")
+      val idleTime = SystemTime.nanoseconds - startSelectTime
+      idleMeter.mark(idleTime)
+      // We use a single meter for aggregate idle percentage for the thread pool.
+      // Since meter is calculated as total_recorded_value / time_window and
+      // time_window is independent of the number of threads, each recorded idle
+      // time should be discounted by # threads.
+      aggregateIdleMeter.mark(idleTime / totalProcessorThreads)
+
+      trace("Processor id " + id + " selection time = " + idleTime + " ns")
       if(ready > 0) {
         val keys = selector.selectedKeys()
         val iter = keys.iterator()

http://git-wip-us.apache.org/repos/asf/kafka/blob/44ee6b7c/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 871212b..f11f6e2 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -21,17 +21,34 @@ import kafka.network._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsGroup
 import java.util.concurrent.TimeUnit
+import com.yammer.metrics.core.Meter
 
 /**
  * A thread that answers kafka requests.
  */
-class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {
+class KafkaRequestHandler(id: Int,
+                          brokerId: Int,
+                          val aggregateIdleMeter: Meter,
+                          val totalHandlerThreads: Int,
+                          val requestChannel: RequestChannel,
+                          apis: KafkaApis) extends Runnable with Logging {
   this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
 
-  def run() { 
+  def run() {
     while(true) {
       try {
-        val req = requestChannel.receiveRequest()
+        var req : RequestChannel.Request = null
+        while (req == null) {
+          // We use a single meter for aggregate idle percentage for the thread pool.
+          // Since meter is calculated as total_recorded_value / time_window and
+          // time_window is independent of the number of threads, each recorded idle
+          // time should be discounted by # threads.
+          val startSelectTime = SystemTime.nanoseconds
+          req = requestChannel.receiveRequest(300)
+          val idleTime = SystemTime.nanoseconds - startSelectTime
+          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
+        }
+
         if(req eq RequestChannel.AllDone) {
           debug("Kafka request handler %d on broker %d received shut down command".format(
             id, brokerId))
@@ -52,12 +69,16 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha
 class KafkaRequestHandlerPool(val brokerId: Int,
                               val requestChannel: RequestChannel,
                               val apis: KafkaApis,
-                              numThreads: Int) extends Logging {
+                              numThreads: Int) extends Logging with KafkaMetricsGroup {
+
+  /* a meter to track the average free capacity of the request handlers */
+  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
+
   this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
   val threads = new Array[Thread](numThreads)
   val runnables = new Array[KafkaRequestHandler](numThreads)
   for(i <- 0 until numThreads) {
-    runnables(i) = new KafkaRequestHandler(i, brokerId, requestChannel, apis)
+    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
     threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
     threads(i).start()
   }