You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/06/19 02:27:00 UTC

svn commit: r1351544 - in /incubator/kafka/trunk/core/src/main/scala/kafka/consumer: ConsumerIterator.scala ConsumerTopicStat.scala PartitionTopicInfo.scala

Author: junrao
Date: Tue Jun 19 00:27:00 2012
New Revision: 1351544

URL: http://svn.apache.org/viewvc?rev=1351544&view=rev
Log:
add jmx beans in broker to track # bytes in consumer; patched by Jun Rao; reviewed by Joel Koshy; KAFKA-336

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1351544&r1=1351543&r2=1351544&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Tue Jun 19 00:27:00 2012
@@ -47,6 +47,7 @@ class ConsumerIterator[T](private val ch
     val topic = currentTopicInfo.topic
     trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
     ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
+    ConsumerTopicStat.getConsumerAllTopicStat().recordMessagesPerTopic(1)
     item
   }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala?rev=1351544&r1=1351543&r2=1351544&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala Tue Jun 19 00:27:00 2012
@@ -22,19 +22,29 @@ import kafka.utils.{Pool, Utils, threads
 
 trait ConsumerTopicStatMBean {
   def getMessagesPerTopic: Long
+  def getBytesPerTopic: Long
 }
 
 @threadsafe
 class ConsumerTopicStat extends ConsumerTopicStatMBean {
   private val numCumulatedMessagesPerTopic = new AtomicLong(0)
+  private val numCumulatedBytesPerTopic = new AtomicLong(0)
 
   def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get
 
   def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages)
+
+  def getBytesPerTopic: Long = numCumulatedBytesPerTopic.get
+
+  def recordBytesPerTopic(nBytes: Long) = numCumulatedBytesPerTopic.getAndAdd(nBytes)
 }
 
 object ConsumerTopicStat extends Logging {
   private val stats = new Pool[String, ConsumerTopicStat]
+  private val allTopicStat = new ConsumerTopicStat
+  Utils.registerMBean(allTopicStat, "kafka:type=kafka.ConsumerAllTopicStat")
+
+  def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat
 
   def getConsumerTopicStat(topic: String): ConsumerTopicStat = {
     var stat = stats.get(topic)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1351544&r1=1351543&r2=1351544&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Tue Jun 19 00:27:00 2012
@@ -61,6 +61,8 @@ private[consumer] class PartitionTopicIn
       chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
       val newOffset = fetchedOffset.addAndGet(size)
       debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
+      ConsumerTopicStat.getConsumerTopicStat(topic).recordBytesPerTopic(size)
+      ConsumerTopicStat.getConsumerAllTopicStat().recordBytesPerTopic(size)
     }
     size
   }