You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/10/03 06:57:07 UTC
svn commit: r1178298 - in
/incubator/kafka/trunk/core/src/main/scala/kafka/consumer:
ConsumerIterator.scala ConsumerTopicStat.scala KafkaMessageStream.scala
ZookeeperConsumerConnector.scala
Author: junrao
Date: Mon Oct 3 04:57:06 2011
New Revision: 1178298
URL: http://svn.apache.org/viewvc?rev=1178298&view=rev
Log:
JMX bean that reports #message/sec in consumer; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-136
Added:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1178298&r1=1178297&r2=1178298&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Mon Oct 3 04:57:06 2011
@@ -29,7 +29,8 @@ import kafka.serializer.Decoder
* The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
*
*/
-class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
+class ConsumerIterator[T](private val topic: String,
+ private val channel: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val decoder: Decoder[T])
extends IteratorTemplate[T] {
@@ -47,6 +48,7 @@ class ConsumerIterator[T](private val ch
currentTopicInfo.resetConsumeOffset(consumedOffset)
if(logger.isTraceEnabled)
logger.trace("Setting consumed offset to %d".format(consumedOffset))
+ ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
decodedMessage
}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala?rev=1178298&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala Mon Oct 3 04:57:06 2011
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.concurrent.atomic.AtomicLong
+import org.apache.log4j.Logger
+import kafka.utils.{Pool, Utils, threadsafe}
+
+trait ConsumerTopicStatMBean { // management interface implemented by ConsumerTopicStat, which is handed to Utils.registerMBean
+ def getMessagesPerTopic: Long // message count as a Long; ConsumerTopicStat returns its cumulative counter here
+}
+
+@threadsafe
+class ConsumerTopicStat extends ConsumerTopicStatMBean { // cumulative message counter; the companion object keeps one instance per topic name
+ private val numCumulatedMessagesPerTopic = new AtomicLong(0) // running total, starts at 0; all access goes through AtomicLong operations
+
+ def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get // current value of the running total
+
+ def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages) // adds nMessages to the total; result type is inferred from getAndAdd, which yields the value held before the add
+}
+
+object ConsumerTopicStat { // registry that hands out one ConsumerTopicStat per topic name
+ private val logger = Logger.getLogger(getClass())
+ private val stats = new Pool[String, ConsumerTopicStat] // topic name -> stat instance
+
+ def getConsumerTopicStat(topic: String): ConsumerTopicStat = { // returns the stat stored for `topic`, creating and registering one on first use
+ var stat = stats.get(topic) // a null result is treated as "no stat stored for this topic yet"
+ if (stat == null) {
+ stat = new ConsumerTopicStat // candidate instance; only kept if the insert below reports success
+ if (stats.putIfNotExists(topic, stat) == null) // null is treated as "our candidate was stored" - NOTE(review): confirm against kafka.utils.Pool
+ Utils.swallow(logger.warn, Utils.registerMBean(stat, "kafka:type=kafka.ConsumerTopicStat." + topic)) // register under a per-topic name; presumably swallow reports failures via logger.warn instead of throwing - confirm in kafka.utils.Utils
+ else
+ stat = stats.get(topic) // an entry already existed: discard the candidate and use the stored instance
+ }
+ return stat
+ }
+}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala?rev=1178298&r1=1178297&r2=1178298&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala Mon Oct 3 04:57:06 2011
@@ -26,14 +26,15 @@ import kafka.serializer.{DefaultDecoder,
* All calls to elements should produce the same thread-safe iterator? Should have a seperate thread
* that feeds messages into a blocking queue for processing.
*/
-class KafkaMessageStream[T](private val queue: BlockingQueue[FetchedDataChunk],
+class KafkaMessageStream[T](val topic: String,
+ private val queue: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val decoder: Decoder[T])
extends Iterable[T] with java.lang.Iterable[T]{
private val logger = Logger.getLogger(getClass())
private val iter: ConsumerIterator[T] =
- new ConsumerIterator[T](queue, consumerTimeoutMs, decoder)
+ new ConsumerIterator[T](topic, queue, consumerTimeoutMs, decoder)
/**
* Create an iterator over messages in the stream.
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1178298&r1=1178297&r2=1178298&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Mon Oct 3 04:57:06 2011
@@ -186,7 +186,7 @@ private[kafka] class ZookeeperConsumerCo
for (threadId <- threadIdSet) {
val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
queues.put((topic, threadId), stream)
- streamList ::= new KafkaMessageStream[T](stream, config.consumerTimeoutMs, decoder)
+ streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs, decoder)
}
ret += (topic -> streamList)
logger.debug("adding topic " + topic + " and stream to map..")