You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/09/16 05:34:56 UTC

git commit: KAFKA-1597 New metrics: ResponseQueueSize and BeingSentResponses; reviewed by Neha Narkhede and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk d677701b9 -> cf0f5750b


KAFKA-1597 New metrics: ResponseQueueSize and BeingSentResponses; reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cf0f5750
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cf0f5750
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cf0f5750

Branch: refs/heads/trunk
Commit: cf0f5750b39e675cf9a9c6d6394a665366db0f58
Parents: d677701
Author: Alexis Midon <mi...@apache.org>
Authored: Mon Sep 15 20:34:14 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Sep 15 20:34:44 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/network/RequestChannel.scala    |  4 ++++
 .../main/scala/kafka/network/SocketServer.scala | 20 ++++++++++++++++++--
 2 files changed, 22 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cf0f5750/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 60b0400..4560d8f 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -121,6 +121,10 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
     }
   )
 
+  newGauge("ResponseQueueSize", new Gauge[Int]{
+    def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()}
+  })
+
   for(i <- 0 until numProcessors) {
     newGauge(
       "Processor-" + i + "-ResponseQueueSize",

http://git-wip-us.apache.org/repos/asf/kafka/blob/cf0f5750/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index ff0c4ba..d678990 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -17,6 +17,7 @@
 
 package kafka.network
 
+import java.util
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.net._
@@ -28,7 +29,7 @@ import scala.collection._
 import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
-import com.yammer.metrics.core.Meter
+import com.yammer.metrics.core.{Gauge, Meter}
 
 /**
  * An NIO socket server. The threading model is
@@ -71,6 +72,11 @@ class SocketServer(val brokerId: Int,
                                     quotas)
       Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
     }
+
+    newGauge("ResponsesBeingSent", new Gauge[Int] {
+      def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
+    })
+
     // register the processor threads for notification of responses
     requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
    
@@ -170,7 +176,17 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
       close(key)
     }
   }
-  
+
+  def countInterestOps(ops: Int): Int = {
+    var count = 0
+    val it = this.selector.keys().iterator()
+    while (it.hasNext) {
+      if ((it.next().interestOps() & ops) != 0) {
+        count += 1
+      }
+    }
+    count
+  }
 }
 
 /**