You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/15 23:04:00 UTC
svn commit: r1244755 - in /incubator/kafka/trunk/core/src/main/scala/kafka:
log/Log.scala server/KafkaRequestHandlers.scala
Author: junrao
Date: Wed Feb 15 22:04:00 2012
New Revision: 1244755
URL: http://svn.apache.org/viewvc?rev=1244755&view=rev
Log:
add JMX on broker to track bytes/messages per topic; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-272
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1244755&r1=1244754&r2=1244755&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Wed Feb 15 22:04:00 2012
@@ -25,6 +25,7 @@ import kafka.utils._
import kafka.common._
import kafka.api.OffsetRequest
import java.util._
+import kafka.server.BrokerTopicStat
private[log] object Log {
val FileSuffix = ".kafka"
@@ -207,6 +208,8 @@ private[log] class Log(val dir: File, va
numberOfMessages += 1;
}
+ BrokerTopicStat.getBrokerTopicStat(getTopicName).recordMessagesIn(numberOfMessages)
+ BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
logStats.recordAppendedMessages(numberOfMessages)
// they are valid, insert them in the log
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala?rev=1244755&r1=1244754&r2=1244755&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala Wed Feb 15 22:04:00 2012
@@ -23,8 +23,8 @@ import kafka.network._
import kafka.message._
import kafka.api._
import kafka.common.ErrorMapping
-import kafka.utils.SystemTime
-import kafka.utils.Logging
+import java.util.concurrent.atomic.AtomicLong
+import kafka.utils._
/**
* Logic to handle the various Kafka requests
@@ -69,6 +69,8 @@ private[kafka] class KafkaRequestHandler
logManager.getOrCreateLog(request.topic, partition).append(request.messages)
trace(request.messages.sizeInBytes + " bytes written to logs.")
request.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum)))
+ BrokerTopicStat.getBrokerTopicStat(request.topic).recordBytesIn(request.messages.sizeInBytes)
+ BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(request.messages.sizeInBytes)
}
catch {
case e =>
@@ -100,8 +102,11 @@ private[kafka] class KafkaRequestHandler
try {
trace("Fetching log segment for topic, partition, offset, maxSize = " + fetchRequest)
val log = logManager.getLog(fetchRequest.topic, fetchRequest.partition)
- if (log != null)
+ if (log != null) {
response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))
+ BrokerTopicStat.getBrokerTopicStat(fetchRequest.topic).recordBytesOut(response.messages.sizeInBytes)
+ BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(response.messages.sizeInBytes)
+ }
else
response = new MessageSetSend()
}
@@ -122,3 +127,48 @@ private[kafka] class KafkaRequestHandler
Some(response)
}
}
+
+trait BrokerTopicStatMBean {
+ def getMessagesIn: Long
+ def getBytesIn: Long
+ def getBytesOut: Long
+}
+
+@threadsafe
+class BrokerTopicStat extends BrokerTopicStatMBean {
+ private val numCumulatedMessagesIn = new AtomicLong(0)
+ private val numCumulatedBytesIn = new AtomicLong(0)
+ private val numCumulatedBytesOut = new AtomicLong(0)
+
+ def getMessagesIn: Long = numCumulatedMessagesIn.get
+
+ def recordMessagesIn(nMessages: Int) = numCumulatedMessagesIn.getAndAdd(nMessages)
+
+ def getBytesIn: Long = numCumulatedBytesIn.get
+
+ def recordBytesIn(nBytes: Long) = numCumulatedBytesIn.getAndAdd(nBytes)
+
+ def getBytesOut: Long = numCumulatedBytesOut.get
+
+ def recordBytesOut(nBytes: Long) = numCumulatedBytesOut.getAndAdd(nBytes)
+}
+
+object BrokerTopicStat extends Logging {
+ private val stats = new Pool[String, BrokerTopicStat]
+ private val allTopicStat = new BrokerTopicStat
+ Utils.registerMBean(allTopicStat, "kafka:type=kafka.BrokerAllTopicStat")
+
+ def getBrokerAllTopicStat(): BrokerTopicStat = allTopicStat
+
+ def getBrokerTopicStat(topic: String): BrokerTopicStat = {
+ var stat = stats.get(topic)
+ if (stat == null) {
+ stat = new BrokerTopicStat
+ if (stats.putIfNotExists(topic, stat) == null)
+ Utils.registerMBean(stat, "kafka:type=kafka.BrokerTopicStat." + topic)
+ else
+ stat = stats.get(topic)
+ }
+ return stat
+ }
+}