You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/10 03:52:29 UTC
git commit: kafka-1353;
report capacity used by request thread pool and network thread pool;
patched by Guozhang Wang; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 47019a849 -> 44ee6b7c9
kafka-1353;report capacity used by request thread pool and network thread pool; patched by Guozhang Wang; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/44ee6b7c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/44ee6b7c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/44ee6b7c
Branch: refs/heads/trunk
Commit: 44ee6b7c9d9da207bebe6d927b38ed7df1388df3
Parents: 47019a8
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Wed Apr 9 18:52:23 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Apr 9 18:52:23 2014 -0700
----------------------------------------------------------------------
.../scala/kafka/network/RequestChannel.scala | 4 +++
.../main/scala/kafka/network/SocketServer.scala | 28 ++++++++++++++----
.../kafka/server/KafkaRequestHandler.scala | 31 ++++++++++++++++----
3 files changed, 53 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/44ee6b7c/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index a6ec970..60b0400 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -156,6 +156,10 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
onResponse(processor)
}
+ /** Get the next request, blocking for at most `timeout` milliseconds; returns null if no request arrives in that time */
+ def receiveRequest(timeout: Long): RequestChannel.Request =
+ requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
+
/** Get the next request or block until there is one */
def receiveRequest(): RequestChannel.Request =
requestQueue.take()
http://git-wip-us.apache.org/repos/asf/kafka/blob/44ee6b7c/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index dcfca3f..4976d9c 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -24,7 +24,9 @@ import java.io._
import java.nio.channels._
import kafka.common.KafkaException
+import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
+import com.yammer.metrics.core.Meter
/**
* An NIO socket server. The threading model is
@@ -39,19 +41,24 @@ class SocketServer(val brokerId: Int,
val maxQueuedRequests: Int,
val sendBufferSize: Int,
val recvBufferSize: Int,
- val maxRequestSize: Int = Int.MaxValue) extends Logging {
+ val maxRequestSize: Int = Int.MaxValue) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Socket Server on Broker " + brokerId + "], "
private val time = SystemTime
private val processors = new Array[Processor](numProcessorThreads)
@volatile private var acceptor: Acceptor = null
val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
+ /* a meter to track the average free capacity of the network processors */
+ private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
+
/**
* Start the socket server
*/
def startup() {
for(i <- 0 until numProcessorThreads) {
- processors(i) = new Processor(i, time, maxRequestSize, requestChannel)
+ processors(i) = new Processor(i, time, maxRequestSize, aggregateIdleMeter,
+ newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS),
+ numProcessorThreads, requestChannel)
Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
}
// register the processor threads for notification of responses
@@ -219,9 +226,12 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce
private[kafka] class Processor(val id: Int,
val time: Time,
val maxRequestSize: Int,
+ val aggregateIdleMeter: Meter,
+ val idleMeter: Meter,
+ val totalProcessorThreads: Int,
val requestChannel: RequestChannel) extends AbstractServerThread {
- private val newConnections = new ConcurrentLinkedQueue[SocketChannel]();
+ private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
override def run() {
startupComplete()
@@ -230,9 +240,17 @@ private[kafka] class Processor(val id: Int,
configureNewConnections()
// register any new responses for writing
processNewResponses()
- val startSelectTime = SystemTime.milliseconds
+ val startSelectTime = SystemTime.nanoseconds
val ready = selector.select(300)
- trace("Processor id " + id + " selection time = " + (SystemTime.milliseconds - startSelectTime) + " ms")
+ val idleTime = SystemTime.nanoseconds - startSelectTime
+ idleMeter.mark(idleTime)
+ // We use a single meter for aggregate idle percentage for the thread pool.
+ // Since meter is calculated as total_recorded_value / time_window and
+ // time_window is independent of the number of threads, each recorded idle
+ // time should be discounted by # threads.
+ aggregateIdleMeter.mark(idleTime / totalProcessorThreads)
+
+ trace("Processor id " + id + " selection time = " + idleTime + " ns")
if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
http://git-wip-us.apache.org/repos/asf/kafka/blob/44ee6b7c/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 871212b..f11f6e2 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -21,17 +21,34 @@ import kafka.network._
import kafka.utils._
import kafka.metrics.KafkaMetricsGroup
import java.util.concurrent.TimeUnit
+import com.yammer.metrics.core.Meter
/**
* A thread that answers kafka requests.
*/
-class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {
+class KafkaRequestHandler(id: Int,
+ brokerId: Int,
+ val aggregateIdleMeter: Meter,
+ val totalHandlerThreads: Int,
+ val requestChannel: RequestChannel,
+ apis: KafkaApis) extends Runnable with Logging {
this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
- def run() {
+ def run() {
while(true) {
try {
- val req = requestChannel.receiveRequest()
+ var req : RequestChannel.Request = null
+ while (req == null) {
+ // We use a single meter for aggregate idle percentage for the thread pool.
+ // Since meter is calculated as total_recorded_value / time_window and
+ // time_window is independent of the number of threads, each recorded idle
+ // time should be discounted by # threads.
+ val startSelectTime = SystemTime.nanoseconds
+ req = requestChannel.receiveRequest(300)
+ val idleTime = SystemTime.nanoseconds - startSelectTime
+ aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
+ }
+
if(req eq RequestChannel.AllDone) {
debug("Kafka request handler %d on broker %d received shut down command".format(
id, brokerId))
@@ -52,12 +69,16 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
- numThreads: Int) extends Logging {
+ numThreads: Int) extends Logging with KafkaMetricsGroup {
+
+ /* a meter to track the average free capacity of the request handlers */
+ private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
+
this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
val threads = new Array[Thread](numThreads)
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {
- runnables(i) = new KafkaRequestHandler(i, brokerId, requestChannel, apis)
+ runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
threads(i).start()
}