You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/06/19 02:32:19 UTC
svn commit: r1351546 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer: ConsumerIterator.scala ConsumerTopicStat.scala PartitionTopicInfo.scala
Author: junrao
Date: Tue Jun 19 00:32:18 2012
New Revision: 1351546
URL: http://svn.apache.org/viewvc?rev=1351546&view=rev
Log:
upgrade ZKClient to allow conditional updates in ZK; patched by Jun Rao; reviewed by Joel Koshy; KAFKA-337
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1351546&r1=1351545&r2=1351546&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Tue Jun 19 00:32:18 2012
@@ -47,6 +47,7 @@ class ConsumerIterator[T](private val ch
val topic = currentTopicInfo.topic
trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
+ ConsumerTopicStat.getConsumerAllTopicStat().recordMessagesPerTopic(1)
item
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala?rev=1351546&r1=1351545&r2=1351546&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala Tue Jun 19 00:32:18 2012
@@ -22,19 +22,29 @@ import kafka.utils.{Pool, Utils, threads
trait ConsumerTopicStatMBean {
def getMessagesPerTopic: Long
+ def getBytesPerTopic: Long
}
@threadsafe
class ConsumerTopicStat extends ConsumerTopicStatMBean {
private val numCumulatedMessagesPerTopic = new AtomicLong(0)
+ private val numCumulatedBytesPerTopic = new AtomicLong(0)
def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get
def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages)
+
+ def getBytesPerTopic: Long = numCumulatedBytesPerTopic.get
+
+ def recordBytesPerTopic(nBytes: Long) = numCumulatedBytesPerTopic.getAndAdd(nBytes)
}
object ConsumerTopicStat extends Logging {
private val stats = new Pool[String, ConsumerTopicStat]
+ private val allTopicStat = new ConsumerTopicStat
+ Utils.registerMBean(allTopicStat, "kafka:type=kafka.ConsumerAllTopicStat")
+
+ def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat
def getConsumerTopicStat(topic: String): ConsumerTopicStat = {
var stat = stats.get(topic)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1351546&r1=1351545&r2=1351546&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Tue Jun 19 00:32:18 2012
@@ -60,6 +60,8 @@ private[consumer] class PartitionTopicIn
chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
val newOffset = fetchedOffset.addAndGet(size)
debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
+ ConsumerTopicStat.getConsumerTopicStat(topic).recordBytesPerTopic(size)
+ ConsumerTopicStat.getConsumerAllTopicStat().recordBytesPerTopic(size)
}
size
}