Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/15 23:04:00 UTC

svn commit: r1244755 - in /incubator/kafka/trunk/core/src/main/scala/kafka: log/Log.scala server/KafkaRequestHandlers.scala

Author: junrao
Date: Wed Feb 15 22:04:00 2012
New Revision: 1244755

URL: http://svn.apache.org/viewvc?rev=1244755&view=rev
Log:
add JMX on broker to track bytes/messages per topic; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-272

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
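
The commit log describes the new instrumentation only briefly, so as context for the diffs below: the patch registers one aggregate MBean under "kafka:type=kafka.BrokerAllTopicStat" and, lazily on first use, one MBean per topic under "kafka:type=kafka.BrokerTopicStat.<topic>", each exposing the read-only attributes MessagesIn, BytesIn and BytesOut (derived from the getters on BrokerTopicStatMBean). A minimal sketch of reading the aggregate counters from a remote JMX client follows; the service URL and port are assumptions that depend on how the broker JVM enables remote JMX, which this patch does not configure.

    import javax.management.ObjectName
    import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

    object BrokerStatReader {
      def main(args: Array[String]) {
        // Hypothetical JMX endpoint; host/port depend on the broker's JVM flags.
        val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi")
        val connector = JMXConnectorFactory.connect(url)
        try {
          val mbsc = connector.getMBeanServerConnection
          // Aggregate MBean name registered by this patch.
          val name = new ObjectName("kafka:type=kafka.BrokerAllTopicStat")
          // Attribute names follow from the BrokerTopicStatMBean getters.
          println("MessagesIn=" + mbsc.getAttribute(name, "MessagesIn") +
                  " BytesIn=" + mbsc.getAttribute(name, "BytesIn") +
                  " BytesOut=" + mbsc.getAttribute(name, "BytesOut"))
        } finally {
          connector.close()
        }
      }
    }

Per-topic counters can be read the same way by substituting "kafka:type=kafka.BrokerTopicStat.<topic>" for the aggregate name.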

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1244755&r1=1244754&r2=1244755&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Wed Feb 15 22:04:00 2012
@@ -25,6 +25,7 @@ import kafka.utils._
 import kafka.common._
 import kafka.api.OffsetRequest
 import java.util._
+import kafka.server.BrokerTopicStat
 
 private[log] object Log {
   val FileSuffix = ".kafka"
@@ -207,6 +208,8 @@ private[log] class Log(val dir: File, va
       numberOfMessages += 1;
     }
 
+    BrokerTopicStat.getBrokerTopicStat(getTopicName).recordMessagesIn(numberOfMessages)
+    BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
     logStats.recordAppendedMessages(numberOfMessages)
     
     // they are valid, insert them in the log

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala?rev=1244755&r1=1244754&r2=1244755&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala Wed Feb 15 22:04:00 2012
@@ -23,8 +23,8 @@ import kafka.network._
 import kafka.message._
 import kafka.api._
 import kafka.common.ErrorMapping
-import kafka.utils.SystemTime
-import kafka.utils.Logging
+import java.util.concurrent.atomic.AtomicLong
+import kafka.utils._
 
 /**
  * Logic to handle the various Kafka requests
@@ -69,6 +69,8 @@ private[kafka] class KafkaRequestHandler
       logManager.getOrCreateLog(request.topic, partition).append(request.messages)
       trace(request.messages.sizeInBytes + " bytes written to logs.")
       request.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum)))
+      BrokerTopicStat.getBrokerTopicStat(request.topic).recordBytesIn(request.messages.sizeInBytes)
+      BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(request.messages.sizeInBytes)
     }
     catch {
       case e =>
@@ -100,8 +102,11 @@ private[kafka] class KafkaRequestHandler
     try {
       trace("Fetching log segment for topic, partition, offset, maxSize = " + fetchRequest)
       val log = logManager.getLog(fetchRequest.topic, fetchRequest.partition)
-      if (log != null)
+      if (log != null) {
         response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))
+        BrokerTopicStat.getBrokerTopicStat(fetchRequest.topic).recordBytesOut(response.messages.sizeInBytes)
+        BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(response.messages.sizeInBytes)
+      }
       else
         response = new MessageSetSend()
     }
@@ -122,3 +127,48 @@ private[kafka] class KafkaRequestHandler
     Some(response)
   }
 }
+
+trait BrokerTopicStatMBean {
+  def getMessagesIn: Long
+  def getBytesIn: Long
+  def getBytesOut: Long
+}
+
+@threadsafe
+class BrokerTopicStat extends BrokerTopicStatMBean {
+  private val numCumulatedMessagesIn = new AtomicLong(0)
+  private val numCumulatedBytesIn = new AtomicLong(0)
+  private val numCumulatedBytesOut = new AtomicLong(0)
+
+  def getMessagesIn: Long = numCumulatedMessagesIn.get
+
+  def recordMessagesIn(nMessages: Int) = numCumulatedMessagesIn.getAndAdd(nMessages)
+
+  def getBytesIn: Long = numCumulatedBytesIn.get
+
+  def recordBytesIn(nBytes: Long) = numCumulatedBytesIn.getAndAdd(nBytes)
+
+  def getBytesOut: Long = numCumulatedBytesOut.get
+
+  def recordBytesOut(nBytes: Long) = numCumulatedBytesOut.getAndAdd(nBytes)
+}
+
+object BrokerTopicStat extends Logging {
+  private val stats = new Pool[String, BrokerTopicStat]
+  private val allTopicStat = new BrokerTopicStat
+  Utils.registerMBean(allTopicStat, "kafka:type=kafka.BrokerAllTopicStat")
+
+  def getBrokerAllTopicStat(): BrokerTopicStat = allTopicStat
+
+  def getBrokerTopicStat(topic: String): BrokerTopicStat = {
+    var stat = stats.get(topic)
+    if (stat == null) {
+      stat = new BrokerTopicStat
+      if (stats.putIfNotExists(topic, stat) == null)
+        Utils.registerMBean(stat, "kafka:type=kafka.BrokerTopicStat." + topic)
+      else
+        stat = stats.get(topic)
+    }
+    return stat
+  }
+}
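
A note on the registration path in getBrokerTopicStat above: the code only registers the MBean when putIfNotExists returns null, i.e. when this thread's insert won, so two request-handler threads racing to create the stat for a new topic cannot register the same topic twice, and the losing thread re-reads the pool and uses the winner's instance. A rough sketch of the same get-or-create pattern using only the standard library, with hypothetical names (this is not the broker's code), might look like:

    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.atomic.AtomicLong

    // Illustrative stand-in for BrokerTopicStat.
    class TopicCounter {
      val messagesIn = new AtomicLong(0)
    }

    object TopicCounters {
      private val counters = new ConcurrentHashMap[String, TopicCounter]

      def getOrCreate(topic: String): TopicCounter = {
        var counter = counters.get(topic)
        if (counter == null) {
          counter = new TopicCounter
          val existing = counters.putIfAbsent(topic, counter)
          if (existing == null) {
            // Our instance won the race: the single safe place for one-time
            // work such as registering a per-topic MBean.
          } else {
            // Another thread won the race; use its instance instead.
            counter = existing
          }
        }
        counter
      }
    }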