You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/09/16 05:34:56 UTC
git commit: KAFKA-1597 New metrics: ResponseQueueSize and
BeingSentResponses; reviewed by Neha Narkhede and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk d677701b9 -> cf0f5750b
KAFKA-1597 New metrics: ResponseQueueSize and BeingSentResponses; reviewed by Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cf0f5750
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cf0f5750
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cf0f5750
Branch: refs/heads/trunk
Commit: cf0f5750b39e675cf9a9c6d6394a665366db0f58
Parents: d677701
Author: Alexis Midon <mi...@apache.org>
Authored: Mon Sep 15 20:34:14 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Sep 15 20:34:44 2014 -0700
----------------------------------------------------------------------
.../scala/kafka/network/RequestChannel.scala | 4 ++++
.../main/scala/kafka/network/SocketServer.scala | 20 ++++++++++++++++++--
2 files changed, 22 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/cf0f5750/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 60b0400..4560d8f 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -121,6 +121,10 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
}
)
+ newGauge("ResponseQueueSize", new Gauge[Int]{
+ def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()}
+ })
+
for(i <- 0 until numProcessors) {
newGauge(
"Processor-" + i + "-ResponseQueueSize",
http://git-wip-us.apache.org/repos/asf/kafka/blob/cf0f5750/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index ff0c4ba..d678990 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -17,6 +17,7 @@
package kafka.network
+import java.util
import java.util.concurrent._
import java.util.concurrent.atomic._
import java.net._
@@ -28,7 +29,7 @@ import scala.collection._
import kafka.common.KafkaException
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
-import com.yammer.metrics.core.Meter
+import com.yammer.metrics.core.{Gauge, Meter}
/**
* An NIO socket server. The threading model is
@@ -71,6 +72,11 @@ class SocketServer(val brokerId: Int,
quotas)
Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
}
+
+ newGauge("ResponsesBeingSent", new Gauge[Int] {
+ def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
+ })
+
// register the processor threads for notification of responses
requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
@@ -170,7 +176,17 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
close(key)
}
}
-
+
+ def countInterestOps(ops: Int): Int = {
+ var count = 0
+ val it = this.selector.keys().iterator()
+ while (it.hasNext) {
+ if ((it.next().interestOps() & ops) != 0) {
+ count += 1
+ }
+ }
+ count
+ }
}
/**