You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2012/06/19 07:29:00 UTC
svn commit: r1351572 - in
/incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer:
ConsumerIterator.scala ConsumerTopicStat.scala PartitionTopicInfo.scala
Author: joestein
Date: Tue Jun 19 05:29:00 2012
New Revision: 1351572
URL: http://svn.apache.org/viewvc?rev=1351572&view=rev
Log:
KAFKA-366 add jmx beans in broker to track # bytes in consumer patch by Jun Rao reviewed by Joel Koshy merged into the 0.7.1 release
Modified:
incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
Modified: incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1351572&r1=1351571&r2=1351572&view=diff
==============================================================================
--- incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Tue Jun 19 05:29:00 2012
@@ -47,6 +47,7 @@ class ConsumerIterator[T](private val ch
val topic = currentTopicInfo.topic
trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
+ ConsumerTopicStat.getConsumerAllTopicStat().recordMessagesPerTopic(1)
item
}
Modified: incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala?rev=1351572&r1=1351571&r2=1351572&view=diff
==============================================================================
--- incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (original)
+++ incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala Tue Jun 19 05:29:00 2012
@@ -22,19 +22,29 @@ import kafka.utils.{Pool, Utils, threads
trait ConsumerTopicStatMBean {
def getMessagesPerTopic: Long
+ def getBytesPerTopic: Long
}
@threadsafe
class ConsumerTopicStat extends ConsumerTopicStatMBean {
private val numCumulatedMessagesPerTopic = new AtomicLong(0)
+ private val numCumulatedBytesPerTopic = new AtomicLong(0)
def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get
def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages)
+
+ def getBytesPerTopic: Long = numCumulatedBytesPerTopic.get
+
+ def recordBytesPerTopic(nBytes: Long) = numCumulatedBytesPerTopic.getAndAdd(nBytes)
}
object ConsumerTopicStat extends Logging {
private val stats = new Pool[String, ConsumerTopicStat]
+ private val allTopicStat = new ConsumerTopicStat
+ Utils.registerMBean(allTopicStat, "kafka:type=kafka.ConsumerAllTopicStat")
+
+ def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat
def getConsumerTopicStat(topic: String): ConsumerTopicStat = {
var stat = stats.get(topic)
Modified: incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1351572&r1=1351571&r2=1351572&view=diff
==============================================================================
--- incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/branches/0.7.1/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Tue Jun 19 05:29:00 2012
@@ -61,6 +61,8 @@ private[consumer] class PartitionTopicIn
chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
val newOffset = fetchedOffset.addAndGet(size)
debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
+ ConsumerTopicStat.getConsumerTopicStat(topic).recordBytesPerTopic(size)
+ ConsumerTopicStat.getConsumerAllTopicStat().recordBytesPerTopic(size)
}
size
}